1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
#pragma once
#include "sds.h"
#include "log.h"
#include "swarmkv_limits.h"
#include "swarmkv/swarmkv.h"
#include "future_promise.h"
#include <netinet/ip.h>
#include <uuid/uuid.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <event2/http.h>
/*
 * Result codes for command execution.
 * Numeric values are spelled out; they match the implicit 0, 1, 2 numbering.
 * NOTE(review): the meaning of each code is not visible in this header —
 * confirm against the code that returns them.
 */
enum cmd_exec_result
{
    NEED_KEY_ROUTE = 0,
    REDIRECT       = 1,
    FINISHED       = 2
};
/*
struct swarmkv_node_addr
{
char ip_addr[INET6_ADDRSTRLEN]; // INET6_ADDRSTRLEN is defined as 46 (#include <arpa/inet.h>; see man inet_ntop)
unsigned int cluster_port; //host order
};*/
/* Length of the longest dotted-quad IPv4 string ("255.255.255.255"), without the NUL. */
#define MAX_IPV4_ADDR_LEN 15
/*
 * Node address kept as the text "IP:port".
 *
 * Capacity breakdown: IPv4 text (MAX_IPV4_ADDR_LEN) + ':' separator (1)
 * + decimal port digits (5, enough for 65535) + terminating NUL (1).
 * The separator byte was previously not accounted for, so the longest
 * address, "255.255.255.255:65535", could not be stored with its terminator.
 */
typedef struct{
    char addr[MAX_IPV4_ADDR_LEN+1+5+1]; //IP:port
}node_t;
/*
 * State of a slot. Every enumerator carries an explicit value; these are the
 * same values the implicit numbering after STATE_STABLE=0 produced.
 * NOTE(review): the transitions between states are not visible in this header.
 */
enum slot_state
{
    STATE_STABLE    = 0,
    STATE_MIGRATING = 1,
    STATE_IMPORTING = 2
};
/*
 * One key-slot record: the slot's id, the node recorded as its owner,
 * and an associated UUID.
 */
struct key_slot
{
    int    slot_id;  /* slot identifier */
    node_t owner;    /* "IP:port" of the owning node */
    uuid_t uuid;     /* NOTE(review): presumably the owner's UUID — confirm */
};
/*
 * Conversion between a key-slot table and its JSON text (sds).
 * Both functions receive the table as a raw base pointer plus three sizes:
 * sz_slot_container, offset_slot and n_slot_container.
 * NOTE(review): the parameter names suggest an array of n_slot_container
 * elements, each sz_slot_container bytes, with a struct key_slot embedded at
 * byte offset offset_slot — confirm against the definitions.
 * NOTE(review): keyslots2json returns an sds; presumably the caller frees it.
 */
void json2keyslots(sds json_buffer, void *slot_container_base, size_t sz_slot_container, size_t offset_slot, size_t n_slot_container);
sds keyslots2json(void *slot_container_base, size_t sz_slot_container, size_t offset_slot, size_t n_slot_container);
/*
 * Parsers for textual response bodies; results are written through the
 * non-const pointer arguments and nothing is returned.
 * NOTE(review): n_node is a pointer, so it is presumably in/out (capacity of
 * nodes/uuids on entry, number filled on return) — confirm against the
 * definition before relying on it.
 */
void health_response2active_nodes(const char *resp_buff, node_t *nodes, uuid_t *uuids, size_t *n_node);
void leader_response2leader_node(const char *resp_body, node_t *leader);
/*
 * Key and string helpers.
 * NOTE(review): by name, key_hash_slot maps a key of keylen bytes to a slot
 * number and key2tid maps a key to a thread index given nr_worker_threads —
 * confirm ranges against the definitions.
 * NOTE(review): str2integer writes the parsed value through `integer`; which
 * return value signals success is not visible here.
 * NOTE(review): str_replace takes non-const pointers and returns char *;
 * whether the result is newly allocated (and who frees it) must be checked
 * in the definition.
 */
