diff options
| author | Zheng Chao <[email protected]> | 2023-08-17 22:29:08 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-08-17 22:29:08 +0800 |
| commit | e4dc5e3f31d97f7ece5348860abfd4468934ceab (patch) | |
| tree | 80618e69dc2ddc80551d93211446b41e0d9cfaec | |
| parent | b721c09d86229f9049456c4f8754f94b22a9b652 (diff) | |
performance test passed.
| -rw-r--r-- | src/swarmkv.c | 32 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 74 | ||||
| -rw-r--r-- | test/swarmkv_perf_test.cpp | 149 |
3 files changed, 130 insertions, 125 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c index fdcefac..033c554 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -134,7 +134,15 @@ static void local_caller_on_success(void *result, void *user) static void local_caller_on_fail(enum e_future_error err, const char * what, void * user) { struct local_caller_ctx *ctx=(struct local_caller_ctx*)user; - struct swarmkv_reply *reply=swarmkv_reply_new_error(what); + struct swarmkv_reply *reply=NULL; + if(err==FUTURE_ERROR_CANCEL) + { + reply=swarmkv_reply_new_error("cancelled"); + } + else + { + reply=swarmkv_reply_new_error(what); + } ctx->cb(reply, ctx->cb_arg); future_destroy(ctx->my_future); free(ctx); @@ -1213,7 +1221,7 @@ char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name) sdsfreesplitres(argv,argc); return NULL; } -static void loop_timeout_cb(evutil_socket_t fd, short event, void *arg) +static void evloop_timeout_cb(evutil_socket_t fd, short event, void *arg) { struct swarmkv_thread_ctx *ctx = (struct swarmkv_thread_ctx *)arg; event_base_loopbreak(ctx->evbase); @@ -1227,7 +1235,7 @@ void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv) struct event *timeout_event = NULL; if(tv) { - timeout_event = event_new(ctx->evbase, -1, 0, loop_timeout_cb, ctx); + timeout_event = event_new(ctx->evbase, -1, 0, evloop_timeout_cb, ctx); evtimer_add(timeout_event, tv); event_base_loop(ctx->evbase, flags); event_del(timeout_event); @@ -1247,13 +1255,6 @@ void swarmkv_caller_loop_break(struct swarmkv *db) assert(tid >= db->opts->nr_worker_threads); event_base_loopbreak(db->ref_evbases[tid]); } -struct event_base *swarmkv_get_event_base(struct swarmkv *db) -{ - int tid=__gettid(db); - //must initiate from caller threads, and caller thread ID is larger than worker thread ID - assert(tid >= db->opts->nr_worker_threads); - return db->threads[tid].evbase; -} struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, char **err) { struct swarmkv *db = NULL; @@ -1311,8 +1312,6 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, goto error_out; } swarmkv_net_set_on_msg_callback(db->net, __on_msg_callback, db); - - swarmkv_net_set_monitor_handle(db->net, db->mod_monitor); node_init(&db->self, opts->cluster_announce_ip, opts->cluster_announce_port); @@ -1376,17 +1375,20 @@ void swarmkv_close(struct swarmkv * db) pthread_join(db->threads[i].thr, NULL); } + swarmkv_rpc_mgr_free(db->rpc_mgr); + db->rpc_mgr=NULL; + swarmkv_monitor_free(db->mod_monitor); + db->mod_monitor=NULL; swarmkv_store_free(db->mod_store); db->mod_store=NULL; swarmkv_keyspace_free(db->mod_keyspace); db->mod_keyspace=NULL; - swarmkv_monitor_free(db->mod_monitor); - db->mod_monitor=NULL; swarmkv_mesh_free(db->mesh); - swarmkv_rpc_mgr_free(db->rpc_mgr); + db->mesh=NULL; swarmkv_net_free(db->net); db->net=NULL; + struct swarmkv_cmd_spec *spec=NULL, *tmp_spec=NULL; HASH_ITER(hh, db->command_table, spec, tmp_spec) diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 57f9815..26c688d 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -469,6 +469,7 @@ void crdt_generic_ctx_free(struct crdt_generic_ctx *ctx) static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer) { struct scontainer *ctr=NULL, *tmp=NULL; + if(tid>=store->opts->nr_worker_threads) return;//swarmkv_close() is called. HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp) { scontainer_remove_replica_node(ctr, peer); @@ -547,19 +548,12 @@ void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struc return; } #define MAX_SYNC_PER_PERIOD 100000 -void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) +void store_batch_sync(struct swarmkv_store *store, int tid) { - struct swarmkv_store *store=module2store(mod_store); - struct scontainer *ctr=NULL, *tmp=NULL; - struct timespec start, end; int n_synced=0; - int real_tid=__gettid(store->exec_cmd_handle); - assert(real_tid==thread_id); - clock_gettime(CLOCK_MONOTONIC, &start); - - struct swarmkv_store_thread *thr=&store->threads[real_tid]; - thr->calls++; + struct swarmkv_store_thread *thr=&store->threads[tid]; struct sync_master *sync_master=sync_master_new(); + struct scontainer *ctr=NULL, *tmp=NULL; DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp) { char *blob=NULL; @@ -592,20 +586,62 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) n_synced++; if(n_synced>=MAX_SYNC_PER_PERIOD) break; } + node_t peer; + struct swarmkv_cmd *cmd=NULL; + int ret=1; + while(1) + { + ret=sync_master_get_cmd(sync_master, &peer, &cmd); + if(!ret) break; + crdt_generic_call(store, CRDT_MERGE, cmd, &peer); + swarmkv_cmd_free(cmd); + cmd=NULL; + } + sync_master_free(sync_master); +} +void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) +{ + struct swarmkv_store *store=module2store(mod_store); + struct scontainer *ctr=NULL, *tmp=NULL; + struct timespec start, end; + int n_synced=0; + int real_tid=__gettid(store->exec_cmd_handle); + assert(real_tid==thread_id); + clock_gettime(CLOCK_MONOTONIC, &start); + + struct swarmkv_store_thread *thr=&store->threads[real_tid]; + thr->calls++; if(store->opts->batch_sync_enabled) { - node_t peer; - struct swarmkv_cmd *cmd=NULL; - int ret=1; - while(1) + store_batch_sync(store, real_tid); + } + else + { + DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp) { - ret=sync_master_get_cmd(sync_master, &peer, &cmd); - if(!ret) break; - crdt_generic_call(store, CRDT_MERGE, cmd, &peer); + char *blob=NULL; + size_t blob_sz=0; + scontainer_serialize(ctr, &blob, &blob_sz); + + struct swarmkv_cmd *cmd=swarmkv_cmd_new(4); + cmd->argv[0]=sdsnew("crdt"); + cmd->argv[1]=sdsnew("merge"); + cmd->argv[2]=sdsnew(ctr->obj.key); + cmd->argv[3]=sdsnewlen(blob, blob_sz); + for(int i=0; i<utarray_len(ctr->replica_node_list); i++) + { + node_t *peer=utarray_eltptr(ctr->replica_node_list, i); + crdt_generic_call(store, CRDT_MERGE, cmd, peer); + } swarmkv_cmd_free(cmd); - cmd=NULL; + free(blob); + + DL_DELETE(thr->sync_queue, ctr); + ctr->is_in_sync_q=0; + store->synced++; + n_synced++; + if(n_synced>=MAX_SYNC_PER_PERIOD) break; } - sync_master_free(sync_master); } thr->n_keys=HASH_COUNT(thr->obj_table); DL_COUNT(thr->sync_queue, ctr, thr->keys_to_sync); diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp index 1d8edc3..73d8a9a 100644 --- a/test/swarmkv_perf_test.cpp +++ b/test/swarmkv_perf_test.cpp @@ -25,61 +25,52 @@ void *blocking_call_thread(void *thread_arg) char key[256]={0}, value[256]={0}; uuid_t uuid; char uuid_str[36]; + swarmkv_register_thread(db); int *success=ALLOC(int, 1); uuid_generate(uuid); uuid_unparse_lower(uuid, uuid_str); - struct cmd_exec_arg *arg=NULL; - unsigned long long set_success_counter=0, get_success_counter=0, del_success_counter=0; - arg=cmd_exec_arg_new(); - + struct swarmkv_reply *reply=NULL; for(i=0; i<n_key; i++) { snprintf(key, sizeof(key), "%s-key-%zu", uuid_str, i); - snprintf(value, sizeof(value), "%s-val-%zu", uuid_str, i); - cmd_exec_arg_clear(arg); - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db, key, strlen(key), value, strlen(value), cmd_exec_generic_callback, arg); - set_success_counter+=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); + snprintf(value, sizeof(value), "%s-val-%zu", uuid_str, i); + reply=swarmkv_command(db, "SET %s %s", key, value); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); } wait_for_sync(); for(i=0; i<n_key; i++) { snprintf(key, sizeof(key), "%s-key-%zu", uuid_str, i); - snprintf(value, sizeof(value), "%s-val-%zu", uuid_str, i); - cmd_exec_arg_clear(arg); - cmd_exec_arg_expect_cstring(arg, value); - swarmkv_get(db, key, strlen(key), cmd_exec_generic_callback, arg); - get_success_counter+=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); + snprintf(value, sizeof(value), "%s-val-%zu", uuid_str, i); + reply=swarmkv_command(db, "GET %s", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, value); + swarmkv_reply_free(reply); } wait_for_sync(); for(i=0; i<n_key; i++) { snprintf(key, sizeof(key), "%s-key-%zu", uuid_str, i); - cmd_exec_arg_clear(arg); - cmd_exec_arg_expect_integer(arg, 1); - swarmkv_del(db, key, strlen(key), cmd_exec_generic_callback, arg); - del_success_counter+=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); + reply=swarmkv_command(db, "DEL %s", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); } - cmd_exec_arg_free(arg); - + sleep(10); - *success=0; - if( n_key==get_success_counter && - get_success_counter==set_success_counter && - get_success_counter==del_success_counter) - { - *success=1; - } + *success=1; return success; } TEST(Performance, Nthreads) { - int NODE_NUMBER=3; - int CALLBACK_THREAD_NUMBER=4; + int NODE_NUMBER=2; + int CALLER_THREAD_NUMBER=2; int WORKER_THREAD_NUMBER=2; int i=0, j=0; struct swarmkv *db[NODE_NUMBER]; @@ -102,6 +93,7 @@ TEST(Performance, Nthreads) swarmkv_options_set_health_check_port(opts[i], 6310+i); swarmkv_options_set_logger(opts[i], logger); swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); + swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER); db[i]=swarmkv_open(opts[i], cluster_name, &err); if(err) { @@ -114,7 +106,7 @@ TEST(Performance, Nthreads) pthread_t threads[NODE_NUMBER][WORKER_THREAD_NUMBER]; for(i=0; i<NODE_NUMBER; i++) { - for(j=0; j<CALLBACK_THREAD_NUMBER; j++) + for(j=0; j<CALLER_THREAD_NUMBER; j++) { pthread_create(&(threads[i][j]), NULL, blocking_call_thread, db[i]); } @@ -148,28 +140,21 @@ size_t g_token_bucket_number=200*1000; void *background_tconsume_thread(void *thread_arg) { struct swarmkv *db=(struct swarmkv *)thread_arg; + swarmkv_register_thread(db); - struct cmd_exec_arg *arg=NULL; - arg=cmd_exec_arg_new(); - cmd_exec_arg_diable_sync_check(arg); char key[256]; - int ret=0; - long long tokens=1; size_t round=0, got_token_cnt=0; int start=random()%16; + struct swarmkv_reply *reply=NULL; while(g_tconsume_running_flag) { snprintf(key, sizeof(key), "tb-%zu", (round+start)%g_token_bucket_number); - cmd_exec_arg_expect_integer(arg, tokens); - arg->print_reply_on_fail=1; - swarmkv_tconsume(db, key, strlen(key), tokens, cmd_exec_generic_callback, arg); - ret=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); - cmd_exec_arg_clear(arg); - if(ret==1) got_token_cnt++; - else printf("tconsume %s failed\n", key); + reply=swarmkv_command(db, "TCONSUME %s 1", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + got_token_cnt+=reply->integer; + swarmkv_reply_free(reply); round++; } - cmd_exec_arg_free(arg); int *success=ALLOC(int, 1); EXPECT_EQ(got_token_cnt, round); if(got_token_cnt==round) @@ -181,7 +166,7 @@ void *background_tconsume_thread(void *thread_arg) TEST(Performance, Sync) { size_t NODE_NUMBER=2; - size_t CALL_THREAD_PER_NODE=2; + size_t CALLER_THREAD_NUMBER=2; size_t WORKER_THREAD_NUMBER=1; struct swarmkv *db[NODE_NUMBER]; @@ -204,6 +189,7 @@ TEST(Performance, Sync) swarmkv_options_set_health_check_port(opts[i], health_port_start+i); swarmkv_options_set_logger(opts[i], logger); swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); + swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER+1); swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000); swarmkv_options_set_sync_interval_us(opts[i], 100*1000); swarmkv_options_set_run_for_leader_enabled(opts[i], 0); @@ -214,6 +200,7 @@ TEST(Performance, Sync) free(err); err=NULL; } + swarmkv_register_thread(db[i]); } char key[256]={0}; struct swarmkv_reply *reply=NULL; @@ -230,10 +217,10 @@ TEST(Performance, Sync) } srand(171); g_tconsume_running_flag=1; - pthread_t threads[NODE_NUMBER][CALL_THREAD_PER_NODE]; + pthread_t threads[NODE_NUMBER][CALLER_THREAD_NUMBER]; for(size_t i=0; i<NODE_NUMBER; i++) { - for(size_t j=0; j<CALL_THREAD_PER_NODE; j++) + for(size_t j=0; j<CALLER_THREAD_NUMBER; j++) { pthread_create(&(threads[i][j]), NULL, background_tconsume_thread, db[i]); } @@ -244,7 +231,7 @@ TEST(Performance, Sync) int *successful_backgroud_running_thread=NULL; for(size_t i=0; i<NODE_NUMBER; i++) { - for(size_t j=0; j<CALL_THREAD_PER_NODE; j++) + for(size_t j=0; j<CALLER_THREAD_NUMBER; j++) { pthread_join(threads[i][j], (void**)&successful_backgroud_running_thread); @@ -267,69 +254,45 @@ int g_running_flag=0; void *migration_background_thread(void *thread_arg) { struct swarmkv *db=(struct swarmkv *)thread_arg; + swarmkv_register_thread(db); uuid_t uuid; char uuid_str[36]; uuid_generate(uuid); uuid_unparse_lower(uuid, uuid_str); - struct cmd_exec_arg *arg=NULL; - arg=cmd_exec_arg_new(); char key[256], value[256]; - int ret=0; int static_key_cnt=1024; + struct swarmkv_reply *reply=NULL; + snprintf(value, sizeof(value), "background-running-value"); for(int i=0; i<static_key_cnt; i++) { snprintf(key, sizeof(key), "static-key-%d-of-%s", i, uuid_str); - snprintf(value, sizeof(value), "background-running-value"); - - cmd_exec_arg_clear(arg); - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db, key, strlen(key), value, strlen(value), cmd_exec_generic_callback, arg); - ret+=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); + + reply=swarmkv_command(db, "SET %s %s", key, value); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); + swarmkv_reply_free(reply); } - EXPECT_EQ(ret, static_key_cnt); - - snprintf(value, sizeof(value), "background-running-value"); - ret=3; int seq=0; while(g_running_flag) { - ret=0; snprintf(key, sizeof(key), "ephemeral-key-of-%s-%d", uuid_str, seq); seq++; - cmd_exec_arg_clear(arg); - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db, key, strlen(key), value, strlen(value), cmd_exec_generic_callback, arg); - ret+=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); - if(ret!=1) - { - printf("set %s failed\n", key); - } - cmd_exec_arg_clear(arg); - cmd_exec_arg_expect_cstring(arg, value); - swarmkv_get(db, key, strlen(key), cmd_exec_generic_callback, arg); - ret+=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); - if(ret!=2) - { - printf("get %s failed\n", key); - } + reply=swarmkv_command(db, "SET %s %s", key, value); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); + swarmkv_reply_free(reply); + + reply=swarmkv_command(db, "GET %s", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, value); + swarmkv_reply_free(reply); - cmd_exec_arg_clear(arg); - cmd_exec_arg_expect_integer(arg, 1); - swarmkv_del(db, key, strlen(key), cmd_exec_generic_callback, arg); - ret+=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); - if(ret!=3) - { - printf("del %s failed\n", key); - } + reply=swarmkv_command(db, "DEL %s", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); } - cmd_exec_arg_free(arg); int *success=ALLOC(int, 1); - EXPECT_EQ(ret, 3); - if(ret==3) - *success=1; - else - *success=0; + *success=1; return success; } TEST(Resilience, AddSlotOwner) @@ -359,6 +322,7 @@ TEST(Resilience, AddSlotOwner) swarmkv_options_set_health_check_port(opts[i], health_port_start+i); swarmkv_options_set_logger(opts[i], logger); swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); + swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER); swarmkv_options_set_cluster_timeout_us(opts[i], 100*1000); swarmkv_options_set_run_for_leader_enabled(opts[i], 0); db[i]=swarmkv_open(opts[i], cluster_name, &err); @@ -430,6 +394,8 @@ TEST(Resilience, Failover) opts[i]=swarmkv_options_new(); swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i); swarmkv_options_set_health_check_port(opts[i], health_port_start+i); + swarmkv_options_set_worker_thread_number(opts[i], 1); + swarmkv_options_set_caller_thread_number(opts[i], 1); db[i]=swarmkv_open(opts[i], cluster_name, &err); if(err) { @@ -438,6 +404,7 @@ TEST(Resilience, Failover) err=NULL; } ASSERT_TRUE(err==NULL); + swarmkv_register_thread(db[i]); } size_t KEY_NUMBER=1024; struct swarmkv_reply *reply=NULL; |
