diff options
| author | Zheng Chao <[email protected]> | 2023-08-24 20:07:56 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-08-24 20:07:56 +0800 |
| commit | 71a9bdb506ad022493ee7db3e1dada8f6a7a544e (patch) | |
| tree | 6c3a7e8c2be4a09d3d28ba9bed18f93a99ce82cd | |
| parent | 898e9689759ccdf212381ca088dd6a0f23cbc8b1 (diff) | |
Optimize __get_tid() by caching the result of `syscall(SYS_gettid)` in thread_local variable.
| -rw-r--r-- | src/swarmkv.c | 9 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 15 | ||||
| -rw-r--r-- | test/swarmkv_perf_test.cpp | 142 |
3 files changed, 106 insertions, 60 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c index 16e2494..05746d6 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -91,11 +91,13 @@ struct swarmkv *module2db(struct swarmkv_module *module) assert(db==module->mod_ctx); return db; } +__thread int __sys_tid=-1; void swarmkv_register_thread(struct swarmkv *db) { int thread_id = atomic_fetch_add(&db->thread_counter, 1); assert(thread_id < db->opts->nr_worker_threads + db->opts->nr_caller_threads); - db->threads[thread_id].sys_tid=syscall(SYS_gettid); + if(__sys_tid<0) __sys_tid=syscall(SYS_gettid); + db->threads[thread_id].sys_tid=__sys_tid; db->threads[thread_id].thread_id=thread_id; return; } @@ -103,12 +105,13 @@ struct swarmkv_options *swarmkv_get0_options(struct swarmkv *db) { return db->opts; } + int __gettid(struct swarmkv *db) { - int sys_tid=syscall(SYS_gettid); +// int __sys_tid=syscall(SYS_gettid); for(int i=0; i<db->opts->nr_worker_threads + db->opts->nr_caller_threads; i++) { - if(db->threads[i].sys_tid==sys_tid) + if(db->threads[i].sys_tid==__sys_tid) { return db->threads[i].thread_id; } diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 26c688d..b10cdd3 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -482,14 +482,21 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) struct scontainer *ctr=NULL; uuid_t uuid; store_get_uuid(&(ctx->store->module), uuid); - int tid=__gettid(ctx->store->exec_cmd_handle); + + long long error_before=ctx->store->sync_err; if(reply->type==SWARMKV_REPLY_ERROR) { - atomic_inc(&ctx->store->sync_err); - store_remove_failed_peer(ctx->store, tid, &ctx->peer); + if(strcasestr(reply->str, "timeout")) + { + int tid=__gettid(ctx->store->exec_cmd_handle); + atomic_inc(&ctx->store->sync_err); + store_remove_failed_peer(ctx->store, tid, &ctx->peer); + } goto error_out; } + + switch(ctx->op) { case CRDT_GET: @@ -529,7 +536,7 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) atomic_inc(&ctx->store->sync_err); break; } - assert(ctx->store->sync_err==error_before); + //assert(ctx->store->sync_err==error_before); error_out: crdt_generic_ctx_free(ctx); return; diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp index 12deca2..a897f28 100644 --- a/test/swarmkv_perf_test.cpp +++ b/test/swarmkv_perf_test.cpp @@ -274,6 +274,7 @@ struct async_exec_ctx struct timespec start; int seq; struct async_exec_globals *globals; + struct swarmkv *db; }; void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg) { @@ -288,7 +289,7 @@ void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg) globals->reply_cnt++; if(globals->reply_cnt==globals->expected_reply_cnt) { - swarmkv_caller_loop_break(globals->db); + swarmkv_caller_loop_break(ctx->db); } //EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); if(reply->type == SWARMKV_REPLY_ERROR) @@ -299,70 +300,46 @@ void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg) } free(ctx); } -TEST(Performance, AsyncExec) +void *async_caller_thread(void *thread_arg) { - int NODE_NUMBER=3; - int CALLER_THREAD_NUMBER=1; - int WORKER_THREAD_NUMBER=2; - int i=0; - struct swarmkv *db[NODE_NUMBER]; - char *err=NULL; - const char *log_path="./swarmkv-async-exec.log"; + struct swarmkv *db=(struct swarmkv *)thread_arg; - char node_list_str[1024]={0}; - for(i=0; i<NODE_NUMBER; i++) - { - snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", PERF_ASYNC_EXEC_CLUSTER_PORT_START+i); - } - const char *cluster_name="swarmkv-async-exec"; - swarmkv_cli_create_cluster(cluster_name, node_list_str); - struct log_handle * logger=log_handle_create(log_path, 0); - struct swarmkv_options* opts[NODE_NUMBER]; - for(i=0; i<NODE_NUMBER; i++) - { - opts[i]=swarmkv_options_new(); - swarmkv_options_set_cluster_port(opts[i], PERF_ASYNC_EXEC_CLUSTER_PORT_START+i); - swarmkv_options_set_health_check_port(opts[i], PERF_ASYNC_EXEC_HEALTH_PORT_START+i); - swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000); - swarmkv_options_set_logger(opts[i], logger); - swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); - swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER); - swarmkv_options_set_max_dispatch_interval(opts[i], NULL, 1000); - db[i]=swarmkv_open(opts[i], cluster_name, &err); - if(err) - { - printf("swarmkv_open %d instance failed: %s\n", i, err); - free(err); - err=NULL; - } - swarmkv_register_thread(db[i]); - } - int key_number=256*1024, round=key_number*8; + + int key_number=128*1024, round=key_number*2; struct async_exec_globals globals; memset(&globals, 0, sizeof(globals)); globals.expected_reply_cnt=round; globals.reply_cnt=0; - globals.db=db[0]; + globals.db=db; globals.cmd_inprogress=1; int tmp_cnt=0, empty_loop=0, loops=0; long long last_loop_ts_ms=0, curent_ts_ms=0; struct timespec start, end; struct async_exec_ctx *ctx=NULL; char key[256]={0}; + swarmkv_register_thread(db); clock_gettime(CLOCK_REALTIME, &start); - for(i=0; i<round; i++) + for(int i=0; i<round; i++) { ctx=ALLOC(struct async_exec_ctx, 1); ctx->globals=&globals; ctx->seq=i; + ctx->db=db; clock_gettime(CLOCK_REALTIME, &ctx->start); snprintf(key, sizeof(key), "async-key-%d", i%key_number); - swarmkv_set(db[0], key, strlen(key), "abc", 3, async_on_reply_cb, ctx); + if(i<key_number) + { + swarmkv_set(db, key, strlen(key), "abc", 3, async_on_reply_cb, ctx); + } + else + { + swarmkv_get(db, key, strlen(key), async_on_reply_cb, ctx); + } curent_ts_ms=ctx->start.tv_sec*1000+ctx->start.tv_nsec/1000000; if(last_loop_ts_ms!=curent_ts_ms) { tmp_cnt=globals.reply_cnt; - swarmkv_caller_loop(db[0], SWARMKV_LOOP_NONBLOCK, NULL);//SWARMKV_LOOP_ONCE|SWARMKV_LOOP_NONBLOCK + swarmkv_caller_loop(db, SWARMKV_LOOP_NONBLOCK, NULL);//SWARMKV_LOOP_ONCE|SWARMKV_LOOP_NONBLOCK loops++; if(tmp_cnt==globals.reply_cnt) { @@ -370,23 +347,80 @@ TEST(Performance, AsyncExec) } last_loop_ts_ms=curent_ts_ms; } - if(swarmkv_caller_get_pending_commands(db[0])>500) + if(swarmkv_caller_get_pending_commands(db)>100) { usleep(10); } } globals.cmd_inprogress=0; - swarmkv_caller_loop(db[0], SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL); - + swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL); clock_gettime(CLOCK_REALTIME, &end); double elapsed_ms=(end.tv_sec-start.tv_sec)*1000.0+(end.tv_nsec-start.tv_nsec)/1000000.0; - printf("Async SET %d keys, elapsed %lf ms, %lf cmds/s\n", round, elapsed_ms, round*1000.0/elapsed_ms); + printf("Async SET %d, Get %d, elapsed %lf ms, %lf cmds/s\n", key_number, round-key_number, elapsed_ms, round*1000.0/elapsed_ms); printf("Avg Exec Latency %lf ms, timed out %d\n", globals.total_exec_time_ms/round, globals.timedout_reply_cnt); printf("On fly exec %d, Loops %d, Wasted loop %d\n", globals.on_fly_reply_cnt, loops, empty_loop); - - for(i=0; i<NODE_NUMBER; i++) + EXPECT_EQ(globals.timedout_reply_cnt, 0); + EXPECT_EQ(globals.reply_cnt, globals.expected_reply_cnt); + EXPECT_LE(globals.total_exec_time_ms/round, 10); + return NULL; +} +TEST(Performance, AsyncExec) +{ + int NODE_NUMBER=2; + int CALLER_THREAD_NUMBER=1; + int WORKER_THREAD_NUMBER=2; + + struct swarmkv *db[NODE_NUMBER]; + char *err=NULL; + const char *log_path="./swarmkv-async-exec.log"; + + char node_list_str[1024]={0}; + for(int i=0; i<NODE_NUMBER; i++) + { + snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", PERF_ASYNC_EXEC_CLUSTER_PORT_START+i); + } + const char *cluster_name="swarmkv-async-exec"; + swarmkv_cli_create_cluster(cluster_name, node_list_str); + struct log_handle * logger=log_handle_create(log_path, 0); + struct swarmkv_options* opts[NODE_NUMBER]; + for(int i=0; i<NODE_NUMBER; i++) + { + opts[i]=swarmkv_options_new(); + swarmkv_options_set_cluster_port(opts[i], PERF_ASYNC_EXEC_CLUSTER_PORT_START+i); + swarmkv_options_set_health_check_port(opts[i], PERF_ASYNC_EXEC_HEALTH_PORT_START+i); + swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000); + swarmkv_options_set_logger(opts[i], logger); + swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); + swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER); + swarmkv_options_set_max_dispatch_interval(opts[i], NULL, 1000); + swarmkv_options_set_batch_sync_enabled(opts[i], 1); + db[i]=swarmkv_open(opts[i], cluster_name, &err); + if(err) + { + printf("swarmkv_open %d instance failed: %s\n", i, err); + free(err); + err=NULL; + } + } + pthread_t threads[NODE_NUMBER][CALLER_THREAD_NUMBER]; + for(int i=0; i<NODE_NUMBER; i++) + { + for(int j=0; j<CALLER_THREAD_NUMBER; j++) + { + pthread_create(&(threads[i][j]), NULL, async_caller_thread, db[i]); + } + } + int thread_ret=0; + for(int i=0; i<NODE_NUMBER; i++) + { + for(int j=0; j<CALLER_THREAD_NUMBER; j++) + { + pthread_join(threads[i][j], (void**)&thread_ret); + } + } + for(int i=0; i<NODE_NUMBER; i++) { //close slowly to cover more code branches. sleep(2); @@ -399,13 +433,13 @@ TEST(Performance, SyncExec) int NODE_NUMBER=3; int CALLER_THREAD_NUMBER=1; int WORKER_THREAD_NUMBER=2; - int i=0; + struct swarmkv *db[NODE_NUMBER]; char *err=NULL; const char *log_path="./swarmkv-sync-exec.log"; char node_list_str[1024]={0}; - for(i=0; i<NODE_NUMBER; i++) + for(int i=0; i<NODE_NUMBER; i++) { snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", PERF_SYNC_EXEC_CLUSTER_PORT_START+i); } @@ -413,7 +447,7 @@ TEST(Performance, SyncExec) swarmkv_cli_create_cluster(cluster_name, node_list_str); struct log_handle * logger=log_handle_create(log_path, 0); struct swarmkv_options* opts[NODE_NUMBER]; - for(i=0; i<NODE_NUMBER; i++) + for(int i=0; i<NODE_NUMBER; i++) { opts[i]=swarmkv_options_new(); swarmkv_options_set_cluster_port(opts[i], PERF_SYNC_EXEC_CLUSTER_PORT_START+i); @@ -431,13 +465,15 @@ TEST(Performance, SyncExec) } swarmkv_register_thread(db[i]); } + + int key_number=16*1024; int success_cnt=0; struct timespec start, end; clock_gettime(CLOCK_REALTIME, &start); struct swarmkv_reply *reply=NULL; - for(i=0; i<key_number; i++) + for(int i=0; i<key_number; i++) { reply=swarmkv_command(db[0], "SET bock-key-%d by-node-%d", i, 0); if(reply->type==SWARMKV_REPLY_STATUS) @@ -449,7 +485,7 @@ TEST(Performance, SyncExec) clock_gettime(CLOCK_REALTIME, &end); double elapsed_ms=(end.tv_sec-start.tv_sec)*1000.0+(end.tv_nsec-start.tv_nsec)/1000000.0; printf("Block SET %d keys, success %d, elapsed %lf ms, %lf cmds/s\n", key_number, success_cnt, elapsed_ms, key_number*1000.0/elapsed_ms); - for(i=0; i<NODE_NUMBER; i++) + for(int i=0; i<NODE_NUMBER; i++) { //close slowly to cover more code branches. sleep(2); |