int key_hash_slot(const char* key, size_t keylen);
int key2tid(const sds key, int nr_worker_threads);
int str2integer(const char *str, long long *integer);
char *str_replace(char *orig, char *rep, char *with);
/*
 * Configuration options for a swarmkv instance.
 *
 * The three address buffers are sized MAX_IPV4_ADDR_LEN + 1 so that a
 * maximum-length dotted-quad string (15 characters) fits together with its
 * terminating NUL; with only MAX_IPV4_ADDR_LEN bytes such an address could
 * not be stored as a valid C string.
 */
struct swarmkv_options
{
    /* Ports */
    unsigned int cluster_port;
    unsigned int cluster_announce_port;
    unsigned int health_check_port;
    unsigned int health_check_announce_port;

    /* Feature switches */
    int dryrun;
    int run_for_leader_enabled;
    int network_compression_enabled;

    unsigned int consul_port;

    /* Timing; NOTE(review): units presumably follow the _us / _ms suffixes — confirm */
    unsigned int cluster_timeout_us;
    unsigned int sync_interval_us;
    unsigned int sync_retry_time_ms;
    unsigned int sync_retry_interval_ms;

    /* Logging */
    struct log_handle *logger;
    int loglevel;
    const char *logpath;

    /* Addresses as NUL-terminated IPv4 text */
    char cluster_announce_ip[MAX_IPV4_ADDR_LEN+1];
    char bind_address[MAX_IPV4_ADDR_LEN+1];
    char consul_agent_host[MAX_IPV4_ADDR_LEN+1];

    uuid_t bin_uuid;

    /* Threading */
    int nr_worker_threads;
    int nr_caller_threads;
    int total_threads;

    int is_assigned_to_db;
    int sanity_check;
    int batch_sync_enabled;

    struct timeval *eb_max_interval; //An interval after which Libevent should stop running callbacks and check for more events, or NULL if there should be no such interval.
    int eb_max_callbacks; //A number of callbacks after which Libevent should stop running callbacks and check for more events, or -1 if there should be no such limit.
};
/*
 * A command: the node recorded as its caller plus an argument vector
 * of argc sds strings.
 */
struct swarmkv_cmd
{
    node_t  caller;  /* "IP:port" of the calling node */
    size_t  argc;    /* number of entries in argv */
    sds    *argv;    /* argument strings */
};
/*
 * swarmkv_cmd construction, destruction, duplication and argument parsing.
 * Only the prototypes are visible in this header.
 * NOTE(review): swarmkv_cmd_new presumably allocates a command sized for argc
 * arguments and records `originator` as its caller; swarmkv_cmd_free
 * presumably releases a command obtained from _new/_dup — confirm ownership
 * rules against the definitions.
 * NOTE(review): the parse helpers write through ival/dval and return int;
 * which return value means success, and how `name` selects the argument, is
 * not visible here.
 */
struct swarmkv_cmd *swarmkv_cmd_new(size_t argc, const node_t *originator);
void swarmkv_cmd_free(struct swarmkv_cmd *p);
struct swarmkv_cmd *swarmkv_cmd_dup(const struct swarmkv_cmd *origin);
int swarmkv_cmd_parse_integer(const struct swarmkv_cmd *cmd, const char *name, long long *ival);
int swarmkv_cmd_parse_double(const struct swarmkv_cmd *cmd, const char *name, double *dval);
/*
 * Constructors for struct swarmkv_reply, one per reply flavour named in the
 * function suffix (string, verbatim, integer, double, array, node, nil,
 * status, error). The *_fmt, _status and _error variants are variadic and
 * take a format string.
 * NOTE(review): the format strings are presumably printf-style — confirm.
 * NOTE(review): each constructor returns a pointer; presumably the caller
 * owns the reply and must release it with the library's reply-free function.
 */
