#include "swarmkv/swarmkv.h" #include "test_utils.h" #include "log.h" #include #include #include #include #include #include #include #define PERF_TEST_EXEC_TO_MS 10000 void set_counter(struct cmd_exec_arg *exec_arg, void *cb_arg) { unsigned long long *counter=(unsigned long long *)cb_arg; if(exec_arg->success) (*counter)++; return; } void *blocking_call_thread(void *thread_arg) { struct swarmkv *db=(struct swarmkv *)thread_arg; size_t n_key=1024*60, i=0; char key[256]={0}, value[256]={0}; uuid_t uuid; char uuid_str[36]; swarmkv_register_thread(db); int *success=ALLOC(int, 1); uuid_generate(uuid); uuid_unparse_lower(uuid, uuid_str); struct swarmkv_reply *reply=NULL; for(i=0; itype, SWARMKV_REPLY_STATUS); EXPECT_STREQ(reply->str, "OK"); swarmkv_reply_free(reply); } wait_for_sync(); for(i=0; itype, SWARMKV_REPLY_STRING); EXPECT_STREQ(reply->str, value); swarmkv_reply_free(reply); } wait_for_sync(); for(i=0; itype, SWARMKV_REPLY_INTEGER); EXPECT_EQ(reply->integer, 1); swarmkv_reply_free(reply); } sleep(10); *success=1; return success; } #define PERF_NTHREAD_CLUSTER_PORT_START 5310 #define PERF_NTHREAD_HEALTH_PORT_START 6310 #define PERF_TB_CLUSTER_PORT_START 7310 #define PERF_TB_HEALTH_PORT_START 8310 #define PERF_ASYNC_EXEC_CLUSTER_PORT_START 9310 #define PERF_ASYNC_EXEC_HEALTH_PORT_START 10310 #define PERF_RESILIENCE_ADD_SLOT_OWNER_CLUSTER_PORT_START 11310 #define PERF_RESILIENCE_ADD_SLOT_OWNER_HEALTH_PORT_START 12310 #define PERF_RESILIENCE_FAILOVER_CLUSTER_PORT_START 13310 #define PERF_RESILIENCE_FAILOVER_HEALTH_PORT_START 14310 #define PERF_SYNC_EXEC_CLUSTER_PORT_START 15310 #define PERF_SYNC_EXEC_HEALTH_PORT_START 16310 TEST(Performance, Nthreads) { int NODE_NUMBER=2; int CALLER_THREAD_NUMBER=2; int WORKER_THREAD_NUMBER=2; int i=0, j=0; struct swarmkv *db[NODE_NUMBER]; char *err=NULL; const char *log_path="./swarmkv-n-threads.log"; char node_list_str[1024]={0}; for(i=0; itype, SWARMKV_REPLY_INTEGER); got_token_cnt+=reply->integer; swarmkv_reply_free(reply); round++; } int *success=ALLOC(int, 1); EXPECT_EQ(got_token_cnt, round); if(got_token_cnt==round) *success=1; else *success=0; return success; } TEST(Performance, TokenBucket) { size_t NODE_NUMBER=2; size_t CALLER_THREAD_NUMBER=2; size_t WORKER_THREAD_NUMBER=1; struct swarmkv *db[NODE_NUMBER]; char *err=NULL; const char *log_path="./swarmkv-tb.log"; char node_list_str[1024]={0}; for(size_t i=0; itype, SWARMKV_REPLY_STATUS); if(reply->type != SWARMKV_REPLY_STATUS) { swarmkv_reply_print(reply, stdout); } swarmkv_reply_free(reply); } srand(171); g_tconsume_running_flag=1; pthread_t threads[NODE_NUMBER][CALLER_THREAD_NUMBER]; for(size_t i=0; iglobals; struct timespec end; clock_gettime(CLOCK_REALTIME, &end); double elapsed_ms=(end.tv_sec-ctx->start.tv_sec)*1000.0+(end.tv_nsec-ctx->start.tv_nsec)/1000000.0; globals->total_exec_time_ms+=elapsed_ms; if(globals->cmd_inprogress) globals->on_fly_reply_cnt++; globals->reply_cnt++; if(globals->reply_cnt==globals->expected_reply_cnt) { swarmkv_caller_loop_break(ctx->db); } //EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); if(reply->type == SWARMKV_REPLY_ERROR) { //swarmkv_reply_print(reply, stdout); printf("Timeout %d\n", ctx->seq); globals->timedout_reply_cnt++; } free(ctx); } void *async_caller_thread(void *thread_arg) { struct swarmkv *db=(struct swarmkv *)thread_arg; int key_number=1024*1024, round=key_number*5; struct async_exec_globals globals; memset(&globals, 0, sizeof(globals)); globals.expected_reply_cnt=round; globals.reply_cnt=0; globals.db=db; globals.cmd_inprogress=1; int tmp_cnt=0, empty_loop=0, loops=0; long long last_loop_ts_ms=0, curent_ts_ms=0; struct timespec start, end; struct async_exec_ctx *ctx=NULL; char key[256]={0}; swarmkv_register_thread(db); clock_gettime(CLOCK_REALTIME, &start); long long set_cnt=0, get_cnt=0, del_cnt=0; for(int i=0; iglobals=&globals; ctx->seq=i; ctx->db=db; clock_gettime(CLOCK_REALTIME, &ctx->start); snprintf(key, sizeof(key), "async-key-%d", i%key_number); switch(i/key_number) { case 0: swarmkv_set(db, key, strlen(key), "abc", 3, async_on_reply_cb, ctx); set_cnt++; break; case 1: swarmkv_get(db, key, strlen(key), async_on_reply_cb, ctx); get_cnt++; break; case 2: swarmkv_del(db, key, strlen(key), async_on_reply_cb, ctx); del_cnt++; break; case 3: swarmkv_set(db, key, strlen(key), "abc", 3, async_on_reply_cb, ctx); set_cnt++; break; default: swarmkv_get(db, key, strlen(key), async_on_reply_cb, ctx); get_cnt++; break; } curent_ts_ms=ctx->start.tv_sec*1000+ctx->start.tv_nsec/1000000; if(last_loop_ts_ms!=curent_ts_ms) { tmp_cnt=globals.reply_cnt; swarmkv_caller_loop(db, SWARMKV_LOOP_NONBLOCK, NULL);//SWARMKV_LOOP_ONCE|SWARMKV_LOOP_NONBLOCK loops++; if(tmp_cnt==globals.reply_cnt) { empty_loop++; } last_loop_ts_ms=curent_ts_ms; } long long pending_cmd=swarmkv_caller_get_pending_commands(db); struct timeval to={0, 10}; while(pending_cmd>100) { swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, &to); break; pending_cmd=swarmkv_caller_get_pending_commands(db); } } globals.cmd_inprogress=0; if(globals.reply_cnt!=globals.expected_reply_cnt) { swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL); } clock_gettime(CLOCK_REALTIME, &end); double elapsed_ms=(end.tv_sec-start.tv_sec)*1000.0+(end.tv_nsec-start.tv_nsec)/1000000.0; printf("Async Set %lld, Get %lld, Del %lld, elapsed %lf ms, %lf cmds/s\n", set_cnt, get_cnt, del_cnt, elapsed_ms, round*1000.0/elapsed_ms); printf("Avg Exec Latency %lf ms, timed out %d\n", globals.total_exec_time_ms/round, globals.timedout_reply_cnt); //printf("On fly exec %d, Loops %d, Wasted loop %d\n", globals.on_fly_reply_cnt, loops, empty_loop); EXPECT_EQ(globals.timedout_reply_cnt, 0); EXPECT_EQ(globals.reply_cnt, globals.expected_reply_cnt); EXPECT_LE(globals.total_exec_time_ms/round, 10); return NULL; } TEST(Performance, AsyncExec) { int NODE_NUMBER=2; int CALLER_THREAD_NUMBER=2; int WORKER_THREAD_NUMBER=2; struct swarmkv *db[NODE_NUMBER]; char *err=NULL; const char *log_path="./swarmkv-async-exec.log"; char node_list_str[1024]={0}; for(int i=1; itype==SWARMKV_REPLY_STATUS) { success_cnt++; } swarmkv_reply_free(reply); } clock_gettime(CLOCK_REALTIME, &end); double elapsed_ms=(end.tv_sec-start.tv_sec)*1000.0+(end.tv_nsec-start.tv_nsec)/1000000.0; printf("Block SET %d keys, success %d, elapsed %lf ms, %lf cmds/s\n", key_number, success_cnt, elapsed_ms, key_number*1000.0/elapsed_ms); for(int i=0; itype, SWARMKV_REPLY_STATUS); swarmkv_reply_free(reply); } int seq=0; while(g_running_flag) { snprintf(key, sizeof(key), "ephemeral-key-of-%s-%d", uuid_str, seq); seq++; reply=swarmkv_command(db, "SET %s %s", key, value); EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); swarmkv_reply_free(reply); reply=swarmkv_command(db, "GET %s", key); EXPECT_EQ(reply->type, SWARMKV_REPLY_STRING); EXPECT_STREQ(reply->str, value); swarmkv_reply_free(reply); reply=swarmkv_command(db, "DEL %s", key); EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); EXPECT_EQ(reply->integer, 1); swarmkv_reply_free(reply); } int *success=ALLOC(int, 1); *success=1; return success; } TEST(Resilience, AddSlotOwner) { int NODE_NUMBER=2; int CANDINATE_NUMBER=2; int CALLER_THREAD_NUMBER=1; int WORKER_THREAD_NUMBER=1; int i=0, j=0; struct swarmkv *db[NODE_NUMBER+CANDINATE_NUMBER]; char *err=NULL; const char *log_path="./swarmkv-migration-11.log"; char node_list_str[1024]={0}; for(i=0; itype, SWARMKV_REPLY_ARRAY); EXPECT_EQ(reply->n_element, NODE_NUMBER); swarmkv_reply_free(reply); for(size_t j=1; jtype, SWARMKV_REPLY_ARRAY); EXPECT_EQ(reply->n_element, NODE_NUMBER-2); swarmkv_reply_free(reply); } } for(size_t i=1; i