summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-08-24 20:07:56 +0800
committerZheng Chao <[email protected]>2023-08-24 20:07:56 +0800
commit71a9bdb506ad022493ee7db3e1dada8f6a7a544e (patch)
tree6c3a7e8c2be4a09d3d28ba9bed18f93a99ce82cd
parent898e9689759ccdf212381ca088dd6a0f23cbc8b1 (diff)
Optimize __get_tid() by caching the result of `syscall(SYS_gettid)` in thread_local variable.
-rw-r--r--src/swarmkv.c9
-rw-r--r--src/swarmkv_store.c15
-rw-r--r--test/swarmkv_perf_test.cpp142
3 files changed, 106 insertions, 60 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 16e2494..05746d6 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -91,11 +91,13 @@ struct swarmkv *module2db(struct swarmkv_module *module)
assert(db==module->mod_ctx);
return db;
}
+__thread int __sys_tid=-1;
void swarmkv_register_thread(struct swarmkv *db)
{
int thread_id = atomic_fetch_add(&db->thread_counter, 1);
assert(thread_id < db->opts->nr_worker_threads + db->opts->nr_caller_threads);
- db->threads[thread_id].sys_tid=syscall(SYS_gettid);
+ if(__sys_tid<0) __sys_tid=syscall(SYS_gettid);
+ db->threads[thread_id].sys_tid=__sys_tid;
db->threads[thread_id].thread_id=thread_id;
return;
}
@@ -103,12 +105,13 @@ struct swarmkv_options *swarmkv_get0_options(struct swarmkv *db)
{
return db->opts;
}
+
int __gettid(struct swarmkv *db)
{
- int sys_tid=syscall(SYS_gettid);
+// int __sys_tid=syscall(SYS_gettid);
for(int i=0; i<db->opts->nr_worker_threads + db->opts->nr_caller_threads; i++)
{
- if(db->threads[i].sys_tid==sys_tid)
+ if(db->threads[i].sys_tid==__sys_tid)
{
return db->threads[i].thread_id;
}
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 26c688d..b10cdd3 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -482,14 +482,21 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
struct scontainer *ctr=NULL;
uuid_t uuid;
store_get_uuid(&(ctx->store->module), uuid);
- int tid=__gettid(ctx->store->exec_cmd_handle);
+
+
long long error_before=ctx->store->sync_err;
if(reply->type==SWARMKV_REPLY_ERROR)
{
- atomic_inc(&ctx->store->sync_err);
- store_remove_failed_peer(ctx->store, tid, &ctx->peer);
+ if(strcasestr(reply->str, "timeout"))
+ {
+ int tid=__gettid(ctx->store->exec_cmd_handle);
+ atomic_inc(&ctx->store->sync_err);
+ store_remove_failed_peer(ctx->store, tid, &ctx->peer);
+ }
goto error_out;
}
+
+
switch(ctx->op)
{
case CRDT_GET:
@@ -529,7 +536,7 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
atomic_inc(&ctx->store->sync_err);
break;
}
- assert(ctx->store->sync_err==error_before);
+ //assert(ctx->store->sync_err==error_before);
error_out:
crdt_generic_ctx_free(ctx);
return;
diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp
index 12deca2..a897f28 100644
--- a/test/swarmkv_perf_test.cpp
+++ b/test/swarmkv_perf_test.cpp
@@ -274,6 +274,7 @@ struct async_exec_ctx
struct timespec start;
int seq;
struct async_exec_globals *globals;
+ struct swarmkv *db;
};
void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg)
{
@@ -288,7 +289,7 @@ void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg)
globals->reply_cnt++;
if(globals->reply_cnt==globals->expected_reply_cnt)
{
- swarmkv_caller_loop_break(globals->db);
+ swarmkv_caller_loop_break(ctx->db);
}
//EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS);
if(reply->type == SWARMKV_REPLY_ERROR)
@@ -299,70 +300,46 @@ void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg)
}
free(ctx);
}
-TEST(Performance, AsyncExec)
+void *async_caller_thread(void *thread_arg)
{
- int NODE_NUMBER=3;
- int CALLER_THREAD_NUMBER=1;
- int WORKER_THREAD_NUMBER=2;
- int i=0;
- struct swarmkv *db[NODE_NUMBER];
- char *err=NULL;
- const char *log_path="./swarmkv-async-exec.log";
+ struct swarmkv *db=(struct swarmkv *)thread_arg;
- char node_list_str[1024]={0};
- for(i=0; i<NODE_NUMBER; i++)
- {
- snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", PERF_ASYNC_EXEC_CLUSTER_PORT_START+i);
- }
- const char *cluster_name="swarmkv-async-exec";
- swarmkv_cli_create_cluster(cluster_name, node_list_str);
- struct log_handle * logger=log_handle_create(log_path, 0);
- struct swarmkv_options* opts[NODE_NUMBER];
- for(i=0; i<NODE_NUMBER; i++)
- {
- opts[i]=swarmkv_options_new();
- swarmkv_options_set_cluster_port(opts[i], PERF_ASYNC_EXEC_CLUSTER_PORT_START+i);
- swarmkv_options_set_health_check_port(opts[i], PERF_ASYNC_EXEC_HEALTH_PORT_START+i);
- swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000);
- swarmkv_options_set_logger(opts[i], logger);
- swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
- swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER);
- swarmkv_options_set_max_dispatch_interval(opts[i], NULL, 1000);
- db[i]=swarmkv_open(opts[i], cluster_name, &err);
- if(err)
- {
- printf("swarmkv_open %d instance failed: %s\n", i, err);
- free(err);
- err=NULL;
- }
- swarmkv_register_thread(db[i]);
- }
- int key_number=256*1024, round=key_number*8;
+
+ int key_number=128*1024, round=key_number*2;
struct async_exec_globals globals;
memset(&globals, 0, sizeof(globals));
globals.expected_reply_cnt=round;
globals.reply_cnt=0;
- globals.db=db[0];
+ globals.db=db;
globals.cmd_inprogress=1;
int tmp_cnt=0, empty_loop=0, loops=0;
long long last_loop_ts_ms=0, curent_ts_ms=0;
struct timespec start, end;
struct async_exec_ctx *ctx=NULL;
char key[256]={0};
+ swarmkv_register_thread(db);
clock_gettime(CLOCK_REALTIME, &start);
- for(i=0; i<round; i++)
+ for(int i=0; i<round; i++)
{
ctx=ALLOC(struct async_exec_ctx, 1);
ctx->globals=&globals;
ctx->seq=i;
+ ctx->db=db;
clock_gettime(CLOCK_REALTIME, &ctx->start);
snprintf(key, sizeof(key), "async-key-%d", i%key_number);
- swarmkv_set(db[0], key, strlen(key), "abc", 3, async_on_reply_cb, ctx);
+ if(i<key_number)
+ {
+ swarmkv_set(db, key, strlen(key), "abc", 3, async_on_reply_cb, ctx);
+ }
+ else
+ {
+ swarmkv_get(db, key, strlen(key), async_on_reply_cb, ctx);
+ }
curent_ts_ms=ctx->start.tv_sec*1000+ctx->start.tv_nsec/1000000;
if(last_loop_ts_ms!=curent_ts_ms)
{
tmp_cnt=globals.reply_cnt;
- swarmkv_caller_loop(db[0], SWARMKV_LOOP_NONBLOCK, NULL);//SWARMKV_LOOP_ONCE|SWARMKV_LOOP_NONBLOCK
+ swarmkv_caller_loop(db, SWARMKV_LOOP_NONBLOCK, NULL);//SWARMKV_LOOP_ONCE|SWARMKV_LOOP_NONBLOCK
loops++;
if(tmp_cnt==globals.reply_cnt)
{
@@ -370,23 +347,80 @@ TEST(Performance, AsyncExec)
}
last_loop_ts_ms=curent_ts_ms;
}
- if(swarmkv_caller_get_pending_commands(db[0])>500)
+ if(swarmkv_caller_get_pending_commands(db)>100)
{
usleep(10);
}
}
globals.cmd_inprogress=0;
- swarmkv_caller_loop(db[0], SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL);
-
+ swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL);
clock_gettime(CLOCK_REALTIME, &end);
double elapsed_ms=(end.tv_sec-start.tv_sec)*1000.0+(end.tv_nsec-start.tv_nsec)/1000000.0;
- printf("Async SET %d keys, elapsed %lf ms, %lf cmds/s\n", round, elapsed_ms, round*1000.0/elapsed_ms);
+ printf("Async SET %d, Get %d, elapsed %lf ms, %lf cmds/s\n", key_number, round-key_number, elapsed_ms, round*1000.0/elapsed_ms);
printf("Avg Exec Latency %lf ms, timed out %d\n",
globals.total_exec_time_ms/round,
globals.timedout_reply_cnt);
printf("On fly exec %d, Loops %d, Wasted loop %d\n", globals.on_fly_reply_cnt, loops, empty_loop);
-
- for(i=0; i<NODE_NUMBER; i++)
+ EXPECT_EQ(globals.timedout_reply_cnt, 0);
+ EXPECT_EQ(globals.reply_cnt, globals.expected_reply_cnt);
+ EXPECT_LE(globals.total_exec_time_ms/round, 10);
+ return NULL;
+}
+TEST(Performance, AsyncExec)
+{
+ int NODE_NUMBER=2;
+ int CALLER_THREAD_NUMBER=1;
+ int WORKER_THREAD_NUMBER=2;
+
+ struct swarmkv *db[NODE_NUMBER];
+ char *err=NULL;
+ const char *log_path="./swarmkv-async-exec.log";
+
+ char node_list_str[1024]={0};
+ for(int i=0; i<NODE_NUMBER; i++)
+ {
+ snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", PERF_ASYNC_EXEC_CLUSTER_PORT_START+i);
+ }
+ const char *cluster_name="swarmkv-async-exec";
+ swarmkv_cli_create_cluster(cluster_name, node_list_str);
+ struct log_handle * logger=log_handle_create(log_path, 0);
+ struct swarmkv_options* opts[NODE_NUMBER];
+ for(int i=0; i<NODE_NUMBER; i++)
+ {
+ opts[i]=swarmkv_options_new();
+ swarmkv_options_set_cluster_port(opts[i], PERF_ASYNC_EXEC_CLUSTER_PORT_START+i);
+ swarmkv_options_set_health_check_port(opts[i], PERF_ASYNC_EXEC_HEALTH_PORT_START+i);
+ swarmkv_options_set_cluster_timeout_us(opts[i], 1000*1000);
+ swarmkv_options_set_logger(opts[i], logger);
+ swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
+ swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER);
+ swarmkv_options_set_max_dispatch_interval(opts[i], NULL, 1000);
+ swarmkv_options_set_batch_sync_enabled(opts[i], 1);
+ db[i]=swarmkv_open(opts[i], cluster_name, &err);
+ if(err)
+ {
+ printf("swarmkv_open %d instance failed: %s\n", i, err);
+ free(err);
+ err=NULL;
+ }
+ }
+ pthread_t threads[NODE_NUMBER][CALLER_THREAD_NUMBER];
+ for(int i=0; i<NODE_NUMBER; i++)
+ {
+ for(int j=0; j<CALLER_THREAD_NUMBER; j++)
+ {
+ pthread_create(&(threads[i][j]), NULL, async_caller_thread, db[i]);
+ }
+ }
+ int thread_ret=0;
+ for(int i=0; i<NODE_NUMBER; i++)
+ {
+ for(int j=0; j<CALLER_THREAD_NUMBER; j++)
+ {
+ pthread_join(threads[i][j], (void**)&thread_ret);
+ }
+ }
+ for(int i=0; i<NODE_NUMBER; i++)
{
//close slowly to cover more code branches.
sleep(2);
@@ -399,13 +433,13 @@ TEST(Performance, SyncExec)
int NODE_NUMBER=3;
int CALLER_THREAD_NUMBER=1;
int WORKER_THREAD_NUMBER=2;
- int i=0;
+
struct swarmkv *db[NODE_NUMBER];
char *err=NULL;
const char *log_path="./swarmkv-sync-exec.log";
char node_list_str[1024]={0};
- for(i=0; i<NODE_NUMBER; i++)
+ for(int i=0; i<NODE_NUMBER; i++)
{
snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", PERF_SYNC_EXEC_CLUSTER_PORT_START+i);
}
@@ -413,7 +447,7 @@ TEST(Performance, SyncExec)
swarmkv_cli_create_cluster(cluster_name, node_list_str);
struct log_handle * logger=log_handle_create(log_path, 0);
struct swarmkv_options* opts[NODE_NUMBER];
- for(i=0; i<NODE_NUMBER; i++)
+ for(int i=0; i<NODE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
swarmkv_options_set_cluster_port(opts[i], PERF_SYNC_EXEC_CLUSTER_PORT_START+i);
@@ -431,13 +465,15 @@ TEST(Performance, SyncExec)
}
swarmkv_register_thread(db[i]);
}
+
+
int key_number=16*1024;
int success_cnt=0;
struct timespec start, end;
clock_gettime(CLOCK_REALTIME, &start);
struct swarmkv_reply *reply=NULL;
- for(i=0; i<key_number; i++)
+ for(int i=0; i<key_number; i++)
{
reply=swarmkv_command(db[0], "SET bock-key-%d by-node-%d", i, 0);
if(reply->type==SWARMKV_REPLY_STATUS)
@@ -449,7 +485,7 @@ TEST(Performance, SyncExec)
clock_gettime(CLOCK_REALTIME, &end);
double elapsed_ms=(end.tv_sec-start.tv_sec)*1000.0+(end.tv_nsec-start.tv_nsec)/1000000.0;
printf("Block SET %d keys, success %d, elapsed %lf ms, %lf cmds/s\n", key_number, success_cnt, elapsed_ms, key_number*1000.0/elapsed_ms);
- for(i=0; i<NODE_NUMBER; i++)
+ for(int i=0; i<NODE_NUMBER; i++)
{
//close slowly to cover more code branches.
sleep(2);