struct swarmkv_reply *swarmkv_reply_new_string(const char *str, size_t sz);
struct swarmkv_reply *swarmkv_reply_new_string_fmt(const char *format, ...);
struct swarmkv_reply *swarmkv_reply_new_string_from_integer(long long integer);
struct swarmkv_reply *swarmkv_reply_new_verbatim(const char *str, size_t sz, const char *ext);
struct swarmkv_reply *swarmkv_reply_new_integer(long long integer);
struct swarmkv_reply *swarmkv_reply_new_double(double dval);
struct swarmkv_reply *swarmkv_reply_new_array(size_t n_element);
struct swarmkv_reply *swarmkv_reply_new_node(node_t *node, int is_ask);
struct swarmkv_reply *swarmkv_reply_new_nil(void);
struct swarmkv_reply *swarmkv_reply_new_status(const char *format, ...);
struct swarmkv_reply *swarmkv_reply_new_error(const char *format, ...);
/*
 * Combine `src` into `*dst`. `dst` is a pointer-to-pointer, so the callee may
 * replace the destination reply.
 * NOTE(review): whether `src` is consumed/freed by these calls is not visible
 * here — confirm before reusing `src` afterwards.
 */
void swarmkv_reply_merge_array(struct swarmkv_reply **dst, struct swarmkv_reply *src);
void swarmkv_reply_append_string(struct swarmkv_reply **dst, struct swarmkv_reply *src);
/*
 * Helpers over plain arrays of node_t.
 * NOTE(review): node_list_new_from_reply outputs through node_list/n_node;
 * presumably it allocates *node_list and the caller frees it — confirm.
 * NOTE(review): node_list_remove receives n_list by value, so the new element
 * count is presumably conveyed by the return value — confirm.
 */
int node_list_new_from_reply(node_t **node_list, size_t *n_node, const struct swarmkv_reply *reply);
int node_list_exists(const node_t *list, size_t n_list, const node_t *node);
int node_list_remove(node_t *list, size_t n_list, const node_t *node);
/*
 * node_t formatting helpers.
 * NOTE(review): the sds results are presumably owned by the caller; the
 * lifetime of the pointer returned by node_addr2cstr is not visible here
 * (it may simply alias node->addr) — confirm.
 */
sds node_print_json(const node_t *self, uuid_t uuid);
sds node_addr2sds(const node_t *node);
const char *node_addr2cstr(const node_t *node);
/*
 * node_t initialisers from various sources. The int-returning variants
 * report a status; the void variants cannot signal failure to the caller.
 * NOTE(review): despite its name, node_init_from_sds takes a const char *.
 */
int node_init_from_sds(node_t *node, const char *addr_str);
int node_init_from_reply(node_t *node, const struct swarmkv_reply *reply);
void node_init(node_t *node, const char *ipv4, unsigned int port);
void node_init_from_cstr(node_t *node, const char *addr_str);
void node_init_from_string(node_t *node, const char *addr_str, size_t sz_addr);
/* Return 0 if node1 == node2 */
int node_compare(const node_t *node1, const node_t *node2);
void node_copy(node_t *dst, const node_t *src);
/* NOTE(review): return conventions of the two predicates below are not visible here. */
int node_sanity(const node_t *node);
int node_is_empty(const node_t *node);
/*
 * Split a node into its port and address text; addr_str is an output buffer
 * of sz_addr bytes.
 */
int node_parse(const node_t *node, unsigned int *port, char *addr_str, size_t sz_addr);
/*
 * Conversion to and from struct sockaddr.
 * NOTE(review): node_to_sockaddr is given no buffer size, so the caller must
 * supply storage large enough for the address family written — confirm.
 */
void node_init_from_sockaddr(node_t *node, const struct sockaddr * sa);
void node_to_sockaddr(const node_t *node, struct sockaddr *sockaddr);
/*
 * Issue an HTTP request of the given libevent command type to host:port/url,
 * sending req_body and returning the response body through resp_body.
 * NOTE(review): per its name the call blocks until completion; the meaning of
 * the int return and ownership of *resp_body are not visible here — confirm.
 */
int http_blocking_request(enum evhttp_cmd_type type, const char* host, unsigned int port, const char* url, sds req_body, sds *resp_body);
/* Accessor returning a read-only pointer to the node_t of `db` itself. */
const node_t *swarmkv_self_node(const struct swarmkv *db);
|