summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-01-18 20:17:02 +0800
committerZheng Chao <[email protected]>2023-01-18 20:17:02 +0800
commitc283dfdac2ea674fdd52fc61f00814caf8f6f5c6 (patch)
tree1c51eafc559c90e19fa1dfb55ef10ce99b34dca1
parent40f5ca926572ee4089e16db73d232237fe2e5547 (diff)
:construction: Refactoring Consul communication of keyspace.
-rw-r--r--examples/async_example.c2
-rw-r--r--examples/simple_example.c2
-rw-r--r--include/swarmkv/swarmkv.h5
-rw-r--r--src/inc_internal/swarmkv_common.h6
-rw-r--r--src/swarmkv.c4
-rw-r--r--src/swarmkv_api.c14
-rw-r--r--src/swarmkv_keyspace.c519
-rw-r--r--src/swarmkv_store.c2
-rw-r--r--test/swarmkv_cli_test.cpp26
-rw-r--r--test/swarmkv_gtest.cpp8
-rw-r--r--test/swarmkv_perf_test.cpp12
-rw-r--r--test/swarmkv_scalability_test.cpp4
-rw-r--r--tools/swarmkv_cli.c10
-rw-r--r--tools/swarmkv_simple_node.cpp8
14 files changed, 302 insertions, 320 deletions
diff --git a/examples/async_example.c b/examples/async_example.c
index b9bc5a1..5149bcf 100644
--- a/examples/async_example.c
+++ b/examples/async_example.c
@@ -24,7 +24,7 @@ int main(int argc, char **argv)
for(size_t i=0; i<2; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts[i], 5210+i);
+ swarmkv_options_set_cluster_port(opts[i], 5210+i);
db[i]=swarmkv_open(opts[i], cluster_name, &err);
if(err)
{
diff --git a/examples/simple_example.c b/examples/simple_example.c
index fad8262..d00eae1 100644
--- a/examples/simple_example.c
+++ b/examples/simple_example.c
@@ -11,7 +11,7 @@ int main(int argc, char **argv)
for(size_t i=0; i<2; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts[i], 5210+i);
+ swarmkv_options_set_cluster_port(opts[i], 5210+i);
db[i]=swarmkv_open(opts[i], cluster_name, &err);
if(err)
{
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h
index 47117af..c29df32 100644
--- a/include/swarmkv/swarmkv.h
+++ b/include/swarmkv/swarmkv.h
@@ -44,10 +44,9 @@ void swarmkv_reply_print(const struct swarmkv_reply *reply, FILE* stream);
struct swarmkv_options;
struct swarmkv_options* swarmkv_options_new(void);
-int swarmkv_options_set_p2p_port(struct swarmkv_options *opts, unsigned int p2p_port);
-int swarmkv_options_set_p2p_listen_port(struct swarmkv_options *opts, unsigned int p2p_port);
+int swarmkv_options_set_cluster_port(struct swarmkv_options *opts, unsigned int cluster_port);
int swarmkv_options_set_health_check_port(struct swarmkv_options *opts, unsigned int health_check_port);
-int swarmkv_options_set_p2p_timeout_us(struct swarmkv_options *opts, unsigned int timeout_us);
+int swarmkv_options_set_cluster_timeout_us(struct swarmkv_options *opts, unsigned int timeout_us);
int swarmkv_options_set_sync_interval_us(struct swarmkv_options *opts, unsigned int interval_us);
int swarmkv_options_set_bind_address(struct swarmkv_options *opts, const char* ip_addr);
int swarmkv_options_set_logger(struct swarmkv_options *opts, void *logger);
diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h
index c63f4c2..d039c67 100644
--- a/src/inc_internal/swarmkv_common.h
+++ b/src/inc_internal/swarmkv_common.h
@@ -28,7 +28,7 @@ enum cmd_exec_result
struct swarmkv_node_addr
{
char ip_addr[INET6_ADDRSTRLEN]; //#define INET6_ADDRSTRLEN 46 (#include <arpa/inet.h>, see man inet_ntop
- unsigned int p2p_port; //host order
+ unsigned int cluster_port; //host order
};*/
#define MAX_IPV4_ADDR_LEN 15
@@ -59,7 +59,7 @@ char *str_replace(char *orig, char *rep, char *with);
struct swarmkv_options
{
unsigned int health_check_port;
- unsigned int p2p_port;
+ unsigned int cluster_port;
int dryrun;
int run_for_leader_flag;
unsigned int consul_port;
@@ -68,7 +68,7 @@ struct swarmkv_options
struct log_handle *logger;
int loglevel;
char bind_address[MAX_IPV4_ADDR_LEN];
- char consul_address[MAX_IPV4_ADDR_LEN];
+ char consul_agent_host[MAX_IPV4_ADDR_LEN];
uuid_t bin_uuid;
size_t nr_worker_threads;
int is_assigned_to_db;
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 2158927..ab8473f 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -1176,7 +1176,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options* opts, const char * db_name,
if(opts->dryrun)
{
}
- node_init(&db->self, opts->bind_address, opts->p2p_port);
+ node_init(&db->self, opts->bind_address, opts->cluster_port);
db->mod_monitor=swarmkv_monitor_new(db->opts);
db->mod_keyspace=swarmkv_keyspace_new(db->opts, db_name, db->logger, err);
@@ -1187,7 +1187,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options* opts, const char * db_name,
swarmkv_keyspace_set_exec_cmd_func(db->mod_keyspace, __exec_cmd, db);
swarmkv_keyspace_set_monitor_handle(db->mod_keyspace, db->mod_monitor);
- //Note: if the p2p_port is 0, swarmkv_net_new updates db->self.p2p_port.
+ //Note: if the cluster_port is 0, swarmkv_net_new updates db->self.cluster_port.
db->net=swarmkv_net_new(opts, &db->self, db->logger, err);
if(*err)
{
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index 36d70e6..bd7e24e 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -24,13 +24,13 @@ struct swarmkv_options* swarmkv_options_new(void)
{
struct swarmkv_options *opts=ALLOC(struct swarmkv_options, 1);
opts->consul_port=8500;
- opts->p2p_port=5210;
+ opts->cluster_port=5210;
opts->health_check_port=0;
opts->loglevel=0;
opts->p2p_timeout_us=500*1000;//Default 500ms
opts->sync_interval_us=10*1000; //Default 10ms
strcpy(opts->bind_address, "127.0.0.1");
- strcpy(opts->consul_address, "127.0.0.1");
+ strcpy(opts->consul_agent_host, "127.0.0.1");
uuid_generate(opts->bin_uuid);
opts->nr_worker_threads=1;
opts->is_assigned_to_db=0;
@@ -49,9 +49,9 @@ void swarmkv_options_free(struct swarmkv_options *opt)
}
return;
}
-int swarmkv_options_set_p2p_port(struct swarmkv_options *opts, unsigned int p2p_port)
+int swarmkv_options_set_cluster_port(struct swarmkv_options *opts, unsigned int cluster_port)
{
- opts->p2p_port=p2p_port;
+ opts->cluster_port=cluster_port;
return 0;
}
int swarmkv_options_set_health_check_port(struct swarmkv_options *opts, unsigned int health_check_port)
@@ -59,7 +59,7 @@ int swarmkv_options_set_health_check_port(struct swarmkv_options *opts, unsigned
opts->health_check_port=health_check_port;
return 0;
}
-int swarmkv_options_set_p2p_timeout_us(struct swarmkv_options *opts, unsigned int timeout_ms)
+int swarmkv_options_set_cluster_timeout_us(struct swarmkv_options *opts, unsigned int timeout_ms)
{
opts->p2p_timeout_us=timeout_ms;
return 0;
@@ -87,7 +87,7 @@ int swarmkv_options_set_disable_run_for_leader(struct swarmkv_options *opts)
}
int swarmkv_options_set_consul_host(struct swarmkv_options *opts, const char* ip_addr)
{
- strncpy(opts->consul_address, ip_addr, sizeof(opts->consul_address));
+ strncpy(opts->consul_agent_host, ip_addr, sizeof(opts->consul_agent_host));
return 0;
}
int swarmkv_options_set_consul_port(struct swarmkv_options *opts, unsigned int consul_port)
@@ -98,7 +98,7 @@ int swarmkv_options_set_consul_port(struct swarmkv_options *opts, unsigned int c
int swarmkv_options_set_dryrun(struct swarmkv_options *opts)
{
opts->dryrun=1;
- opts->p2p_port=15210;//overrides default p2p port
+ opts->cluster_port=15210;//overrides default p2p port
return 0;
}
int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, size_t nr_worker_threads)
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index 3c0c8ff..ae00534 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -150,7 +150,7 @@ static void crdt_del_on_succ(void* result, void* user)
// assert(reply->integer==1);
if(reply->type==SWARMKV_REPLY_INTEGER && reply->integer !=1)
{
-// printf("crdt del %s at %s:%u failed.\n", ctx->key, ctx->peer.ip_addr, ctx->peer.p2p_port);
+// printf("crdt del %s at %s:%u failed.\n", ctx->key, ctx->peer.ip_addr, ctx->peer.cluster_port);
}
crdt_del_ctx_free(ctx);
return;
@@ -193,102 +193,17 @@ struct slot_runtime
int I_am_owner;
pthread_mutex_t mutex;
};
-struct http_req_context
-{
- char symbol[128];
- struct evhttp_connection *conn;
- struct log_handle *logger;
- struct future *f;
-};
-void http_req_error_cb(enum evhttp_request_error error, void *arg)
-{
- return;
-}
-void http_request_callback(struct evhttp_request *req, void *arg)
-{
- struct http_req_context *ctx=(struct http_req_context *)arg;
- int resp_code=0;
-
- struct promise *p=future_to_promise(ctx->f);
- if(!req || !evhttp_request_get_response_code(req))
- {
- int errcode = EVUTIL_SOCKET_ERROR();
- log_fatal(ctx->logger, MODULE_SWARMKV_KEYSPACE, "%s failed: %s",
- ctx->symbol,
- evutil_socket_error_to_string(errcode));
- promise_failed(p, FUTURE_ERROR_EXCEPTION, "socket error");
- }
- else
- {
- resp_code=evhttp_request_get_response_code(req);
- }
-
- if(resp_code!=200)
- {
- char what[256];
- snprintf(what, sizeof(what), "resp_code=%d", resp_code);
- promise_failed(p, FUTURE_ERROR_EXCEPTION, what);
- }
- else
- {
- promise_success(p, req);
- }
- evhttp_connection_free(ctx->conn);
- future_destroy(ctx->f);
- ctx->f=NULL;
- free(ctx);
- return;
-}
-void http_get_async(const char *symbol, node_t self, const char *url, const char *host, unsigned short port, struct event_base *evbase, struct future *f, struct log_handle *logger)
-{
- struct http_req_context *ctx=ALLOC(struct http_req_context, 1);
- strncpy(ctx->symbol, symbol, sizeof(ctx->symbol));
- ctx->logger=logger;
- ctx->f=f;
- ctx->conn=evhttp_connection_base_new(evbase, NULL, host, port);
- struct evhttp_request *request=evhttp_request_new(http_request_callback, ctx);
- evhttp_request_set_error_cb(request, http_req_error_cb);
-
- struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
- evhttp_connection_set_timeout(ctx->conn, 1800);//set to 30min for blocking query.
-
- char ip[16]={};
- unsigned int p2p_port=0;
- node_parse(&self, &p2p_port, ip, sizeof(ip));
- evhttp_add_header(output_headers, "Host", ip);
- evhttp_add_header(output_headers, "Connection", "close");
-
- evhttp_make_request(ctx->conn, request, EVHTTP_REQ_GET, url);
-}
-struct consul_task
-{
- char task_name[SWARMKV_SYMBOL_MAX];
- struct evhttp_connection* evhttpconn; //tcp connection for the request
- UT_hash_handle hh;
-};
void http_connection_close_callback(struct evhttp_connection *conn, void *arg)
{
// struct consul_task *task=(struct consul_task *)arg;
//printf("http connection %s %p closed\n", task->task_name, conn);
return;
}
-struct consul_task *consul_task_new(const char *task_name, struct event_base *evbase, const char *host_name, unsigned short port)
-{
- struct consul_task *task=ALLOC(struct consul_task, 1);
- strncpy(task->task_name, task_name, sizeof(task->task_name));
- task->evhttpconn=evhttp_connection_base_new(evbase, NULL, host_name, port);
- evhttp_connection_set_closecb(task->evhttpconn, http_connection_close_callback, task);
- return task;
-}
-void consul_task_free(struct consul_task *task)
-{
- evhttp_connection_free(task->evhttpconn);
- task->evhttpconn=NULL;
- free(task);
-}
+
struct swarmkv_keyspace
{
struct swarmkv_module module;
+ const struct swarmkv_options *opts;
char db_name[SWARMKV_SYMBOL_MAX];
pthread_t thr;
int readable_tid;
@@ -313,8 +228,8 @@ struct swarmkv_keyspace
char consul_session_id[SWARMKV_SYMBOL_MAX];
struct evhttp *http_server;
- struct event_base * evbase;
- struct consul_task *consul_task_hash;
+ struct event_base *evbase;
+ struct consul_client *consul_client;
//reference, no need free
exec_cmd_func *exec_cmd_func;
@@ -322,16 +237,118 @@ struct swarmkv_keyspace
struct swarmkv_module *mod_monitor;
void *logger;
};
-struct consul_task *consul_task_getnxset(struct swarmkv_keyspace *ks, const char *task_name)
+void http_request_on_close(struct evhttp_connection *conn, void *arg)
+{
+ return;
+}
+
+struct consul_client;
+struct consul_request
{
- struct consul_task *task=NULL;
- HASH_FIND(hh, ks->consul_task_hash, task_name, strlen(task_name), task);
- if(!task)
+ char symbol[SWARMKV_SYMBOL_MAX];
+ long long request_id;
+ struct evhttp_connection *evhttpconn; //tcp connection for the request
+ struct evhttp_request *evhttpreq;
+ struct future *f;
+ struct consul_client *ref_client;
+ UT_hash_handle hh;
+};
+struct consul_client
+{
+ unsigned short consul_agent_port;
+ char consul_agent_host[32];
+ long long req_id_generator;
+ struct event_base *evbase; //reference of keyspace evbase
+ struct log_handle *logger; //reference of logger
+ struct consul_request *request_table;
+};
+void consul_request_callback(struct evhttp_request *evhttpreq, void *arg);
+struct consul_request *consul_request_new(struct consul_client *client, const char *symbol, struct future *f)
+{
+ struct consul_request *req=ALLOC(struct consul_request, 1);
+
+ strncpy(req->symbol, symbol, sizeof(req->symbol));
+ req->evhttpconn=evhttp_connection_base_new(client->evbase, NULL, client->consul_agent_host, client->consul_agent_port);
+ evhttp_connection_set_timeout(req->evhttpconn, 1800);//set to 30min for blocking query.
+ req->evhttpreq=evhttp_request_new(consul_request_callback, req);
+ req->f=f;
+ req->ref_client=client;
+ req->request_id=client->req_id_generator++;
+
+ HASH_ADD(hh, client->request_table, request_id, sizeof(req->request_id), req);
+
+ return req;
+}
+void consul_request_free(struct consul_request *req)
+{
+ evhttp_connection_free(req->evhttpconn);
+ //if(req->evhttpreq) evhttp_request_free(req->evhttpreq);
+ future_destroy(req->f);
+ HASH_DEL(req->ref_client->request_table, req);
+ free(req);
+}
+void consul_request_callback(struct evhttp_request *evhttpreq, void *arg)
+{
+ struct consul_request *req=(struct consul_request *)arg;
+ int resp_code=0;
+
+ struct promise *p=future_to_promise(req->f);
+ if(!evhttpreq || !evhttp_request_get_response_code(evhttpreq))
+ {
+ int errcode = EVUTIL_SOCKET_ERROR();
+ log_fatal(req->ref_client->logger, MODULE_SWARMKV_KEYSPACE, "%s failed: %s",
+ req->symbol,
+ evutil_socket_error_to_string(errcode));
+ promise_failed(p, FUTURE_ERROR_EXCEPTION, "socket error");
+ }
+ else
+ {
+ resp_code=evhttp_request_get_response_code(evhttpreq);
+ }
+
+ if(resp_code!=200)
+ {
+ char what[256];
+ snprintf(what, sizeof(what), "resp_code=%d", resp_code);
+ promise_failed(p, FUTURE_ERROR_EXCEPTION, what);
+ }
+ else
{
- task=consul_task_new(task_name, ks->evbase, ks->consul_agent_host, (unsigned short)ks->consul_agent_port);
- HASH_ADD(hh, ks->consul_task_hash, task_name, strlen(task_name), task);
+ promise_success(p, evhttpreq);
}
- return task;
+ //The request should be freed automatically after the http_request_done has finished executing.
+ req->evhttpreq=NULL;
+ consul_request_free(req);
+ return;
+}
+void consul_request_make(struct consul_request *req, enum evhttp_cmd_type cmd, const char *url)
+{
+ struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
+
+ evhttp_connection_set_closecb(req->evhttpconn, http_request_on_close, req);
+ evhttp_add_header(output_headers, "Host", req->ref_client->consul_agent_host);
+ evhttp_add_header(output_headers, "Connection", "close");
+
+ evhttp_make_request(req->evhttpconn, req->evhttpreq, cmd, url);
+}
+
+struct consul_client *consul_client_new(const char *host, unsigned short port, struct event_base *evbase, struct log_handle *logger)
+{
+ struct consul_client *client=ALLOC(struct consul_client, 1);
+ client->consul_agent_port=port;
+ strncpy(client->consul_agent_host, host, sizeof(client->consul_agent_host));
+ client->logger=logger;
+ client->evbase=evbase;
+ return client;
+}
+void consul_client_free(struct consul_client *client)
+{
+ struct consul_request *req=NULL, *tmp=NULL;
+ HASH_ITER(hh, client->request_table, req, tmp)
+ {
+ consul_request_free(req);
+ }
+ free(client);
}
void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks);
void consul_watch_leadership_changes_async(struct swarmkv_keyspace* ks);
@@ -341,22 +358,25 @@ void keyspace_active_expire_cycle(struct swarmkv_keyspace *ks);
void health_check_cb(struct evhttp_request *request, void *arg)
{
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg;
struct evbuffer* evbuf_reply = evbuffer_new();
struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
//HTTP header
- evhttp_add_header(output_headers, "Server", "swarmkv v2.0");
+ evhttp_add_header(output_headers, "Server", "Swarmkv v3.0");
evhttp_add_header(output_headers, "Content-Type", "text/plain; charset=UTF-8");//utf8
evhttp_add_header(output_headers, "Connection", "close");
- evbuffer_add_printf(evbuf_reply, "SwarmKV Cluster");
+ evbuffer_add_printf(evbuf_reply, "SwarmKV node of cluster %s is running", ks->db_name);
evhttp_send_reply(request, HTTP_OK, "OK", evbuf_reply);
evbuffer_free(evbuf_reply);
return;
}
-struct evhttp* http_server_new(struct event_base* evbase, unsigned int *port_listen, void* logger)
+struct evhttp* http_server_new(struct event_base* evbase, unsigned int *port_listen, void *arg)
{
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg;
+ void *logger=ks->logger;
struct evhttp* http = evhttp_new(evbase);
struct evhttp_bound_socket *bound_socket=NULL;
bound_socket=evhttp_bind_socket_with_handle(http, "0.0.0.0", *port_listen);
@@ -381,7 +401,7 @@ struct evhttp* http_server_new(struct event_base* evbase, unsigned int *port_lis
evhttp_set_timeout(http, http_option_timeout);
evhttp_set_allowed_methods( http , EVHTTP_REQ_GET);
//Set a callback for a specified URI
- evhttp_set_cb(http, "/health", health_check_cb, NULL);
+ evhttp_set_cb(http, "/health", health_check_cb, ks);
return http;
}
@@ -414,15 +434,15 @@ int consul_service_register(struct swarmkv_keyspace* ks)
int ret=0;
char health_check_url[SWARMKV_URL_MAX]="";
char uuid_str[37];
- unsigned int p2p_port=0;
+ unsigned int cluster_port=0;
char ip[16];
- node_parse(&ks->self, &p2p_port, ip, sizeof(ip));
+ node_parse(&ks->self, &cluster_port, ip, sizeof(ip));
cJSON* service=cJSON_CreateObject();
cJSON_AddStringToObject(service, "Name", ks->db_name);
- snprintf(ks->consul_service_id, sizeof(ks->consul_service_id), "%.128s-%d", ks->db_name, p2p_port);
+ snprintf(ks->consul_service_id, sizeof(ks->consul_service_id), "%.128s-%d", ks->db_name, cluster_port);
cJSON_AddStringToObject(service, "ID", ks->consul_service_id);
cJSON_AddStringToObject(service, "Address", ip);
- cJSON_AddNumberToObject(service, "Port", p2p_port);
+ cJSON_AddNumberToObject(service, "Port", cluster_port);
cJSON* meta=cJSON_CreateObject();
uuid_unparse_lower(ks->uuid, uuid_str);
@@ -433,7 +453,7 @@ int consul_service_register(struct swarmkv_keyspace* ks)
cJSON* check=cJSON_CreateObject();
cJSON_AddStringToObject(check, "Name", ks->db_name);
- snprintf(ks->consul_check_id, sizeof(ks->consul_check_id), "%.128s-%u", ks->db_name, p2p_port);
+ snprintf(ks->consul_check_id, sizeof(ks->consul_check_id), "%.128s-%u", ks->db_name, cluster_port);
cJSON_AddStringToObject(check, "CheckID", ks->consul_check_id);
snprintf(health_check_url, sizeof(health_check_url), "http://%s:%u/health", ip, ks->health_check_port);
cJSON_AddStringToObject(check, "HTTP", health_check_url);
@@ -461,24 +481,43 @@ int consul_service_register(struct swarmkv_keyspace* ks)
cJSON_Delete(service);
return ret;
}
+static void __ks_dummy_event_handler(evutil_socket_t fd, short what, void * arg)
+{
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg;
+ keyspace_active_expire_cycle(ks);
+ return;
+}
-void __consul_session_create_cb(struct evhttp_request *req, void *arg)
+void *swarmkv_keyspace_thread(void *arg)
{
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg;
+ char thread_name[16];
+ snprintf(thread_name, sizeof(thread_name), "swarmkv-ks");
+ prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
+
+ struct event * ev = event_new(ks->evbase, -1, EV_PERSIST, __ks_dummy_event_handler, ks);
+ struct timeval timer_delay = {2, 0};
+ evtimer_add(ev, &timer_delay);
+
+ consul_watch_slots_changes_async(ks);
+ consul_watch_nodes_changes_async(ks);
+
+ event_base_dispatch(ks->evbase);
+ event_del(ev);
+ event_free(ev);
+
+ return NULL;
+}
+void consul_session_create_on_success(void *result, void *arg)
+{
+ struct evhttp_request *req=(struct evhttp_request *)result;
+ struct swarmkv_keyspace* ks = (struct swarmkv_keyspace *)arg;
int resp_code=0;
cJSON* session_id=NULL, *session_create_response=NULL;
- struct swarmkv_keyspace* ks = (struct swarmkv_keyspace *)arg;
+
int is_running_for_leader=0;
- if(!req || !evhttp_request_get_response_code(req))
- {
- int errcode = EVUTIL_SOCKET_ERROR();
- log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "session create failed: socket error %s",
- evutil_socket_error_to_string(errcode));
- }
- else
- {
- resp_code=evhttp_request_get_response_code(req);
-
- }
+
+ resp_code=evhttp_request_get_response_code(req);
if(resp_code==200)
{
struct evbuffer *buf=NULL;
@@ -507,34 +546,12 @@ void __consul_session_create_cb(struct evhttp_request *req, void *arg)
}
return;
}
-static void __ks_dummy_event_handler(evutil_socket_t fd, short what, void * arg)
+void consul_session_create_on_on_fail(enum e_future_error err, const char * what, void * arg)
{
- struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg;
- keyspace_active_expire_cycle(ks);
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
+ log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "consul session create failed: %s.", what);
return;
}
-
-void *swarmkv_keyspace_thread(void *arg)
-{
- struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg;
- char thread_name[16];
- snprintf(thread_name, sizeof(thread_name), "swarmkv-ks");
- prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
-
- struct event * ev = event_new(ks->evbase, -1, EV_PERSIST, __ks_dummy_event_handler, ks);
- struct timeval timer_delay = {2, 0};
- evtimer_add(ev, &timer_delay);
-
- consul_watch_slots_changes_async(ks);
- consul_watch_nodes_changes_async(ks);
-
- event_base_dispatch(ks->evbase);
- event_del(ev);
- event_free(ev);
-
- return NULL;
-}
-
void consul_create_session_async(struct swarmkv_keyspace* ks)
{
cJSON* session=NULL, *service_check_array=NULL, *service_check=NULL;
@@ -557,50 +574,40 @@ void consul_create_session_async(struct swarmkv_keyspace* ks)
char url[SWARMKV_URL_MAX]="";
snprintf(url, sizeof(url), "/v1/session/create");
- struct consul_task *task=consul_task_getnxset(ks, __func__);
+ struct future *f=future_create(__func__, consul_session_create_on_success, consul_session_create_on_on_fail, ks);
+ struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
- struct evhttp_request* request=evhttp_request_new(__consul_session_create_cb, ks);
- struct evhttp_connection* conn=task->evhttpconn;
- evhttp_connection_set_timeout(conn, 2);
+
+ evhttp_connection_set_timeout(req->evhttpconn, 2);
- struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
+ struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
- char ip[16]={};
- unsigned int port=0;
- node_parse(&ks->self, &port, ip, sizeof(ip));
- evhttp_add_header(output_headers, "Host", ip);
+ evhttp_add_header(output_headers, "Host", ks->consul_agent_host);
evhttp_add_header(output_headers, "Connection", "close");
- struct evbuffer* output_buffer = evhttp_request_get_output_buffer(request);
+ struct evbuffer *output_buffer = evhttp_request_get_output_buffer(req->evhttpreq);
evbuffer_add(output_buffer, payload, strlen(payload));
char number[64];
snprintf(number, sizeof(number), "%zu", strlen(payload));
evhttp_add_header(output_headers, "Content-Type", "application/json");
evhttp_add_header(output_headers, "Content-Length", number);
- evhttp_make_request(conn, request, EVHTTP_REQ_PUT, url);
+ consul_request_make(req, EVHTTP_REQ_PUT, url);
free(payload);
return;
}
-void __consul_session_check_cb(struct evhttp_request *req, void *arg)
+void consul_session_check_on_success(void *result, void *arg)
{
int resp_code=0;
cJSON *session_check=NULL;
int session_check_array_size=0;
- struct swarmkv_keyspace* ks = (struct swarmkv_keyspace *)arg;
+ struct swarmkv_keyspace *ks = (struct swarmkv_keyspace *)arg;
int is_running_for_leader=0;
- if(!req || !evhttp_request_get_response_code(req))
- {
- int errcode = EVUTIL_SOCKET_ERROR();
- log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "session check failed: socket error %s",
- evutil_socket_error_to_string(errcode));
- }
- else
- {
- resp_code=evhttp_request_get_response_code(req);
-
- }
+ struct evhttp_request *req=(struct evhttp_request *)result;
+
+ resp_code=evhttp_request_get_response_code(req);
+
if(resp_code==200)
{
struct evbuffer *buf=NULL;
@@ -636,102 +643,87 @@ void __consul_session_check_cb(struct evhttp_request *req, void *arg)
}
return;
}
-
+void consul_session_check_on_fail(enum e_future_error err, const char * what, void * arg)
+{
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
+ log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "consul session check failed: %s.", what);
+ return;
+}
void consul_session_check_async(struct swarmkv_keyspace* ks)
{
char url[SWARMKV_URL_MAX]="";
snprintf(url, sizeof(url), "/v1/session/info/%s", ks->consul_session_id);
- struct consul_task *task=consul_task_getnxset(ks, __func__);
-
- struct evhttp_request* request=evhttp_request_new(__consul_session_check_cb, ks);
- struct evhttp_connection* conn=task->evhttpconn;
- evhttp_connection_set_timeout(conn, 2);
-
- struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
-
- char ip[16]={};
- unsigned int port=0;
- node_parse(&ks->self, &port, ip, sizeof(ip));
- evhttp_add_header(output_headers, "Host", ip);
- evhttp_add_header(output_headers, "Connection", "close");
- evhttp_make_request(conn, request, EVHTTP_REQ_GET, url);
+ struct future *f=future_create(__func__, consul_session_check_on_success, consul_session_check_on_fail, ks);
+ struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
+ consul_request_make(req, EVHTTP_REQ_GET, url);
return;
}
-void __acquire_session_lock_cb(struct evhttp_request *req, void *arg)
+void acquire_session_lock_on_success(void* result, void *arg)
{
+ struct evhttp_request *req=(struct evhttp_request *)result;
struct swarmkv_keyspace* ks=(struct swarmkv_keyspace*)arg;
int resp_code=0;
- if(!req || !evhttp_request_get_response_code(req))
- {
- int errcode = EVUTIL_SOCKET_ERROR();
- log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "run for leader failed: %s",
- evutil_socket_error_to_string(errcode));
- }
- else
+
+ resp_code=evhttp_request_get_response_code(req);
+ if(resp_code==200)
{
- resp_code=evhttp_request_get_response_code(req);
- if(resp_code==200)
- {
- struct evbuffer *buf=NULL;
- buf=evhttp_request_get_input_buffer(req);
- size_t len = evbuffer_get_length(buf);
- sds resp_body_buff=sdsnewlen(SDS_NOINIT, len);
- evbuffer_copyout(buf, resp_body_buff, len);
+ struct evbuffer *buf=NULL;
+ buf=evhttp_request_get_input_buffer(req);
+ size_t len = evbuffer_get_length(buf);
+ sds resp_body_buff=sdsnewlen(SDS_NOINIT, len);
+ evbuffer_copyout(buf, resp_body_buff, len);
- if(0==strncmp(resp_body_buff, "true", strlen("true")))
- {
- ks->is_leader=1;
- log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "become cluster leader.");
- }
- else
- {
- log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "lose the election.");
- ks->is_leader=0;
- }
- sdsfree(resp_body_buff);
+ if(0==strncmp(resp_body_buff, "true", strlen("true")))
+ {
+ ks->is_leader=1;
+ log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "become cluster leader.");
}
else
{
- log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "run for leader failed: HTTP Code %d.", resp_code);
+ log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "lose the election.");
+ ks->is_leader=0;
}
+ sdsfree(resp_body_buff);
+ }
+ else
+ {
+ log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "run for leader failed: HTTP Code %d.", resp_code);
}
consul_watch_leadership_changes_async(ks);
}
-
+void acquire_session_lock_on_fail(enum e_future_error err, const char * what, void * arg)
+{
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
+ log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "consul session lock aquire failed: %s.", what);
+ consul_watch_leadership_changes_async(ks);
+ return;
+}
void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks)
{
sds req_body=node_print_json(&ks->self, ks->uuid);
char url[SWARMKV_URL_MAX];
- struct consul_task *task=consul_task_getnxset(ks, __func__);
-
- //run for leader
snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/lead?acquire=%s", ks->db_name, ks->consul_session_id);
- struct evhttp_request* request=evhttp_request_new(__acquire_session_lock_cb, ks);
- struct evhttp_connection* conn=task->evhttpconn;
- evhttp_connection_set_timeout(conn, 2);
+ struct future *f=future_create(__func__, acquire_session_lock_on_success, acquire_session_lock_on_fail, ks);
+ struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
+
+ evhttp_connection_set_timeout(req->evhttpconn, 2);
- struct evbuffer* output_buffer = evhttp_request_get_output_buffer(request);
+ struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq);
evbuffer_add(output_buffer, req_body, sdslen(req_body));
char number[64];
- struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
+ struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
snprintf(number, sizeof(number), "%zu", sdslen(req_body));
- char ip[16]={};
- unsigned int port=0;
- node_parse(&ks->self, &port, ip, sizeof(ip));
- evhttp_add_header(output_headers, "Host", ip);
- evhttp_add_header(output_headers, "Connection", "close");
-
evhttp_add_header(output_headers, "Content-Type", "application/json");
evhttp_add_header(output_headers, "Content-Length", number);
- evhttp_make_request(conn, request, EVHTTP_REQ_PUT, url);
+ consul_request_make(req, EVHTTP_REQ_PUT, url);
sdsfree(req_body);
}
@@ -813,7 +805,8 @@ void consul_watch_leadership_changes_async(struct swarmkv_keyspace* ks)
snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/lead?index=%d&wait=10s", ks->db_name, ks->consul_lead_key_modify_idx);
struct future *f=future_create(__func__, watch_leadership_changes_on_success, watch_leadership_changes_on_fail, ks);
- http_get_async(__func__, ks->self, url, ks->consul_agent_host, ks->consul_agent_port, ks->evbase, f, ks->logger);
+ struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
+ consul_request_make(req, EVHTTP_REQ_GET, url);
return;
}
void watch_slots_changes_on_success(void *result, void *arg)
@@ -861,7 +854,7 @@ void watch_slots_changes_on_success(void *result, void *arg)
if(slot_rt->state!=STATE_STABLE)
{
// log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "slot %d is reset to STABLE state, owner %s:%u.",
-// slot->slot_id, owner->addr.ip_addr, owner->addr.p2p_port);
+// slot->slot_id, owner->addr.ip_addr, owner->addr.cluster_port);
memset(&(slot_rt->rebalancing_peer), 0, sizeof(slot_rt->rebalancing_peer));
}
slot_rt->state=STATE_STABLE;
@@ -892,24 +885,17 @@ void consul_watch_slots_changes_async(struct swarmkv_keyspace *ks)
char url[SWARMKV_URL_MAX]="";
snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/slots?index=%d", ks->db_name, ks->consul_slots_modify_idx);
struct future *f=future_create(__func__, watch_slots_changes_on_success, watch_slots_changes_on_fail, ks);
- http_get_async(__func__, ks->self, url, ks->consul_agent_host, ks->consul_agent_port, ks->evbase, f, ks->logger);
+ struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
+ consul_request_make(req, EVHTTP_REQ_GET, url);
return;
}
-void __propagate_slot_table_cb(struct evhttp_request *req, void *arg)
+void propagate_slot_table_on_success(void *result, void *arg)
{
- struct swarmkv_keyspace* ks=(struct swarmkv_keyspace*)arg;
+ struct evhttp_request *req=(struct evhttp_request *)result;
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
int resp_code=0;
- if(!req || !evhttp_request_get_response_code(req))
- {
- int errcode = EVUTIL_SOCKET_ERROR();
- log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots table propagate failed: socket error %s",
- evutil_socket_error_to_string(errcode));
- }
- else
- {
- resp_code=evhttp_request_get_response_code(req);
-
- }
+
+ resp_code=evhttp_request_get_response_code(req);
if(resp_code==200)
{
log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots table propagate success.");
@@ -920,35 +906,35 @@ void __propagate_slot_table_cb(struct evhttp_request *req, void *arg)
}
return;
}
+void propagate_slot_table_on_fail(enum e_future_error err, const char * what, void * arg)
+{
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg;
+ log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots table propagate failed: %s",
+ what);
+}
void propagate_slot_table_async(struct swarmkv_keyspace *ks, struct key_slot new_slot[], size_t slot_num)
{
- struct consul_task *task=consul_task_getnxset(ks, __func__);
-
sds new_slots_json=NULL;
new_slots_json=keyslots2json(new_slot, sizeof(struct key_slot), 0, KEYSPACE_SLOT_NUM);
char url[SWARMKV_URL_MAX]="";
snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/slots", ks->db_name);
- struct evhttp_request *request=evhttp_request_new(__propagate_slot_table_cb, ks);
- struct evhttp_connection *conn=task->evhttpconn;
- evhttp_connection_set_timeout(conn, 20);
-
- struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request);
+ struct future *f=future_create(__func__, propagate_slot_table_on_success, propagate_slot_table_on_fail, ks);
+ struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
- evhttp_add_header(output_headers, "Host", ks->consul_agent_host);
- evhttp_add_header(output_headers, "Connection", "close");
+ evhttp_connection_set_timeout(req->evhttpconn, 20);
- struct evbuffer* output_buffer = evhttp_request_get_output_buffer(request);
+ struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq);
+
+ struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq);
evbuffer_add(output_buffer, new_slots_json, sdslen(new_slots_json));
char number[64];
snprintf(number, sizeof(number), "%zu", sdslen(new_slots_json));
evhttp_add_header(output_headers, "Content-Type", "application/json");
evhttp_add_header(output_headers, "Content-Length", number);
- evhttp_make_request(conn, request, EVHTTP_REQ_PUT, url);
-
+ consul_request_make(req, EVHTTP_REQ_PUT, url);
sdsfree(new_slots_json);
new_slots_json=NULL;
-
}
void remove_failed_nodes_from_global_slot_table(struct swarmkv_keyspace *ks, node_t *health_nodes, size_t n_node)
{
@@ -1133,8 +1119,8 @@ void consul_watch_nodes_changes_async(struct swarmkv_keyspace* ks)
snprintf(url, sizeof(url), "/v1/health/service/%s?passing=1&index=%d&wait=10s", ks->db_name, ks->consul_nodes_modify_idx);
struct future *f=future_create(__func__, watch_nodes_changes_on_success, watch_nodes_changes_on_fail, ks);
- http_get_async(__func__, ks->self, url, ks->consul_agent_host, ks->consul_agent_port, ks->evbase, f, ks->logger);
-
+ struct consul_request *req=consul_request_new(ks->consul_client, __func__, f);
+ consul_request_make(req, EVHTTP_REQ_GET, url);
}
int keyslots_init(struct swarmkv_keyspace *ks)
@@ -1214,17 +1200,18 @@ struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts,
ks->module.lock=keyspace_lock;
ks->module.unlock=keyspace_unlock;
strncpy(ks->db_name, db_name, sizeof(ks->db_name));
- node_init(&ks->self, opts->bind_address, opts->p2p_port);
+ node_init(&ks->self, opts->bind_address, opts->cluster_port);
uuid_copy(ks->uuid, opts->bin_uuid);
-
+ ks->opts=opts;
ks->health_check_port=opts->health_check_port;
ks->consul_agent_port=opts->consul_port;
- strncpy(ks->consul_agent_host, opts->consul_address, sizeof(ks->consul_agent_host));
+ strncpy(ks->consul_agent_host, opts->consul_agent_host, sizeof(ks->consul_agent_host));
ks->evbase = event_base_new();
ks->logger = logger;
ks->dryrun= opts->dryrun;
+ ks->consul_client=consul_client_new(ks->opts->consul_agent_host, ks->opts->consul_port, ks->evbase, ks->logger);
int ret=-1;
ret=keyslots_init(ks);
@@ -1242,7 +1229,7 @@ struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts,
log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "work on dry run mode.");
return &(ks->module);
}
- ks->http_server=http_server_new(ks->evbase, &(ks->health_check_port), ks->logger);
+ ks->http_server=http_server_new(ks->evbase, &(ks->health_check_port), ks);
if(!ks->http_server)
{
asprintf(err, "health check HTTP server start failed.");
@@ -1295,12 +1282,8 @@ void swarmkv_keyspace_free(struct swarmkv_module* module)
}
pthread_mutex_destroy(&slot_rt->mutex);
}
- struct consul_task *task=NULL, *tmp_task=NULL;
- HASH_ITER(hh, ks->consul_task_hash, task, tmp_task)
- {
- HASH_DELETE(hh, ks->consul_task_hash, task);
- consul_task_free(task);
- }
+ consul_client_free(ks->consul_client);
+ ks->consul_client=NULL;
event_base_free(ks->evbase);
free(ks);
return;
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 1b33c71..3dedef3 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -580,7 +580,7 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts, exe
pthread_rwlock_init(&(store->rwlock[i]), NULL);
}
uuid_copy(store->my_uuid, node_uuid);
- node_init(&store->self, opts->bind_address, opts->p2p_port);
+ node_init(&store->self, opts->bind_address, opts->cluster_port);
store->exec_cmd=send_cmd;
store->exec_cmd_handle=handle_send_cmd;
diff --git a/test/swarmkv_cli_test.cpp b/test/swarmkv_cli_test.cpp
index ad1c65d..f932f56 100644
--- a/test/swarmkv_cli_test.cpp
+++ b/test/swarmkv_cli_test.cpp
@@ -15,7 +15,7 @@
struct swarmkv_cli_node_addr
{
char ip_addr[INET6_ADDRSTRLEN];
- unsigned int p2p_port;
+ unsigned int cluster_port;
};
void wait_for_check()
@@ -38,14 +38,14 @@ void swarmkv_cli_set_db(const char *cluster_name)
asprintf(&(g_cli_system.cluster_name), "%s", cluster_name);
}
-void swarmkv_cli_attach(const char *ip_addr, unsigned int p2p_port)
+void swarmkv_cli_attach(const char *ip_addr, unsigned int cluster_port)
{
g_cli_system.attached=1;
if(g_cli_system.attach_target_line)
{
free(g_cli_system.attach_target_line);
}
- asprintf(&(g_cli_system.attach_target_line), "--attach %s:%u", ip_addr, p2p_port);
+ asprintf(&(g_cli_system.attach_target_line), "--attach %s:%u", ip_addr, cluster_port);
}
void swarmkv_cli_detach()
@@ -149,7 +149,7 @@ int swarmkv_cli_get_addr_node(char *result, struct swarmkv_cli_node_addr *node_a
p+=3;
}
int n_read=0;
- n_read=sscanf(p, "%[^:]:%u", node_addr[i].ip_addr, &node_addr[i].p2p_port);
+ n_read=sscanf(p, "%[^:]:%u", node_addr[i].ip_addr, &node_addr[i].cluster_port);
if(n_read!=2)
{
return -1;
@@ -172,7 +172,7 @@ protected:
swarmkv_cli_create_cluster(cluster_name, "127.0.0.1:5210");
logger=log_handle_create(log_path, 0);
struct swarmkv_options* opts=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts, 5210);
+ swarmkv_options_set_cluster_port(opts, 5210);
swarmkv_options_set_health_check_port(opts, 6210);
swarmkv_options_set_logger(opts, logger);
db=swarmkv_open(opts, cluster_name, &err);
@@ -286,7 +286,7 @@ TEST_F(SwarmkvCliNodes, ReplicaDel)
swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), NULL, "KEYSPACE RLIST %s", "xiaolv");
swarmkv_cli_get_addr_node(result, &node_addr);
- swarmkv_cli_attach(node_addr.ip_addr, node_addr.p2p_port);
+ swarmkv_cli_attach(node_addr.ip_addr, node_addr.cluster_port);
cmd_exec_arg_expect_integer(reply_arg, 1);
swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_integer, "CRDT DEL xiaolv");
cmd_exec_arg_clear(reply_arg);
@@ -339,7 +339,7 @@ TEST_F(SwarmkvCliNodes, KeyspaceRdel)
swarmkv_cli_get_addr_node(result, &node_addr);
cmd_exec_arg_expect_integer(reply_arg, 1);
- swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_integer, "KEYSPACE RDEL %s %s:%u", "xiaoqin", node_addr.ip_addr, node_addr.p2p_port);
+ swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_integer, "KEYSPACE RDEL %s %s:%u", "xiaoqin", node_addr.ip_addr, node_addr.cluster_port);
cmd_exec_arg_clear(reply_arg);
cmd_exec_arg_expect_NIL(reply_arg);
@@ -387,11 +387,11 @@ TEST_F(SwarmkvCliNodes, KeyspaceRadd)
swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), NULL, "KEYSPACE RLIST %s", "xiaolan");
swarmkv_cli_get_addr_node(result, &node_addr);
- unsigned int p2p_port = (node_addr.p2p_port == 5210) ? 5211: 5210;
+ unsigned int cluster_port = (node_addr.cluster_port == 5210) ? 5211: 5210;
- sprintf(reply_string, "1) %s:%u,2) %s:%u", node_addr.ip_addr, node_addr.p2p_port, node_addr.ip_addr, p2p_port);
+ sprintf(reply_string, "1) %s:%u,2) %s:%u", node_addr.ip_addr, node_addr.cluster_port, node_addr.ip_addr, cluster_port);
cmd_exec_arg_expect_cstring(reply_arg, reply_string);
- swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "KEYSPACE RADD %s %s:%d", "xiaolan", node_addr.ip_addr, p2p_port);
+ swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "KEYSPACE RADD %s %s:%d", "xiaolan", node_addr.ip_addr, cluster_port);
cmd_exec_arg_clear(reply_arg);
wait_for_check();
@@ -403,7 +403,7 @@ TEST_F(SwarmkvCliNodes, KeyspaceRadd)
swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_integer, "CLUSTER SANITY heal");
cmd_exec_arg_clear(reply_arg);
- sprintf(reply_string, "1) %s:%d", node_addr.ip_addr, node_addr.p2p_port);
+ sprintf(reply_string, "1) %s:%d", node_addr.ip_addr, node_addr.cluster_port);
cmd_exec_arg_expect_cstring(reply_arg, reply_string);
swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "KEYSPACE RLIST %s", "xiaolan");
@@ -543,7 +543,7 @@ protected:
logger=log_handle_create(log_path, 0);
struct swarmkv_options* opts1=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts1, 5210);
+ swarmkv_options_set_cluster_port(opts1, 5210);
swarmkv_options_set_health_check_port(opts1, 6210);
swarmkv_options_set_logger(opts1, logger);
db1=swarmkv_open(opts1, cluster_name, &err);
@@ -555,7 +555,7 @@ protected:
}
struct swarmkv_options* opts2=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts2, 5211);
+ swarmkv_options_set_cluster_port(opts2, 5211);
swarmkv_options_set_health_check_port(opts2, 6211);
swarmkv_options_set_logger(opts2, logger);
db2=swarmkv_open(opts2, cluster_name, &err);
diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp
index 34575c7..386ee1e 100644
--- a/test/swarmkv_gtest.cpp
+++ b/test/swarmkv_gtest.cpp
@@ -38,7 +38,7 @@ protected:
swarmkv_cli_create_cluster(cluster_name, "127.0.0.1:5210");
logger=log_handle_create(log_path, 0);
struct swarmkv_options* opts=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts, 5210);
+ swarmkv_options_set_cluster_port(opts, 5210);
swarmkv_options_set_health_check_port(opts, 6210);
swarmkv_options_set_logger(opts, logger);
db=swarmkv_open(opts, cluster_name, &err);
@@ -491,7 +491,7 @@ protected:
logger=log_handle_create(log_path, 0);
struct swarmkv_options* opts1=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts1, 5210);
+ swarmkv_options_set_cluster_port(opts1, 5210);
swarmkv_options_set_health_check_port(opts1, 6210);
swarmkv_options_set_logger(opts1, logger);
db1=swarmkv_open(opts1, cluster_name, &err);
@@ -503,7 +503,7 @@ protected:
}
struct swarmkv_options* opts2=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts2, 5211);
+ swarmkv_options_set_cluster_port(opts2, 5211);
swarmkv_options_set_health_check_port(opts2, 6211);
swarmkv_options_set_logger(opts2, logger);
db2=swarmkv_open(opts2, cluster_name, &err);
@@ -1259,7 +1259,7 @@ protected:
for(i=0; i<TEST_NODE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts[i], 5210+i);
+ swarmkv_options_set_cluster_port(opts[i], 5210+i);
swarmkv_options_set_health_check_port(opts[i], 6210+i);
swarmkv_options_set_worker_thread_number(opts[i], 2);
swarmkv_options_set_logger(opts[i], logger);
diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp
index 8879172..136074a 100644
--- a/test/swarmkv_perf_test.cpp
+++ b/test/swarmkv_perf_test.cpp
@@ -98,7 +98,7 @@ TEST(Performance, Nthreads)
for(i=0; i<NODE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts[i], 5310+i);
+ swarmkv_options_set_cluster_port(opts[i], 5310+i);
swarmkv_options_set_health_check_port(opts[i], 6310+i);
swarmkv_options_set_logger(opts[i], logger);
swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
@@ -233,11 +233,11 @@ TEST(Resilience, AddSlotOwner)
for(i=0; i<NODE_NUMBER+CANDINATE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts[i], p2p_port_start+i);
+ swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i);
swarmkv_options_set_health_check_port(opts[i], health_port_start+i);
swarmkv_options_set_logger(opts[i], logger);
swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
- swarmkv_options_set_p2p_timeout_us(opts[i], 100*1000);
+ swarmkv_options_set_cluster_timeout_us(opts[i], 100*1000);
swarmkv_options_set_disable_run_for_leader(opts[i]);
db[i]=swarmkv_open(opts[i], cluster_name, &err);
if(err)
@@ -346,11 +346,11 @@ TEST(Performance, Sync)
for(size_t i=0; i<NODE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts[i], p2p_port_start+i);
+ swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i);
swarmkv_options_set_health_check_port(opts[i], health_port_start+i);
swarmkv_options_set_logger(opts[i], logger);
swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
- swarmkv_options_set_p2p_timeout_us(opts[i], 1000*1000);
+ swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000);
swarmkv_options_set_sync_interval_us(opts[i], 100*1000);
swarmkv_options_set_disable_run_for_leader(opts[i]);
db[i]=swarmkv_open(opts[i], cluster_name, &err);
@@ -427,7 +427,7 @@ TEST(Resilience, Failover)
for(size_t i=0; i<NODE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts[i], p2p_port_start+i);
+ swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i);
swarmkv_options_set_health_check_port(opts[i], health_port_start+i);
db[i]=swarmkv_open(opts[i], cluster_name, &err);
if(err)
diff --git a/test/swarmkv_scalability_test.cpp b/test/swarmkv_scalability_test.cpp
index 35f1505..75deb06 100644
--- a/test/swarmkv_scalability_test.cpp
+++ b/test/swarmkv_scalability_test.cpp
@@ -169,11 +169,11 @@ TEST(Scalability, MultiThreads)
const char *cluster_name="demo";
struct swarmkv_options *opts=swarmkv_options_new();
swarmkv_options_set_dryrun(opts);
- swarmkv_options_set_p2p_port(opts, 5212);
+ swarmkv_options_set_cluster_port(opts, 5212);
swarmkv_options_set_health_check_port(opts, 6212);
swarmkv_options_set_logger(opts, logger);
swarmkv_options_set_worker_thread_number(opts, 2);
- swarmkv_options_set_p2p_timeout_us(opts, 500*1000);
+ swarmkv_options_set_cluster_timeout_us(opts, 500*1000);
struct swarmkv *db=swarmkv_open(opts, cluster_name, &err);
if(err)
{
diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c
index 65a41e9..a655008 100644
--- a/tools/swarmkv_cli.c
+++ b/tools/swarmkv_cli.c
@@ -26,7 +26,7 @@ struct config
{
char db_name[256];
unsigned int consul_port;
- unsigned int p2p_port;
+ unsigned int cluster_port;
char consul_host[64];
struct swarmkv *db;
struct cluster_manager_command cluster_mgr_cmd;
@@ -389,8 +389,8 @@ struct swarmkv_reply *cluster_addslotowner_command(struct swarmkv *db, char *arg
{
continue;
}
-/* printf("Migrating slot %d from %s %u to %s %u ... ", tmp->slot_id, tmp->owner.addr.ip_addr, tmp->owner.addr.p2p_port,
- node.addr.ip_addr, node.addr.p2p_port);
+/* printf("Migrating slot %d from %s %u to %s %u ... ", tmp->slot_id, tmp->owner.addr.ip_addr, tmp->owner.addr.cluster_port,
+ node.addr.ip_addr, node.addr.cluster_port);
*/
/*STEP 2.1 Set NEW node's slot to IMPORTING state. Slot will return to STABLE state after udpate the global slot table*/
setslot_reply=swarmkv_command_on(db, new_node->addr, "keyspace setslot %d IMPORTING %s",
@@ -1185,7 +1185,7 @@ int main(int argc, char * argv[])
}
else if(!strcmp(argv[i], "-l") && !lastarg)
{
- sscanf(argv[++i], "%u", &g_config.p2p_port);
+ sscanf(argv[++i], "%u", &g_config.cluster_port);
}
else if(!strcmp(argv[i], "--cluster-create") && !lastarg)
{
@@ -1242,7 +1242,7 @@ int main(int argc, char * argv[])
void * logger=log_handle_create("swarmkv-cli.log", 0);
struct swarmkv_options *opts=swarmkv_options_new();
swarmkv_options_set_dryrun(opts);
- swarmkv_options_set_p2p_port(opts, g_config.p2p_port);//listen on random port
+ swarmkv_options_set_cluster_port(opts, g_config.cluster_port);//listen on random port
swarmkv_options_set_consul_host(opts, g_config.consul_host);
swarmkv_options_set_consul_port(opts, g_config.consul_port);
swarmkv_options_set_worker_thread_number(opts, 1);
diff --git a/tools/swarmkv_simple_node.cpp b/tools/swarmkv_simple_node.cpp
index c67ea76..70f7deb 100644
--- a/tools/swarmkv_simple_node.cpp
+++ b/tools/swarmkv_simple_node.cpp
@@ -33,7 +33,7 @@ int main(int argc, char ** argv)
long long key_number=0;
int worker_thread_number=1;
unsigned int consul_port=8500;
- unsigned int p2p_port;
+ unsigned int cluster_port;
char cluster_name[256];
if(argc < 3) { help();}
strcpy(consul_host, "127.0.0.1");
@@ -47,7 +47,7 @@ int main(int argc, char ** argv)
}
else if (!strcmp(argv[i], "-h") && !lastarg)
{
- sscanf(argv[++i], "%[^:]:%u", host, &p2p_port);
+ sscanf(argv[++i], "%[^:]:%u", host, &cluster_port);
}
else if(!strcmp(argv[i], "-c") && !lastarg)
{
@@ -70,12 +70,12 @@ int main(int argc, char ** argv)
char *err=NULL;
struct swarmkv_options *opts=swarmkv_options_new();
- swarmkv_options_set_p2p_port(opts, p2p_port);
+ swarmkv_options_set_cluster_port(opts, cluster_port);
swarmkv_options_set_bind_address(opts, host);
swarmkv_options_set_consul_host(opts, consul_host);
swarmkv_options_set_consul_port(opts, consul_port);
swarmkv_options_set_worker_thread_number(opts, worker_thread_number);
- swarmkv_options_set_p2p_timeout_us(opts, 500000);
+ swarmkv_options_set_cluster_timeout_us(opts, 500000);
swarmkv_options_set_sync_interval_us(opts, 500);
//swarmkv_options_set_disable_run_for_leader(opts);
struct swarmkv *db=swarmkv_open(opts, cluster_name, &err);