diff options
| author | Zheng Chao <[email protected]> | 2023-01-25 19:09:52 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-01-25 19:09:52 +0800 |
| commit | 99d7df4c68d3968e9afcfd00b68ca92ef10c4d9c (patch) | |
| tree | c7999b50b89f09287bbd3b9a20bffaadf79aa29d | |
| parent | 136e57eff9e87fd84de8b7b25f6691972716bacd (diff) | |
:bug: The test case Resilience.AddSlotOwner fails when running all tests. To fix this, we MUST start watching slot and node changes in a different thread than the one running evbase_dispatch(); otherwise, evhttp_make_request() may fail due to a socket fd error.
| -rw-r--r-- | src/swarmkv.c | 6 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 42 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 3 | ||||
| -rw-r--r-- | test/swarmkv_perf_test.cpp | 245 | ||||
| -rw-r--r-- | tools/swarmkv_cli.c | 4 |
5 files changed, 152 insertions, 148 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c index 12cbb46..2e99e7f 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -1147,13 +1147,15 @@ static void libevent_log_cb(int severity, const char *msg) fclose(logfile); } */ -struct swarmkv *swarmkv_open(struct swarmkv_options* opts, const char * db_name, char **err) +struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, char **err) { struct swarmkv *db = NULL; // event_set_log_callback(libevent_log_cb); db=ALLOC(struct swarmkv, 1); strncpy(db->db_name, db_name, sizeof(db->db_name)); - + /* adds locking, only required if accessed from separate threads */ + evthread_use_pthreads(); + strncpy(db->module.name, "db", sizeof(db->module.name)); db->module.mod_ctx=db; db->module.lock=NULL; diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index 1a85033..83d579c 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -236,7 +236,8 @@ struct swarmkv_keyspace exec_cmd_func *exec_cmd_func; struct swarmkv *exec_cmd_handle; struct swarmkv_module *mod_monitor; - void *logger; + struct log_handle *logger; + }; void http_request_on_close(struct evhttp_connection *conn, void *arg) { @@ -248,7 +249,6 @@ struct consul_request { char symbol[SWARMKV_SYMBOL_MAX]; long long request_id; - struct evhttp_connection *evhttpconn; //tcp connection for the request struct evhttp_request *evhttpreq; struct future *f; struct consul_client *ref_client; @@ -261,6 +261,7 @@ struct consul_client long long req_id_generator; struct event_base *evbase; //reference of keyspace evbase struct log_handle *logger; //reference of logger + struct evhttp_connection *evhttpconn; //tcp connection for the request struct consul_request *request_table; }; void consul_request_callback(struct evhttp_request *evhttpreq, void *arg); @@ -269,8 +270,6 @@ struct consul_request *consul_request_new(struct consul_client *client, const ch struct consul_request *req=ALLOC(struct consul_request, 1); strncpy(req->symbol, symbol, 
sizeof(req->symbol)); - req->evhttpconn=evhttp_connection_base_new(client->evbase, NULL, client->consul_agent_host, client->consul_agent_port); - evhttp_connection_set_timeout(req->evhttpconn, 1800);//set to 30min for blocking query. req->evhttpreq=evhttp_request_new(consul_request_callback, req); req->f=f; req->ref_client=client; @@ -286,7 +285,7 @@ struct consul_request *consul_request_new(struct consul_client *client, const ch } void consul_request_free(struct consul_request *req) { - evhttp_connection_free(req->evhttpconn); + //evhttp_connection_free(req->evhttpconn); //if(req->evhttpreq) evhttp_request_free(req->evhttpreq); future_destroy(req->f); @@ -318,12 +317,10 @@ void consul_request_callback(struct evhttp_request *evhttpreq, void *arg) void consul_request_make(struct consul_request *req, enum evhttp_cmd_type cmd, const char *url) { struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); - - evhttp_connection_set_closecb(req->evhttpconn, http_request_on_close, req); evhttp_add_header(output_headers, "Host", req->ref_client->consul_agent_host); - evhttp_add_header(output_headers, "Connection", "close"); +// evhttp_add_header(output_headers, "Connection", "close"); - evhttp_make_request(req->evhttpconn, req->evhttpreq, cmd, url); + evhttp_make_request(req->ref_client->evhttpconn, req->evhttpreq, cmd, url); } struct consul_client *consul_client_new(const char *host, unsigned short port, struct event_base *evbase, struct log_handle *logger) @@ -333,6 +330,9 @@ struct consul_client *consul_client_new(const char *host, unsigned short port, s strncpy(client->consul_agent_host, host, sizeof(client->consul_agent_host)); client->logger=logger; client->evbase=evbase; + client->evhttpconn=evhttp_connection_base_new(client->evbase, NULL, client->consul_agent_host, client->consul_agent_port); + evhttp_connection_set_timeout(client->evhttpconn, 1800);//set to 30min for blocking query. 
+ evhttp_connection_set_closecb(client->evhttpconn, http_request_on_close, client); return client; } void consul_client_free(struct consul_client *client) @@ -343,6 +343,7 @@ void consul_client_free(struct consul_client *client) HASH_DEL(client->request_table, req); consul_request_free(req); } + evhttp_connection_free(client->evhttpconn); free(client); } void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks); @@ -489,14 +490,9 @@ void *swarmkv_keyspace_thread(void *arg) char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "swarmkv-ks"); prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); - struct event * ev = event_new(ks->evbase, -1, EV_PERSIST, __ks_dummy_event_handler, ks); struct timeval timer_delay = {2, 0}; evtimer_add(ev, &timer_delay); - - consul_watch_slots_changes_async(ks); - consul_watch_nodes_changes_async(ks); - event_base_dispatch(ks->evbase); event_del(ev); event_free(ev); @@ -576,7 +572,7 @@ void consul_create_session_async(struct swarmkv_keyspace *ks) struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); - evhttp_connection_set_timeout(req->evhttpconn, 2); +// evhttp_connection_set_timeout(req->evhttpconn, 2); struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); @@ -705,7 +701,7 @@ void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks) struct future *f=future_create(__func__, acquire_session_lock_on_success, acquire_session_lock_on_fail, ks); struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); - evhttp_connection_set_timeout(req->evhttpconn, 2); +// evhttp_connection_set_timeout(req->evhttpconn, 2); struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq); evbuffer_add(output_buffer, req_body, sdslen(req_body)); @@ -916,11 +912,11 @@ void propagate_slot_table_async(struct swarmkv_keyspace *ks, struct key_slot new struct future *f=future_create(__func__, 
propagate_slot_table_on_success, propagate_slot_table_on_fail, ks); struct consul_request *req=consul_request_new(ks->consul_client, __func__, f); - evhttp_connection_set_timeout(req->evhttpconn, 20); +// evhttp_connection_set_timeout(req->evhttpconn, 20); struct evkeyvalq *output_headers = evhttp_request_get_output_headers(req->evhttpreq); - struct evbuffer* output_buffer = evhttp_request_get_output_buffer(req->evhttpreq); + struct evbuffer *output_buffer = evhttp_request_get_output_buffer(req->evhttpreq); evbuffer_add(output_buffer, new_slots_json, sdslen(new_slots_json)); char number[64]; snprintf(number, sizeof(number), "%zu", sdslen(new_slots_json)); @@ -1187,7 +1183,7 @@ void keyspace_unlock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag return; } -struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, const char* db_name, void *logger, char **err) +struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, const char *db_name, void *logger, char **err) { struct swarmkv_keyspace *ks=ALLOC(struct swarmkv_keyspace, 1); strncpy(ks->module.name, "keyspace", sizeof(ks->module.name)); @@ -1216,7 +1212,13 @@ struct swarmkv_module* swarmkv_keyspace_new(const struct swarmkv_options *opts, log_fatal(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots init failed."); goto error_out; } -// consul_kv_init_if_nonexist(ks); + //We MUST start watching in different thread than evbase_dispatch(), if not, evhttp_make_request() maybe fail due to socket fd error. + //This happens when calling swarmkv-cli for adding more slot owners. + //I spent two days on this issue, but still don't known what magic design of libevent2 causing this problem. 
+ + consul_watch_slots_changes_async(ks); + consul_watch_nodes_changes_async(ks); + pthread_create(&(ks->thr), NULL, swarmkv_keyspace_thread, (void *) ks); if(ks->dryrun) diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index c30d713..9ad1563 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -760,14 +760,13 @@ static void __accept_error_cb(struct evconnlistener *listener, void *arg) struct swarmkv_net* swarmkv_net_new(const struct swarmkv_options *opts, node_t* self, void* logger, char **err) { struct swarmkv_net* net=NULL; - /* adds locking, only required if accessed from separate threads */ - evthread_use_pthreads(); net=ALLOC(struct swarmkv_net, 1); net->logger=logger; net->n_thread=opts->nr_worker_threads; net->sequence_generator=1; net->threads=ALLOC(struct snet_thread, net->n_thread); struct snet_thread *thr=NULL; + for(size_t i=0; i<net->n_thread; i++) { thr=net->threads+i; diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp index 136074a..275c4a9 100644 --- a/test/swarmkv_perf_test.cpp +++ b/test/swarmkv_perf_test.cpp @@ -141,6 +141,128 @@ TEST(Performance, Nthreads) } log_handle_destroy(logger); } + +int g_tconsume_running_flag=0; +size_t g_token_bucket_number=200*1000; + +void *background_tconsume_thread(void *thread_arg) +{ + struct swarmkv *db=(struct swarmkv *)thread_arg; + + struct cmd_exec_arg *arg=NULL; + arg=cmd_exec_arg_new(); + cmd_exec_arg_diable_sync_check(arg); + char key[256]; + int ret=0; + long long tokens=1; + size_t round=0, got_token_cnt=0; + int start=random()%16; + while(g_tconsume_running_flag) + { + snprintf(key, sizeof(key), "tb-%zu", (round+start)%g_token_bucket_number); + cmd_exec_arg_expect_integer(arg, tokens); + arg->print_reply_on_fail=1; + swarmkv_tconsume(db, key, strlen(key), tokens, cmd_exec_generic_callback, arg); + ret=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); + cmd_exec_arg_clear(arg); + if(ret==1) got_token_cnt++; + else printf("tconsume %s failed\n", key); + round++; + } + 
cmd_exec_arg_free(arg); + int *success=ALLOC(int, 1); + EXPECT_EQ(got_token_cnt, round); + if(got_token_cnt==round) + *success=1; + else + *success=0; + return success; +} +TEST(Performance, Sync) +{ + size_t NODE_NUMBER=2; + size_t CALL_THREAD_PER_NODE=2; + size_t WORKER_THREAD_NUMBER=1; + + struct swarmkv *db[NODE_NUMBER]; + char *err=NULL; + const char *log_path="./swarmkv-sync.log"; + unsigned int p2p_port_start=9310, health_port_start=10310; + char node_list_str[1024]={0}; + for(size_t i=0; i<NODE_NUMBER; i++) + { + snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", p2p_port_start+i); + } + const char *cluster_name="swarmkv-sync"; + swarmkv_cli_create_cluster(cluster_name, node_list_str); + struct log_handle * logger=log_handle_create(log_path, 0); + struct swarmkv_options *opts[NODE_NUMBER]; + for(size_t i=0; i<NODE_NUMBER; i++) + { + opts[i]=swarmkv_options_new(); + swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i); + swarmkv_options_set_health_check_port(opts[i], health_port_start+i); + swarmkv_options_set_logger(opts[i], logger); + swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); + swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000); + swarmkv_options_set_sync_interval_us(opts[i], 100*1000); + swarmkv_options_set_disable_run_for_leader(opts[i]); + db[i]=swarmkv_open(opts[i], cluster_name, &err); + if(err) + { + printf("swarmkv_open %zu instance failed: %s\n", i, err); + free(err); + err=NULL; + } + } + char key[256]={0}; + struct swarmkv_reply *reply=NULL; + for(size_t i=0; i<g_token_bucket_number; i++) + { + snprintf(key, sizeof(key), "tb-%zu", i); + reply=swarmkv_command_on(db[i%NODE_NUMBER], NULL, "TCFG %s 2000000 2000000", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); + if(reply->type != SWARMKV_REPLY_STATUS) + { + swarmkv_reply_print(reply, stdout); + } + swarmkv_reply_free(reply); + } + srand(171); + g_tconsume_running_flag=1; + pthread_t 
threads[NODE_NUMBER][CALL_THREAD_PER_NODE]; + for(size_t i=0; i<NODE_NUMBER; i++) + { + for(size_t j=0; j<CALL_THREAD_PER_NODE; j++) + { + pthread_create(&(threads[i][j]), NULL, background_tconsume_thread, db[i]); + } + } + + sleep(20); + g_tconsume_running_flag=0; + int *successful_backgroud_running_thread=NULL; + for(size_t i=0; i<NODE_NUMBER; i++) + { + for(size_t j=0; j<CALL_THREAD_PER_NODE; j++) + { + pthread_join(threads[i][j], (void**)&successful_backgroud_running_thread); + + EXPECT_EQ(*successful_backgroud_running_thread, 1); + *successful_backgroud_running_thread=0; + free(successful_backgroud_running_thread); + successful_backgroud_running_thread=NULL; + } + } + sleep(20); + for(size_t i=0; i<NODE_NUMBER; i++) + { + swarmkv_close(db[i]); + } + + log_handle_destroy(logger); + +} int g_running_flag=0; void *migration_background_thread(void *thread_arg) { @@ -219,7 +341,7 @@ TEST(Resilience, AddSlotOwner) int i=0, j=0; struct swarmkv *db[NODE_NUMBER+CANDINATE_NUMBER]; char *err=NULL; - const char *log_path="./swarmkv-migration.log"; + const char *log_path="./swarmkv-migration-11.log"; unsigned int p2p_port_start=7310, health_port_start=8310; char node_list_str[1024]={0}; for(i=0; i<NODE_NUMBER; i++) @@ -288,127 +410,6 @@ TEST(Resilience, AddSlotOwner) log_handle_destroy(logger); } -int g_tconsume_running_flag=0; -size_t g_token_bucket_number=200*1000; - -void *background_tconsume_thread(void *thread_arg) -{ - struct swarmkv *db=(struct swarmkv *)thread_arg; - - struct cmd_exec_arg *arg=NULL; - arg=cmd_exec_arg_new(); - cmd_exec_arg_diable_sync_check(arg); - char key[256]; - int ret=0; - long long tokens=1; - size_t round=0, got_token_cnt=0; - int start=random()%16; - while(g_tconsume_running_flag) - { - snprintf(key, sizeof(key), "tb-%zu", (round+start)%g_token_bucket_number); - cmd_exec_arg_expect_integer(arg, tokens); - arg->print_reply_on_fail=1; - swarmkv_tconsume(db, key, strlen(key), tokens, cmd_exec_generic_callback, arg); - 
ret=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); - cmd_exec_arg_clear(arg); - if(ret==1) got_token_cnt++; - else printf("tconsume %s failed\n", key); - round++; - } - cmd_exec_arg_free(arg); - int *success=ALLOC(int, 1); - EXPECT_EQ(got_token_cnt, round); - if(got_token_cnt==round) - *success=1; - else - *success=0; - return success; -} -TEST(Performance, Sync) -{ - size_t NODE_NUMBER=2; - size_t CALL_THREAD_PER_NODE=2; - size_t WORKER_THREAD_NUMBER=1; - - struct swarmkv *db[NODE_NUMBER]; - char *err=NULL; - const char *log_path="./swarmkv-sync.log"; - unsigned int p2p_port_start=9310, health_port_start=10310; - char node_list_str[1024]={0}; - for(size_t i=0; i<NODE_NUMBER; i++) - { - snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", p2p_port_start+i); - } - const char *cluster_name="swarmkv-sync"; - swarmkv_cli_create_cluster(cluster_name, node_list_str); - struct log_handle * logger=log_handle_create(log_path, 0); - struct swarmkv_options *opts[NODE_NUMBER]; - for(size_t i=0; i<NODE_NUMBER; i++) - { - opts[i]=swarmkv_options_new(); - swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i); - swarmkv_options_set_health_check_port(opts[i], health_port_start+i); - swarmkv_options_set_logger(opts[i], logger); - swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); - swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000); - swarmkv_options_set_sync_interval_us(opts[i], 100*1000); - swarmkv_options_set_disable_run_for_leader(opts[i]); - db[i]=swarmkv_open(opts[i], cluster_name, &err); - if(err) - { - printf("swarmkv_open %zu instance failed: %s\n", i, err); - free(err); - err=NULL; - } - } - char key[256]={0}; - struct swarmkv_reply *reply=NULL; - for(size_t i=0; i<g_token_bucket_number; i++) - { - snprintf(key, sizeof(key), "tb-%zu", i); - reply=swarmkv_command_on(db[i%NODE_NUMBER], NULL, "TCFG %s 2000000 2000000", key); - EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); - 
if(reply->type != SWARMKV_REPLY_STATUS) - { - swarmkv_reply_print(reply, stdout); - } - swarmkv_reply_free(reply); - } - srand(171); - g_tconsume_running_flag=1; - pthread_t threads[NODE_NUMBER][CALL_THREAD_PER_NODE]; - for(size_t i=0; i<NODE_NUMBER; i++) - { - for(size_t j=0; j<CALL_THREAD_PER_NODE; j++) - { - pthread_create(&(threads[i][j]), NULL, background_tconsume_thread, db[i]); - } - } - - sleep(20); - g_tconsume_running_flag=0; - int *successful_backgroud_running_thread=NULL; - for(size_t i=0; i<NODE_NUMBER; i++) - { - for(size_t j=0; j<CALL_THREAD_PER_NODE; j++) - { - pthread_join(threads[i][j], (void**)&successful_backgroud_running_thread); - - EXPECT_EQ(*successful_backgroud_running_thread, 1); - *successful_backgroud_running_thread=0; - free(successful_backgroud_running_thread); - successful_backgroud_running_thread=NULL; - } - } - sleep(20); - for(size_t i=0; i<NODE_NUMBER; i++) - { - swarmkv_close(db[i]); - } - - log_handle_destroy(logger); - -} TEST(Resilience, Failover) { size_t NODE_NUMBER=4; diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c index a655008..2b81f4c 100644 --- a/tools/swarmkv_cli.c +++ b/tools/swarmkv_cli.c @@ -375,7 +375,7 @@ struct swarmkv_reply *cluster_addslotowner_command(struct swarmkv *db, char *arg struct swarmkv_reply *setslot_reply=NULL, *getkeysinslot_reply=NULL, *addkeystoslot_reply=NULL; sds *migrate_argv=NULL; - printf("%zu slot to be migrated\n", actual_rebalanced_slot_num); + printf("%zu slots to be migrated\n", actual_rebalanced_slot_num); long long migrated_keys=0; node_t *new_node=NULL, *old_node=NULL; int slot_id=0; @@ -1239,7 +1239,7 @@ int main(int argc, char * argv[]) return 0; } snprintf(g_runtime.prompt, sizeof(g_runtime.prompt), "%s> ", g_config.db_name); - void * logger=log_handle_create("swarmkv-cli.log", 0); + struct log_handle * logger=log_handle_create("swarmkv-cli.log", 0); struct swarmkv_options *opts=swarmkv_options_new(); swarmkv_options_set_dryrun(opts); swarmkv_options_set_cluster_port(opts, 
g_config.cluster_port);//listen on random port |
