diff options
| author | Zheng Chao <[email protected]> | 2023-01-18 20:17:02 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-01-18 20:17:02 +0800 |
| commit | c283dfdac2ea674fdd52fc61f00814caf8f6f5c6 (patch) | |
| tree | 1c51eafc559c90e19fa1dfb55ef10ce99b34dca1 | |
| parent | 40f5ca926572ee4089e16db73d232237fe2e5547 (diff) | |
:construction: Refactoring Consul communication of keyspace.
| -rw-r--r-- | examples/async_example.c | 2 | ||||
| -rw-r--r-- | examples/simple_example.c | 2 | ||||
| -rw-r--r-- | include/swarmkv/swarmkv.h | 5 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_common.h | 6 | ||||
| -rw-r--r-- | src/swarmkv.c | 4 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 14 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 519 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 2 | ||||
| -rw-r--r-- | test/swarmkv_cli_test.cpp | 26 | ||||
| -rw-r--r-- | test/swarmkv_gtest.cpp | 8 | ||||
| -rw-r--r-- | test/swarmkv_perf_test.cpp | 12 | ||||
| -rw-r--r-- | test/swarmkv_scalability_test.cpp | 4 | ||||
| -rw-r--r-- | tools/swarmkv_cli.c | 10 | ||||
| -rw-r--r-- | tools/swarmkv_simple_node.cpp | 8 |
14 files changed, 302 insertions, 320 deletions
diff --git a/examples/async_example.c b/examples/async_example.c index b9bc5a1..5149bcf 100644 --- a/examples/async_example.c +++ b/examples/async_example.c @@ -24,7 +24,7 @@ int main(int argc, char **argv) for(size_t i=0; i<2; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts[i], 5210+i); + swarmkv_options_set_cluster_port(opts[i], 5210+i); db[i]=swarmkv_open(opts[i], cluster_name, &err); if(err) { diff --git a/examples/simple_example.c b/examples/simple_example.c index fad8262..d00eae1 100644 --- a/examples/simple_example.c +++ b/examples/simple_example.c @@ -11,7 +11,7 @@ int main(int argc, char **argv) for(size_t i=0; i<2; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts[i], 5210+i); + swarmkv_options_set_cluster_port(opts[i], 5210+i); db[i]=swarmkv_open(opts[i], cluster_name, &err); if(err) { diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h index 47117af..c29df32 100644 --- a/include/swarmkv/swarmkv.h +++ b/include/swarmkv/swarmkv.h @@ -44,10 +44,9 @@ void swarmkv_reply_print(const struct swarmkv_reply *reply, FILE* stream); struct swarmkv_options; struct swarmkv_options* swarmkv_options_new(void); -int swarmkv_options_set_p2p_port(struct swarmkv_options *opts, unsigned int p2p_port); -int swarmkv_options_set_p2p_listen_port(struct swarmkv_options *opts, unsigned int p2p_port); +int swarmkv_options_set_cluster_port(struct swarmkv_options *opts, unsigned int cluster_port); int swarmkv_options_set_health_check_port(struct swarmkv_options *opts, unsigned int health_check_port); -int swarmkv_options_set_p2p_timeout_us(struct swarmkv_options *opts, unsigned int timeout_us); +int swarmkv_options_set_cluster_timeout_us(struct swarmkv_options *opts, unsigned int timeout_us); int swarmkv_options_set_sync_interval_us(struct swarmkv_options *opts, unsigned int interval_us); int swarmkv_options_set_bind_address(struct swarmkv_options *opts, const char* ip_addr); int swarmkv_options_set_logger(struct swarmkv_options *opts, void *logger); diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h index c63f4c2..d039c67 100644 --- a/src/inc_internal/swarmkv_common.h +++ b/src/inc_internal/swarmkv_common.h @@ -28,7 +28,7 @@ enum cmd_exec_result struct swarmkv_node_addr { char ip_addr[INET6_ADDRSTRLEN]; //#define INET6_ADDRSTRLEN 46 (#include <arpa/inet.h>, see man inet_ntop - unsigned int p2p_port; //host order + unsigned int cluster_port; //host order };*/ #define MAX_IPV4_ADDR_LEN 15 @@ -59,7 +59,7 @@ char *str_replace(char *orig, char *rep, char *with); struct swarmkv_options { unsigned int health_check_port; - unsigned int p2p_port; + unsigned int cluster_port; int dryrun; int run_for_leader_flag; unsigned int consul_port; @@ -68,7 +68,7 @@ struct swarmkv_options struct log_handle *logger; int loglevel; char bind_address[MAX_IPV4_ADDR_LEN]; - char consul_address[MAX_IPV4_ADDR_LEN]; + char consul_agent_host[MAX_IPV4_ADDR_LEN]; uuid_t bin_uuid; size_t nr_worker_threads; int is_assigned_to_db; diff --git a/src/swarmkv.c b/src/swarmkv.c index 2158927..ab8473f 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -1176,7 +1176,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options* opts, const char * db_name, if(opts->dryrun) { } - node_init(&db->self, opts->bind_address, opts->p2p_port); + node_init(&db->self, opts->bind_address, opts->cluster_port); db->mod_monitor=swarmkv_monitor_new(db->opts); db->mod_keyspace=swarmkv_keyspace_new(db->opts, db_name, db->logger, err); @@ -1187,7 +1187,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options* opts, const char * db_name, swarmkv_keyspace_set_exec_cmd_func(db->mod_keyspace, __exec_cmd, db); swarmkv_keyspace_set_monitor_handle(db->mod_keyspace, db->mod_monitor); - //Note: if the p2p_port is 0, swarmkv_net_new updates db->self.p2p_port. + //Note: if the cluster_port is 0, swarmkv_net_new updates db->self.cluster_port. db->net=swarmkv_net_new(opts, &db->self, db->logger, err); if(*err) { diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index 36d70e6..bd7e24e 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -24,13 +24,13 @@ struct swarmkv_options* swarmkv_options_new(void) { struct swarmkv_options *opts=ALLOC(struct swarmkv_options, 1); opts->consul_port=8500; - opts->p2p_port=5210; + opts->cluster_port=5210; opts->health_check_port=0; opts->loglevel=0; opts->p2p_timeout_us=500*1000;//Default 500ms opts->sync_interval_us=10*1000; //Default 10ms strcpy(opts->bind_address, "127.0.0.1"); - strcpy(opts->consul_address, "127.0.0.1"); + strcpy(opts->consul_agent_host, "127.0.0.1"); uuid_generate(opts->bin_uuid); opts->nr_worker_threads=1; opts->is_assigned_to_db=0; @@ -49,9 +49,9 @@ void swarmkv_options_free(struct swarmkv_options *opt) } return; } -int swarmkv_options_set_p2p_port(struct swarmkv_options *opts, unsigned int p2p_port) +int swarmkv_options_set_cluster_port(struct swarmkv_options *opts, unsigned int cluster_port) { - opts->p2p_port=p2p_port; + opts->cluster_port=cluster_port; return 0; } int swarmkv_options_set_health_check_port(struct swarmkv_options *opts, unsigned int health_check_port) @@ -59,7 +59,7 @@ int swarmkv_options_set_health_check_port(struct swarmkv_options *opts, unsigned opts->health_check_port=health_check_port; return 0; } -int swarmkv_options_set_p2p_timeout_us(struct swarmkv_options *opts, unsigned int timeout_ms) +int swarmkv_options_set_cluster_timeout_us(struct swarmkv_options *opts, unsigned int timeout_ms) { opts->p2p_timeout_us=timeout_ms; return 0; @@ -87,7 +87,7 @@ int swarmkv_options_set_disable_run_for_leader(struct swarmkv_options *opts) } int swarmkv_options_set_consul_host(struct swarmkv_options *opts, const char* ip_addr) { - strncpy(opts->consul_address, ip_addr, sizeof(opts->consul_address)); + strncpy(opts->consul_agent_host, ip_addr, sizeof(opts->consul_agent_host)); return 0; } int swarmkv_options_set_consul_port(struct swarmkv_options *opts, unsigned int consul_port) @@ -98,7 +98,7 @@ int swarmkv_options_set_consul_port(struct swarmkv_options *opts, unsigned int c int swarmkv_options_set_dryrun(struct swarmkv_options *opts) { opts->dryrun=1; - opts->p2p_port=15210;//overrides default p2p port + opts->cluster_port=15210;//overrides default p2p port return 0; } int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, size_t nr_worker_threads) diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index 3c0c8ff..ae00534 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -150,7 +150,7 @@ static void crdt_del_on_succ(void* result, void* user) // assert(reply->integer==1); if(reply->type==SWARMKV_REPLY_INTEGER && reply->integer !=1) { -// printf("crdt del %s at %s:%u failed.\n", ctx->key, ctx->peer.ip_addr, ctx->peer.p2p_port); +// printf("crdt del %s at %s:%u failed.\n", ctx->key, ctx->peer.ip_addr, ctx->peer.cluster_port); } crdt_del_ctx_free(ctx); return; @@ -193,102 +193,17 @@ struct slot_runtime int I_am_owner; pthread_mutex_t mutex; }; -struct http_req_context -{ - char symbol[128]; - struct evhttp_connection *conn; - struct log_handle *logger; - struct future *f; -}; -void http_req_error_cb(enum evhttp_request_error error, void *arg) -{ - return; -} -void http_request_callback(struct evhttp_request *req, void *arg) -{ - struct http_req_context *ctx=(struct http_req_context *)arg; - int resp_code=0; - - struct promise *p=future_to_promise(ctx->f); - if(!req || !evhttp_request_get_response_code(req)) - { - int errcode = EVUTIL_SOCKET_ERROR(); - log_fatal(ctx->logger, MODULE_SWARMKV_KEYSPACE, "%s failed: %s", - ctx->symbol, - evutil_socket_error_to_string(errcode)); - promise_failed(p, FUTURE_ERROR_EXCEPTION, "socket error"); - } - else - { - resp_code=evhttp_request_get_response_code(req); - } - - if(resp_code!=200) - { - char what[256]; - snprintf(what, sizeof(what), "resp_code=%d", resp_code); - promise_failed(p, FUTURE_ERROR_EXCEPTION, what); - } - else - { - promise_success(p, req); - } - evhttp_connection_free(ctx->conn); - future_destroy(ctx->f); - ctx->f=NULL; - free(ctx); - return; -} -void http_get_async(const char *symbol, node_t self, const char *url, const char *host, unsigned short port, struct event_base *evbase, struct future *f, struct log_handle *logger) -{ - struct http_req_context *ctx=ALLOC(struct http_req_context, 1); - strncpy(ctx->symbol, symbol, sizeof(ctx->symbol)); - ctx->logger=logger; - ctx->f=f; - ctx->conn=evhttp_connection_base_new(evbase, NULL, host, port); - struct evhttp_request *request=evhttp_request_new(http_request_callback, ctx); - evhttp_request_set_error_cb(request, http_req_error_cb); - - struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request); - evhttp_connection_set_timeout(ctx->conn, 1800);//set to 30min for blocking query. - - char ip[16]={}; - unsigned int p2p_port=0; - node_parse(&self, &p2p_port, ip, sizeof(ip)); - evhttp_add_header(output_headers, "Host", ip); - evhttp_add_header(output_headers, "Connection", "close"); - - evhttp_make_request(ctx->conn, request, EVHTTP_REQ_GET, url); -} -struct consul_task -{ - char task_name[SWARMKV_SYMBOL_MAX]; - struct evhttp_connection* evhttpconn; //tcp connection for the request - UT_hash_handle hh; -}; void http_connection_close_callback(struct evhttp_connection *conn, void *arg) { // struct consul_task *task=(struct consul_task *)arg; //printf("http connection %s %p closed\n", task->task_name, conn); return; } -struct consul_task *consul_task_new(const char *task_name, struct event_base *evbase, const char *host_name, unsigned short port) -{ - struct consul_task *task=ALLOC(struct consul_task, 1); - strncpy(task->task_name, task_name, sizeof(task->task_name)); - task->evhttpconn=evhttp_connection_base_new(evbase, NULL, host_name, port); - evhttp_connection_set_closecb(task->evhttpconn, http_connection_close_callback, task); - return task; -} -void consul_task_free(struct consul_task *task) -{ - evhttp_connection_free(task->evhttpconn); - task->evhttpconn=NULL; - free(task); -} + struct swarmkv_keyspace { struct swarmkv_module module; + const struct swarmkv_options *opts; char db_name[SWARMKV_SYMBOL_MAX]; pthread_t thr; int readable_tid; @@ -313,8 +228,8 @@ struct swarmkv_keyspace char consul_session_id[SWARMKV_SYMBOL_MAX]; struct evhttp *http_server; - struct event_base * evbase; - struct consul_task *consul_task_hash; + struct event_base *evbase; + struct consul_client *consul_client; //reference, no need free exec_cmd_func *exec_cmd_func; @@ -322,16 +237,118 @@ struct swarmkv_keyspace struct swarmkv_module *mod_monitor; void *logger; }; -struct consul_task *consul_task_getnxset(struct swarmkv_keyspace *ks, const char *task_name) +void http_request_on_close(struct evhttp_connection *conn, void *arg) +{ + return; +} + +struct consul_client; +struct consul_request { - struct consul_task *task=NULL; - HASH_FIND(hh, ks->consul_task_hash, task_name, strlen(task_name), task); - if(!task) + char symbol[SWARMKV_SYMBOL_MAX]; + long long request_id; + struct evhttp_connection *evhttpconn; //tcp connection for the request + struct evhttp_request *evhttpreq; + struct future *f; + struct consul_client *ref_client; + UT_hash_handle hh; +}; +struct consul_client +{ + unsigned short consul_agent_port; + char consul_agent_host[32]; + long long req_id_generator; + struct event_base *evbase; //reference of keyspace evbase + struct log_handle *logger; //reference of logger + struct consul_request *request_table; +}; +void consul_request_callback(struct evhttp_request *evhttpreq, void *arg); +struct consul_request *consul_request_new(struct consul_client *client, const char *symbol, struct future *f) +{ + struct consul_request *req=ALLOC(struct consul_request, 1); + + strncpy(req->symbol, symbol, sizeof(req->symbol)); + req->evhttpconn=evhttp_connection_base_new(client->evbase, NULL, client->consul_agent_host, client->consul_agent_port); + evhttp_connection_set_timeout(req->evhttpconn, 1800);//set to 30min for blocking query. + req->evhttpreq=evhttp_request_new(consul_request_callback, req); + req->f=f; + req->ref_client=client; + req->request_id=client->req_id_generator++; + + HASH_ADD(hh, client->request_table, request_id, sizeof(req->request_id), req); + + return req; +} +void consul_request_free(struct consul_request *req) +{ + evhttp_connection_free(req->evhttpconn); + //if(req->evhttpreq) evhttp_request_free(req->evhttpreq); + future_destroy(req->f); + HASH_DEL(req->ref_client->request_table, req); + free(req); +} +void consul_request_callback(struct evhttp_request *evhttpreq, void *arg) +{ + struct consul_request *req=(struct consul_request *)arg; + int resp_code=0; + + struct promise *p=future_to_promise(req->f); + if(!evhttpreq || !evhttp_request_get_response_code(evhttpreq)) + { + int errcode = EVUTIL_SOCKET_ERROR(); + log_fatal(req->ref_client->logger, MODULE_SWARMKV_KEYSPACE, "%s failed: %s", + req->symbol, + evutil_socket_error_to_string(errcode)); + promise_failed(p, FUTURE_ERROR_EXCEPTION, "socket error"); + } + else + { + resp_code=evhttp_request_get_response_code(evhttpreq); + } + + if(resp_code!=200) + { + char what[256]; + snprintf(what, sizeof(what), "resp_code=%d", resp_code); + promise_failed(p, FUTURE_ERROR_EXCEPTION, what); + } + else { - task=consul_task_new(task_name, ks->evbase, ks->consul_agent_host, (unsigned short)ks->consul_agent_port); - HASH_ADD(hh, ks->consul_task_hash, task_name, strlen(task_name), task); + promise_success(p, evhttpreq); } - return task; + //The request should be freed automatically after the http_request_done has finished executing. + req->evhttpreq=NULL; + consul_request_free(req); + return; +} +void consul_request_make(struct consul_request *req, enum evhttp_cmd_type cmd, const char *url) +{ + struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); + + evhttp_connection_set_closecb(req->evhttpconn, http_request_on_close, req); + evhttp_add_header(output_headers, "Host", req->ref_client->consul_agent_host); + evhttp_add_header(output_headers, "Connection", "close"); + + evhttp_make_request(req->evhttpconn, req->evhttpreq, cmd, url); +} + +struct consul_client *consul_client_new(const char *host, unsigned short port, struct event_base *evbase, struct log_handle *logger) +{ + struct consul_client *client=ALLOC(struct consul_client, 1); + client->consul_agent_port=port; + strncpy(client->consul_agent_host, host, sizeof(client->consul_agent_host)); + client->logger=logger; + client->evbase=evbase; + return client; +} +void consul_client_free(struct consul_client *client) +{ + struct consul_request *req=NULL, *tmp=NULL; + HASH_ITER(hh, client->request_table, req, tmp) + { + consul_request_free(req); + } + free(client); } void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks); void consul_watch_leadership_changes_async(struct swarmkv_keyspace* ks); @@ -341,22 +358,25 @@ void keyspace_active_expire_cycle(struct swarmkv_keyspace *ks); void health_check_cb(struct evhttp_request *request, void *arg) { + struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg; struct evbuffer* evbuf_reply = evbuffer_new(); struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request); //HTTP header - evhttp_add_header(output_headers, "Server", "swarmkv v2.0"); + evhttp_add_header(output_headers, "Server", "Swarmkv v3.0"); evhttp_add_header(output_headers, "Content-Type", "text/plain; charset=UTF-8");//utf8 evhttp_add_header(output_headers, "Connection", "close"); - evbuffer_add_printf(evbuf_reply, "SwarmKV Cluster"); + evbuffer_add_printf(evbuf_reply, "SwarmKV node of cluster %s is running", ks->db_name); evhttp_send_reply(request, HTTP_OK, "OK", evbuf_reply); evbuffer_free(evbuf_reply); return; } -struct evhttp* http_server_new(struct event_base* evbase, unsigned int *port_listen, void* logger) +struct evhttp* http_server_new(struct event_base* evbase, unsigned int *port_listen, void *arg) { + struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg; + void *logger=ks->logger; struct evhttp* http = evhttp_new(evbase); struct evhttp_bound_socket *bound_socket=NULL; bound_socket=evhttp_bind_socket_with_handle(http, "0.0.0.0", *port_listen); @@ -381,7 +401,7 @@ struct evhttp* http_server_new(struct event_base* evbase, unsigned int *port_lis evhttp_set_timeout(http, http_option_timeout); evhttp_set_allowed_methods( http , EVHTTP_REQ_GET); //Set a callback for a specified URI - evhttp_set_cb(http, "/health", health_check_cb, NULL); + evhttp_set_cb(http, "/health", health_check_cb, ks); return http; } @@ -414,15 +434,15 @@ int consul_service_register(struct swarmkv_keyspace* ks) int ret=0; char health_check_url[SWARMKV_URL_MAX]=""; char uuid_str[37]; - unsigned int p2p_port=0; + unsigned int cluster_port=0; char ip[16]; - node_parse(&ks->self, &p2p_port, ip, sizeof(ip)); + node_parse(&ks->self, &cluster_port, ip, sizeof(ip)); cJSON* service=cJSON_CreateObject(); cJSON_AddStringToObject(service, "Name", ks->db_name); - snprintf(ks->consul_service_id, sizeof(ks->consul_service_id), "%.128s-%d", ks->db_name, p2p_port); + snprintf(ks->consul_service_id, sizeof(ks->consul_service_id), "%.128s-%d", ks->db_name, cluster_port); cJSON_AddStringToObject(service, "ID", ks->consul_service_id); cJSON_AddStringToObject(service, "Address", ip); - cJSON_AddNumberToObject(service, "Port", p2p_port); + cJSON_AddNumberToObject(service, "Port", cluster_port); cJSON* meta=cJSON_CreateObject(); uuid_unparse_lower(ks->uuid, uuid_str); @@ -433,7 +453,7 @@ int consul_service_register(struct swarmkv_keyspace* ks) cJSON* check=cJSON_CreateObject(); cJSON_AddStringToObject(check, "Name", ks->db_name); - snprintf(ks->consul_check_id, sizeof(ks->consul_check_id), "%.128s-%u", ks->db_name, p2p_port); + snprintf(ks->consul_check_id, sizeof(ks->consul_check_id), "%.128s-%u", ks->db_name, cluster_port); cJSON_AddStringToObject(check, "CheckID", ks->consul_check_id); snprintf(health_check_url, sizeof(health_check_url), "http://%s:%u/health", ip, ks->health_check_port); cJSON_AddStringToObject(check, "HTTP", health_check_url); @@ -461,24 +481,43 @@ int consul_service_register(struct swarmkv_keyspace* ks) cJSON_Delete(service); return ret; } +static void __ks_dummy_event_handler(evutil_socket_t fd, short what, void * arg) +{ + struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg; + keyspace_active_expire_cycle(ks); + return; +} -void __consul_session_create_cb(struct evhttp_request *req, void *arg) +void *swarmkv_keyspace_thread(void *arg) { + struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg; + char thread_name[16]; + snprintf(thread_name, sizeof(thread_name), "swarmkv-ks"); + prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); + + struct event * ev = event_new(ks->evbase, -1, EV_PERSIST, __ks_dummy_event_handler, ks); + struct timeval timer_delay = {2, 0}; + evtimer_add(ev, &timer_delay); + + consul_watch_slots_changes_async(ks); + consul_watch_nodes_changes_async(ks); + + event_base_dispatch(ks->evbase); + event_del(ev); + event_free(ev); + + return NULL; +} +void consul_session_create_on_success(void *result, void *arg) +{ + struct evhttp_request *req=(struct evhttp_request *)result; + struct swarmkv_keyspace* ks = (struct swarmkv_keyspace *)arg; int resp_code=0; cJSON* session_id=NULL, *session_create_response=NULL; - struct swarmkv_keyspace* ks = (struct swarmkv_keyspace *)arg; + int is_running_for_leader=0; - if(!req || !evhttp_request_get_response_code(req)) - { - int errcode = EVUTIL_SOCKET_ERROR(); - log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "session create failed: socket error %s", - evutil_socket_error_to_string(errcode)); - } - else - { - resp_code=evhttp_request_get_response_code(req); - - } + + resp_code=evhttp_request_get_response_code(req); if(resp_code==200) { struct evbuffer *buf=NULL; @@ -507,34 +546,12 @@ void __consul_session_create_cb(struct evhttp_request *req, void *arg) } return; } -static void __ks_dummy_event_handler(evutil_socket_t fd, short what, void * arg) +void consul_session_create_on_on_fail(enum e_future_error err, const char * what, void * arg) { - struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg; - keyspace_active_expire_cycle(ks); + struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg; + log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "consul session create failed: %s.", what); return; } - -void *swarmkv_keyspace_thread(void *arg) -{ - struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg; - char thread_name[16]; - snprintf(thread_name, sizeof(thread_name), "swarmkv-ks"); - prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); - - struct event * ev = event_new(ks->evbase, -1, EV_PERSIST, __ks_dummy_event_handler, ks); - struct timeval timer_delay = {2, 0}; - evtimer_add(ev, &timer_delay); - - consul_watch_slots_changes_async(ks); - consul_watch_nodes_changes_async(ks); - - event_base_dispatch(ks->evbase); - event_del(ev); - event_free(ev); - - return NULL; -} - void consul_create_session_async(struct swarmkv_keyspace* ks) { cJSON* session=NULL, *service_check_array=NULL, *service_check=NULL; @@ -557,50 +574,40 @@ void consul_create_session_async(struct swarmkv_keyspace* ks) char url[SWARMKV_URL_MAX]=""; snprintf(url, sizeof(url), "/v1/session/create"); - struct consul_task *task=consul_task_getnxset(ks, __func__); + struct future *f=future_create(__func__, consul_session_create_on_success, consul_session_create_on_on_fail, ks); + struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); - struct evhttp_request* request=evhttp_request_new(__consul_session_create_cb, ks); - struct evhttp_connection* conn=task->evhttpconn; - evhttp_connection_set_timeout(conn, 2); + + evhttp_connection_set_timeout(req->evhttpconn, 2); - struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request); + struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); - char ip[16]={}; - unsigned int port=0; - node_parse(&ks->self, &port, ip, sizeof(ip)); - evhttp_add_header(output_headers, "Host", ip); + evhttp_add_header(output_headers, "Host", ks->consul_agent_host); evhttp_add_header(output_headers, "Connection", "close"); - struct evbuffer* output_buffer = evhttp_request_get_output_buffer(request); + struct evbuffer *output_buffer = evhttp_request_get_output_buffer(req->evhttpreq); evbuffer_add(output_buffer, payload, strlen(payload)); char number[64]; snprintf(number, sizeof(number), "%zu", strlen(payload)); evhttp_add_header(output_headers, "Content-Type", "application/json"); evhttp_add_header(output_headers, "Content-Length", number); - evhttp_make_request(conn, request, EVHTTP_REQ_PUT, url); + consul_request_make(req, EVHTTP_REQ_PUT, url); free(payload); return; } -void __consul_session_check_cb(struct evhttp_request *req, void *arg) +void consul_session_check_on_success(void *result, void *arg) { int resp_code=0; cJSON *session_check=NULL; int session_check_array_size=0; - struct swarmkv_keyspace* ks = (struct swarmkv_keyspace *)arg; + struct swarmkv_keyspace *ks = (struct swarmkv_keyspace *)arg; int is_running_for_leader=0; - if(!req || !evhttp_request_get_response_code(req)) - { - int errcode = EVUTIL_SOCKET_ERROR(); - log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "session check failed: socket error %s", - evutil_socket_error_to_string(errcode)); - } - else - { - resp_code=evhttp_request_get_response_code(req); - - } + struct evhttp_request *req=(struct evhttp_request *)result; + + resp_code=evhttp_request_get_response_code(req); + if(resp_code==200) { struct evbuffer *buf=NULL; @@ -636,102 +643,87 @@ void __consul_session_check_cb(struct evhttp_request *req, void *arg) } return; } - +void consul_session_check_on_fail(enum e_future_error err, const char * what, void * arg) +{ + struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg; + log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "consul session check failed: %s.", what); + return; +} void consul_session_check_async(struct swarmkv_keyspace* ks) { char url[SWARMKV_URL_MAX]=""; snprintf(url, sizeof(url), "/v1/session/info/%s", ks->consul_session_id); - struct consul_task *task=consul_task_getnxset(ks, __func__); - - struct evhttp_request* request=evhttp_request_new(__consul_session_check_cb, ks); - struct evhttp_connection* conn=task->evhttpconn; - evhttp_connection_set_timeout(conn, 2); - - struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request); - - char ip[16]={}; - unsigned int port=0; - node_parse(&ks->self, &port, ip, sizeof(ip)); - evhttp_add_header(output_headers, "Host", ip); - evhttp_add_header(output_headers, "Connection", "close"); - evhttp_make_request(conn, request, EVHTTP_REQ_GET, url); + struct future *f=future_create(__func__, consul_session_check_on_success, consul_session_check_on_fail, ks); + struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); + consul_request_make(req, EVHTTP_REQ_GET, url); return; } -void __acquire_session_lock_cb(struct evhttp_request *req, void *arg) +void acquire_session_lock_on_success(void* result, void *arg) { + struct evhttp_request *req=(struct evhttp_request *)result; struct swarmkv_keyspace* ks=(struct swarmkv_keyspace*)arg; int resp_code=0; - if(!req || !evhttp_request_get_response_code(req)) - { - int errcode = EVUTIL_SOCKET_ERROR(); - log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "run for leader failed: %s", - evutil_socket_error_to_string(errcode)); - } - else + + resp_code=evhttp_request_get_response_code(req); + if(resp_code==200) { - resp_code=evhttp_request_get_response_code(req); - if(resp_code==200) - { - struct evbuffer *buf=NULL; - buf=evhttp_request_get_input_buffer(req); - size_t len = evbuffer_get_length(buf); - sds resp_body_buff=sdsnewlen(SDS_NOINIT, len); - evbuffer_copyout(buf, resp_body_buff, len); + struct evbuffer *buf=NULL; + buf=evhttp_request_get_input_buffer(req); + size_t len = evbuffer_get_length(buf); + sds resp_body_buff=sdsnewlen(SDS_NOINIT, len); + evbuffer_copyout(buf, resp_body_buff, len); - if(0==strncmp(resp_body_buff, "true", strlen("true"))) - { - ks->is_leader=1; - log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "become cluster leader."); - } - else - { - log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "lose the election."); - ks->is_leader=0; - } - sdsfree(resp_body_buff); + if(0==strncmp(resp_body_buff, "true", strlen("true"))) + { + ks->is_leader=1; + log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "become cluster leader."); } else { - log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "run for leader failed: HTTP Code %d.", resp_code); + log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "lose the election."); + ks->is_leader=0; } + sdsfree(resp_body_buff); + } + else + { + log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "run for leader failed: HTTP Code %d.", resp_code); } consul_watch_leadership_changes_async(ks); } - +void acquire_session_lock_on_fail(enum e_future_error err, const char * what, void * arg) +{ + struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg; + log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "consul session lock aquire failed: %s.", what); + consul_watch_leadership_changes_async(ks); + return; +} void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks) { sds req_body=node_print_json(&ks->self, ks->uuid); char url[SWARMKV_URL_MAX]; - struct consul_task *task=consul_task_getnxset(ks, __func__); - - //run for leader snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/lead?acquire=%s", ks->db_name, ks->consul_session_id); - struct evhttp_request* request=evhttp_request_new(__acquire_session_lock_cb, ks); - struct evhttp_connection* conn=task->evhttpconn; - evhttp_connection_set_timeout(conn, 2); + struct future *f=future_create(__func__, acquire_session_lock_on_success, acquire_session_lock_on_fail, ks); + struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); + + evhttp_connection_set_timeout(req->evhttpconn, 2); - struct evbuffer* output_buffer = evhttp_request_get_output_buffer(request); + struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq); evbuffer_add(output_buffer, req_body, sdslen(req_body)); char number[64]; - struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request); + struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); snprintf(number, sizeof(number), "%zu", sdslen(req_body)); - char ip[16]={}; - unsigned int port=0; - node_parse(&ks->self, &port, ip, sizeof(ip)); - evhttp_add_header(output_headers, "Host", ip); - evhttp_add_header(output_headers, "Connection", "close"); - evhttp_add_header(output_headers, "Content-Type", "application/json"); evhttp_add_header(output_headers, "Content-Length", number); - evhttp_make_request(conn, request, EVHTTP_REQ_PUT, url); + consul_request_make(req, EVHTTP_REQ_PUT, url); sdsfree(req_body); } @@ -813,7 +805,8 @@ void consul_watch_leadership_changes_async(struct swarmkv_keyspace* ks) snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/lead?index=%d&wait=10s", ks->db_name, ks->consul_lead_key_modify_idx); struct future *f=future_create(__func__, watch_leadership_changes_on_success, watch_leadership_changes_on_fail, ks); - http_get_async(__func__, ks->self, url, ks->consul_agent_host, ks->consul_agent_port, ks->evbase, f, ks->logger); + struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); + consul_request_make(req, EVHTTP_REQ_GET, url); return; } void watch_slots_changes_on_success(void *result, void *arg) @@ -861,7 +854,7 @@ void watch_slots_changes_on_success(void *result, void *arg) if(slot_rt->state!=STATE_STABLE) { // log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "slot %d is reset to STABLE state, owner %s:%u.", -// slot->slot_id, owner->addr.ip_addr, owner->addr.p2p_port); +// slot->slot_id, owner->addr.ip_addr, owner->addr.cluster_port); memset(&(slot_rt->rebalancing_peer), 0, sizeof(slot_rt->rebalancing_peer)); } slot_rt->state=STATE_STABLE; @@ -892,24 +885,17 @@ void consul_watch_slots_changes_async(struct swarmkv_keyspace *ks) char url[SWARMKV_URL_MAX]=""; snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/slots?index=%d", ks->db_name, ks->consul_slots_modify_idx); struct future *f=future_create(__func__, watch_slots_changes_on_success, watch_slots_changes_on_fail, ks); - http_get_async(__func__, ks->self, url, ks->consul_agent_host, ks->consul_agent_port, ks->evbase, f, ks->logger); + struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); + consul_request_make(req, EVHTTP_REQ_GET, url); return; } -void __propagate_slot_table_cb(struct evhttp_request *req, void *arg) +void propagate_slot_table_on_success(void *result, void *arg) { - struct swarmkv_keyspace* ks=(struct swarmkv_keyspace*)arg; + struct evhttp_request *req=(struct evhttp_request *)result; + struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg; int resp_code=0; - if(!req || !evhttp_request_get_response_code(req)) - { - int errcode = EVUTIL_SOCKET_ERROR(); - log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots table propagate failed: socket error %s", - evutil_socket_error_to_string(errcode)); - } - else - { - resp_code=evhttp_request_get_response_code(req); - - } + + resp_code=evhttp_request_get_response_code(req); if(resp_code==200) { log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots table propagate success."); @@ -920,35 +906,35 @@ void __propagate_slot_table_cb(struct evhttp_request *req, void *arg) } return; } +void propagate_slot_table_on_fail(enum e_future_error err, const char * what, void * arg) +{ + struct swarmkv_keyspace *ks=(struct swarmkv_keyspace*)arg; + log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots table propagate failed: %s", + what); +} void propagate_slot_table_async(struct swarmkv_keyspace *ks, struct key_slot new_slot[], size_t slot_num) { - struct consul_task *task=consul_task_getnxset(ks, __func__); - sds new_slots_json=NULL; new_slots_json=keyslots2json(new_slot, sizeof(struct key_slot), 0, KEYSPACE_SLOT_NUM); char url[SWARMKV_URL_MAX]=""; snprintf(url, sizeof(url), "/v1/kv/swarmkv/%s/slots", ks->db_name); - struct evhttp_request *request=evhttp_request_new(__propagate_slot_table_cb, ks); - struct evhttp_connection *conn=task->evhttpconn; - evhttp_connection_set_timeout(conn, 20); - - struct evkeyvalq *output_headers = evhttp_request_get_output_headers(request); + struct future *f=future_create(__func__, propagate_slot_table_on_success, propagate_slot_table_on_fail, ks); + struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); - evhttp_add_header(output_headers, "Host", ks->consul_agent_host); - evhttp_add_header(output_headers, "Connection", "close"); + evhttp_connection_set_timeout(req->evhttpconn, 20); - struct evbuffer* output_buffer = evhttp_request_get_output_buffer(request); + struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); + + struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq); evbuffer_add(output_buffer, new_slots_json, sdslen(new_slots_json)); char number[64]; snprintf(number, sizeof(number), "%zu", sdslen(new_slots_json)); evhttp_add_header(output_headers, "Content-Type", "application/json"); evhttp_add_header(output_headers, "Content-Length", number); - evhttp_make_request(conn, request, EVHTTP_REQ_PUT, url); - + consul_request_make(req, EVHTTP_REQ_PUT, url); sdsfree(new_slots_json); new_slots_json=NULL; - } void remove_failed_nodes_from_global_slot_table(struct swarmkv_keyspace *ks, node_t *health_nodes, size_t n_node) { @@ -1133,8 +1119,8 @@ void consul_watch_nodes_changes_async(struct swarmkv_keyspace* ks) snprintf(url, sizeof(url), "/v1/health/service/%s?passing=1&index=%d&wait=10s", ks->db_name, ks->consul_nodes_modify_idx); struct future *f=future_create(__func__, watch_nodes_changes_on_success, watch_nodes_changes_on_fail, ks); - http_get_async(__func__, ks->self, url, ks->consul_agent_host, ks->consul_agent_port, ks->evbase, f, ks->logger); - + struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); + consul_request_make(req, EVHTTP_REQ_GET, url); } int keyslots_init(struct swarmkv_keyspace *ks) @@ -1214,17 +1200,18 @@ struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, ks->module.lock=keyspace_lock; ks->module.unlock=keyspace_unlock; strncpy(ks->db_name, db_name, sizeof(ks->db_name)); - node_init(&ks->self, opts->bind_address, opts->p2p_port); + node_init(&ks->self, opts->bind_address, opts->cluster_port); uuid_copy(ks->uuid, opts->bin_uuid); - + ks->opts=opts; ks->health_check_port=opts->health_check_port; ks->consul_agent_port=opts->consul_port; - strncpy(ks->consul_agent_host, opts->consul_address, sizeof(ks->consul_agent_host)); + strncpy(ks->consul_agent_host, opts->consul_agent_host, sizeof(ks->consul_agent_host)); ks->evbase = event_base_new(); ks->logger = logger; ks->dryrun= opts->dryrun; + ks->consul_client=consul_client_new(ks->opts->consul_agent_host, ks->opts->consul_port, ks->evbase, ks->logger); int ret=-1; ret=keyslots_init(ks); @@ -1242,7 +1229,7 @@ struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "work on dry run mode."); return &(ks->module); } - ks->http_server=http_server_new(ks->evbase, &(ks->health_check_port), ks->logger); + ks->http_server=http_server_new(ks->evbase, &(ks->health_check_port), ks); if(!ks->http_server) { asprintf(err, "health check HTTP server start failed."); @@ -1295,12 +1282,8 @@ void swarmkv_keyspace_free(struct swarmkv_module* module) } pthread_mutex_destroy(&slot_rt->mutex); } - struct consul_task *task=NULL, *tmp_task=NULL; - HASH_ITER(hh, ks->consul_task_hash, task, tmp_task) - { - HASH_DELETE(hh, ks->consul_task_hash, task); - consul_task_free(task); - } + consul_client_free(ks->consul_client); + ks->consul_client=NULL; event_base_free(ks->evbase); free(ks); return; diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 1b33c71..3dedef3 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -580,7 +580,7 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts, exe pthread_rwlock_init(&(store->rwlock[i]), NULL); } uuid_copy(store->my_uuid, node_uuid); - node_init(&store->self, opts->bind_address, opts->p2p_port); + node_init(&store->self, opts->bind_address, opts->cluster_port); store->exec_cmd=send_cmd; store->exec_cmd_handle=handle_send_cmd; diff --git a/test/swarmkv_cli_test.cpp b/test/swarmkv_cli_test.cpp index ad1c65d..f932f56 100644 --- a/test/swarmkv_cli_test.cpp +++ b/test/swarmkv_cli_test.cpp @@ -15,7 +15,7 @@ struct swarmkv_cli_node_addr { char ip_addr[INET6_ADDRSTRLEN]; - unsigned int p2p_port; + unsigned int cluster_port; }; void wait_for_check() @@ -38,14 +38,14 @@ void swarmkv_cli_set_db(const char *cluster_name) asprintf(&(g_cli_system.cluster_name), "%s", cluster_name); } -void swarmkv_cli_attach(const char *ip_addr, unsigned int p2p_port) +void swarmkv_cli_attach(const char *ip_addr, unsigned int cluster_port) { g_cli_system.attached=1; if(g_cli_system.attach_target_line) { free(g_cli_system.attach_target_line); } - asprintf(&(g_cli_system.attach_target_line), "--attach %s:%u", ip_addr, p2p_port); + asprintf(&(g_cli_system.attach_target_line), "--attach %s:%u", ip_addr, cluster_port); } void swarmkv_cli_detach() @@ -149,7 +149,7 @@ int swarmkv_cli_get_addr_node(char *result, struct swarmkv_cli_node_addr *node_a p+=3; } int n_read=0; - n_read=sscanf(p, "%[^:]:%u", node_addr[i].ip_addr, &node_addr[i].p2p_port); + n_read=sscanf(p, "%[^:]:%u", node_addr[i].ip_addr, &node_addr[i].cluster_port); if(n_read!=2) { return -1; @@ -172,7 +172,7 @@ protected: swarmkv_cli_create_cluster(cluster_name, "127.0.0.1:5210"); logger=log_handle_create(log_path, 0); struct swarmkv_options* opts=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts, 5210); + swarmkv_options_set_cluster_port(opts, 5210); swarmkv_options_set_health_check_port(opts, 6210); swarmkv_options_set_logger(opts, logger); db=swarmkv_open(opts, cluster_name, &err); @@ -286,7 +286,7 @@ TEST_F(SwarmkvCliNodes, ReplicaDel) swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), NULL, "KEYSPACE RLIST %s", "xiaolv"); swarmkv_cli_get_addr_node(result, &node_addr); - swarmkv_cli_attach(node_addr.ip_addr, node_addr.p2p_port); + swarmkv_cli_attach(node_addr.ip_addr, node_addr.cluster_port); cmd_exec_arg_expect_integer(reply_arg, 1); swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_integer, "CRDT DEL xiaolv"); cmd_exec_arg_clear(reply_arg); @@ -339,7 +339,7 @@ TEST_F(SwarmkvCliNodes, KeyspaceRdel) swarmkv_cli_get_addr_node(result, &node_addr); cmd_exec_arg_expect_integer(reply_arg, 1); - swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_integer, "KEYSPACE RDEL %s %s:%u", "xiaoqin", node_addr.ip_addr, node_addr.p2p_port); + swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_integer, "KEYSPACE RDEL %s %s:%u", "xiaoqin", node_addr.ip_addr, node_addr.cluster_port); cmd_exec_arg_clear(reply_arg); cmd_exec_arg_expect_NIL(reply_arg); @@ -387,11 +387,11 @@ TEST_F(SwarmkvCliNodes, KeyspaceRadd) swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), NULL, "KEYSPACE RLIST %s", "xiaolan"); swarmkv_cli_get_addr_node(result, &node_addr); - unsigned int p2p_port = (node_addr.p2p_port == 5210) ? 5211: 5210; + unsigned int cluster_port = (node_addr.cluster_port == 5210) ? 5211: 5210; - sprintf(reply_string, "1) %s:%u,2) %s:%u", node_addr.ip_addr, node_addr.p2p_port, node_addr.ip_addr, p2p_port); + sprintf(reply_string, "1) %s:%u,2) %s:%u", node_addr.ip_addr, node_addr.cluster_port, node_addr.ip_addr, cluster_port); cmd_exec_arg_expect_cstring(reply_arg, reply_string); - swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "KEYSPACE RADD %s %s:%d", "xiaolan", node_addr.ip_addr, p2p_port); + swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "KEYSPACE RADD %s %s:%d", "xiaolan", node_addr.ip_addr, cluster_port); cmd_exec_arg_clear(reply_arg); wait_for_check(); @@ -403,7 +403,7 @@ TEST_F(SwarmkvCliNodes, KeyspaceRadd) swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_integer, "CLUSTER SANITY heal"); cmd_exec_arg_clear(reply_arg); - sprintf(reply_string, "1) %s:%d", node_addr.ip_addr, node_addr.p2p_port); + sprintf(reply_string, "1) %s:%d", node_addr.ip_addr, node_addr.cluster_port); cmd_exec_arg_expect_cstring(reply_arg, reply_string); swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "KEYSPACE RLIST %s", "xiaolan"); @@ -543,7 +543,7 @@ protected: logger=log_handle_create(log_path, 0); struct swarmkv_options* opts1=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts1, 5210); + swarmkv_options_set_cluster_port(opts1, 5210); swarmkv_options_set_health_check_port(opts1, 6210); swarmkv_options_set_logger(opts1, logger); db1=swarmkv_open(opts1, cluster_name, &err); @@ -555,7 +555,7 @@ protected: } struct swarmkv_options* opts2=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts2, 5211); + swarmkv_options_set_cluster_port(opts2, 5211); swarmkv_options_set_health_check_port(opts2, 6211); swarmkv_options_set_logger(opts2, logger); db2=swarmkv_open(opts2, cluster_name, &err); diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp index 34575c7..386ee1e 100644 --- a/test/swarmkv_gtest.cpp +++ b/test/swarmkv_gtest.cpp @@ -38,7 +38,7 @@ protected: swarmkv_cli_create_cluster(cluster_name, "127.0.0.1:5210"); logger=log_handle_create(log_path, 0); struct swarmkv_options* opts=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts, 5210); + swarmkv_options_set_cluster_port(opts, 5210); swarmkv_options_set_health_check_port(opts, 6210); swarmkv_options_set_logger(opts, logger); db=swarmkv_open(opts, cluster_name, &err); @@ -491,7 +491,7 @@ protected: logger=log_handle_create(log_path, 0); struct swarmkv_options* opts1=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts1, 5210); + swarmkv_options_set_cluster_port(opts1, 5210); swarmkv_options_set_health_check_port(opts1, 6210); swarmkv_options_set_logger(opts1, logger); db1=swarmkv_open(opts1, cluster_name, &err); @@ -503,7 +503,7 @@ protected: } struct swarmkv_options* opts2=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts2, 5211); + swarmkv_options_set_cluster_port(opts2, 5211); swarmkv_options_set_health_check_port(opts2, 6211); swarmkv_options_set_logger(opts2, logger); db2=swarmkv_open(opts2, cluster_name, &err); @@ -1259,7 +1259,7 @@ protected: for(i=0; i<TEST_NODE_NUMBER; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts[i], 5210+i); + swarmkv_options_set_cluster_port(opts[i], 5210+i); swarmkv_options_set_health_check_port(opts[i], 6210+i); swarmkv_options_set_worker_thread_number(opts[i], 2); swarmkv_options_set_logger(opts[i], logger); diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp index 8879172..136074a 100644 --- a/test/swarmkv_perf_test.cpp +++ b/test/swarmkv_perf_test.cpp @@ -98,7 +98,7 @@ TEST(Performance, Nthreads) for(i=0; i<NODE_NUMBER; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts[i], 5310+i); + swarmkv_options_set_cluster_port(opts[i], 5310+i); swarmkv_options_set_health_check_port(opts[i], 6310+i); swarmkv_options_set_logger(opts[i], logger); swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); @@ -233,11 +233,11 @@ TEST(Resilience, AddSlotOwner) for(i=0; i<NODE_NUMBER+CANDINATE_NUMBER; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts[i], p2p_port_start+i); + swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i); swarmkv_options_set_health_check_port(opts[i], health_port_start+i); swarmkv_options_set_logger(opts[i], logger); swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); - swarmkv_options_set_p2p_timeout_us(opts[i], 100*1000); + swarmkv_options_set_cluster_timeout_us(opts[i], 100*1000); swarmkv_options_set_disable_run_for_leader(opts[i]); db[i]=swarmkv_open(opts[i], cluster_name, &err); if(err) @@ -346,11 +346,11 @@ TEST(Performance, Sync) for(size_t i=0; i<NODE_NUMBER; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts[i], p2p_port_start+i); + swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i); swarmkv_options_set_health_check_port(opts[i], health_port_start+i); swarmkv_options_set_logger(opts[i], logger); swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); - swarmkv_options_set_p2p_timeout_us(opts[i], 1000*1000); + swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000); swarmkv_options_set_sync_interval_us(opts[i], 100*1000); swarmkv_options_set_disable_run_for_leader(opts[i]); db[i]=swarmkv_open(opts[i], cluster_name, &err); @@ -427,7 +427,7 @@ TEST(Resilience, Failover) for(size_t i=0; i<NODE_NUMBER; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts[i], p2p_port_start+i); + swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i); swarmkv_options_set_health_check_port(opts[i], health_port_start+i); db[i]=swarmkv_open(opts[i], cluster_name, &err); if(err) diff --git a/test/swarmkv_scalability_test.cpp b/test/swarmkv_scalability_test.cpp index 35f1505..75deb06 100644 --- a/test/swarmkv_scalability_test.cpp +++ b/test/swarmkv_scalability_test.cpp @@ -169,11 +169,11 @@ TEST(Scalability, MultiThreads) const char *cluster_name="demo"; struct swarmkv_options *opts=swarmkv_options_new(); swarmkv_options_set_dryrun(opts); - swarmkv_options_set_p2p_port(opts, 5212); + swarmkv_options_set_cluster_port(opts, 5212); swarmkv_options_set_health_check_port(opts, 6212); swarmkv_options_set_logger(opts, logger); swarmkv_options_set_worker_thread_number(opts, 2); - swarmkv_options_set_p2p_timeout_us(opts, 500*1000); + swarmkv_options_set_cluster_timeout_us(opts, 500*1000); struct swarmkv *db=swarmkv_open(opts, cluster_name, &err); if(err) { diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c index 65a41e9..a655008 100644 --- a/tools/swarmkv_cli.c +++ b/tools/swarmkv_cli.c @@ -26,7 +26,7 @@ struct config { char db_name[256]; unsigned int consul_port; - unsigned int p2p_port; + unsigned int cluster_port; char consul_host[64]; struct swarmkv *db; struct cluster_manager_command cluster_mgr_cmd; @@ -389,8 +389,8 @@ struct swarmkv_reply *cluster_addslotowner_command(struct swarmkv *db, char *arg { continue; } -/* printf("Migrating slot %d from %s %u to %s %u ... ", tmp->slot_id, tmp->owner.addr.ip_addr, tmp->owner.addr.p2p_port, - node.addr.ip_addr, node.addr.p2p_port); +/* printf("Migrating slot %d from %s %u to %s %u ... ", tmp->slot_id, tmp->owner.addr.ip_addr, tmp->owner.addr.cluster_port, + node.addr.ip_addr, node.addr.cluster_port); */ /*STEP 2.1 Set NEW node's slot to IMPORTING state. Slot will return to STABLE state after udpate the global slot table*/ setslot_reply=swarmkv_command_on(db, new_node->addr, "keyspace setslot %d IMPORTING %s", @@ -1185,7 +1185,7 @@ int main(int argc, char * argv[]) } else if(!strcmp(argv[i], "-l") && !lastarg) { - sscanf(argv[++i], "%u", &g_config.p2p_port); + sscanf(argv[++i], "%u", &g_config.cluster_port); } else if(!strcmp(argv[i], "--cluster-create") && !lastarg) { @@ -1242,7 +1242,7 @@ int main(int argc, char * argv[]) void * logger=log_handle_create("swarmkv-cli.log", 0); struct swarmkv_options *opts=swarmkv_options_new(); swarmkv_options_set_dryrun(opts); - swarmkv_options_set_p2p_port(opts, g_config.p2p_port);//listen on random port + swarmkv_options_set_cluster_port(opts, g_config.cluster_port);//listen on random port swarmkv_options_set_consul_host(opts, g_config.consul_host); swarmkv_options_set_consul_port(opts, g_config.consul_port); swarmkv_options_set_worker_thread_number(opts, 1); diff --git a/tools/swarmkv_simple_node.cpp b/tools/swarmkv_simple_node.cpp index c67ea76..70f7deb 100644 --- a/tools/swarmkv_simple_node.cpp +++ b/tools/swarmkv_simple_node.cpp @@ -33,7 +33,7 @@ int main(int argc, char ** argv) long long key_number=0; int worker_thread_number=1; unsigned int consul_port=8500; - unsigned int p2p_port; + unsigned int cluster_port; char cluster_name[256]; if(argc < 3) { help();} strcpy(consul_host, "127.0.0.1"); @@ -47,7 +47,7 @@ int main(int argc, char ** argv) } else if (!strcmp(argv[i], "-h") && !lastarg) { - sscanf(argv[++i], "%[^:]:%u", host, &p2p_port); + sscanf(argv[++i], "%[^:]:%u", host, &cluster_port); } else if(!strcmp(argv[i], "-c") && !lastarg) { @@ -70,12 +70,12 @@ int main(int argc, char ** argv) char *err=NULL; struct swarmkv_options *opts=swarmkv_options_new(); - swarmkv_options_set_p2p_port(opts, p2p_port); + swarmkv_options_set_cluster_port(opts, cluster_port); swarmkv_options_set_bind_address(opts, host); swarmkv_options_set_consul_host(opts, consul_host); swarmkv_options_set_consul_port(opts, consul_port); swarmkv_options_set_worker_thread_number(opts, worker_thread_number); - swarmkv_options_set_p2p_timeout_us(opts, 500000); + swarmkv_options_set_cluster_timeout_us(opts, 500000); swarmkv_options_set_sync_interval_us(opts, 500); //swarmkv_options_set_disable_run_for_leader(opts); struct swarmkv *db=swarmkv_open(opts, cluster_name, &err); |
