diff options
| author | fengweihao <[email protected]> | 2023-08-21 17:58:13 +0800 |
|---|---|---|
| committer | 郑超 <[email protected]> | 2023-08-22 07:59:26 +0000 |
| commit | e32938f7469cf6b0797807873a6faff081382d35 (patch) | |
| tree | 6cb207505639c7751edefcef5c58aeec996442a8 | |
| parent | 17df52cc3fd0c60460e1dea0fabc9c9897554686 (diff) | |
fix swarmkv sanity cmd test case
| -rw-r--r-- | test/swarmkv_cli_test.cpp | 13 | ||||
| -rw-r--r-- | test/swarmkv_scalability_test.cpp | 3 | ||||
| -rw-r--r-- | tools/swarmkv_cli.c | 42 | ||||
| -rw-r--r-- | tools/swarmkv_simple_node.cpp | 3 |
4 files changed, 46 insertions, 15 deletions
diff --git a/test/swarmkv_cli_test.cpp b/test/swarmkv_cli_test.cpp index e3f5f8e..b5c8843 100644 --- a/test/swarmkv_cli_test.cpp +++ b/test/swarmkv_cli_test.cpp @@ -175,12 +175,14 @@ protected: swarmkv_options_set_cluster_port(opts, 5210); swarmkv_options_set_health_check_port(opts, 6210); swarmkv_options_set_logger(opts, logger); + swarmkv_options_set_caller_thread_number(opts, 1); db=swarmkv_open(opts, cluster_name, &err); if(err) { printf("swarmkv_open failed: %s.\n", err); free(err); } + swarmkv_register_thread(db); } static void TearDownTestCase() { @@ -201,6 +203,7 @@ TEST_F(SwarmkvCliNodes, Basic) struct cmd_exec_arg* reply_arg=NULL; reply_arg=cmd_exec_arg_new(); + int thread_id=0; swarmkv_cli_set_db("swarmkv-cli-nodes"); cmd_exec_arg_expect_OK(reply_arg); @@ -240,7 +243,7 @@ TEST_F(SwarmkvCliNodes, Basic) swarmkv_cli_attach("127.0.0.1", 5210); cmd_exec_arg_expect_cstring(reply_arg, "1) \"zhangsan\""); - swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "CRDT KEYS %s", "zhangsan"); + swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "CRDT KEYS %d %s", thread_id, "zhangsan"); cmd_exec_arg_clear(reply_arg); cmd_exec_arg_expect_integer(reply_arg, 1); @@ -248,7 +251,7 @@ TEST_F(SwarmkvCliNodes, Basic) cmd_exec_arg_clear(reply_arg); cmd_exec_arg_expect_NIL(reply_arg); - swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_nil, "CRDT KEYS %s", "lisi"); + swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_nil, "CRDT KEYS %d %s", thread_id, "lisi"); cmd_exec_arg_clear(reply_arg); swarmkv_cli_detach(); @@ -278,6 +281,8 @@ TEST_F(SwarmkvCliNodes, ReplicaDel) struct cmd_exec_arg* reply_arg=NULL; reply_arg=cmd_exec_arg_new(); + swarmkv_cli_set_db("swarmkv-cli-nodes"); + cmd_exec_arg_expect_OK(reply_arg); swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "set %s %s", "xiaolv", "CQ"); cmd_exec_arg_clear(reply_arg); @@ -546,6 +551,7 @@ protected: swarmkv_options_set_cluster_port(opts1, 5210); swarmkv_options_set_health_check_port(opts1, 6210); swarmkv_options_set_logger(opts1, logger); + swarmkv_options_set_caller_thread_number(opts1, 1); db1=swarmkv_open(opts1, cluster_name, &err); if(err) { @@ -553,11 +559,13 @@ protected: free(err); err=NULL; } + swarmkv_register_thread(db1); struct swarmkv_options* opts2=swarmkv_options_new(); swarmkv_options_set_cluster_port(opts2, 5211); swarmkv_options_set_health_check_port(opts2, 6211); swarmkv_options_set_logger(opts2, logger); + swarmkv_options_set_caller_thread_number(opts2, 1); db2=swarmkv_open(opts2, cluster_name, &err); if(err) { @@ -565,6 +573,7 @@ protected: free(err); err=NULL; } + swarmkv_register_thread(db2); } static void TearDownTestCase() diff --git a/test/swarmkv_scalability_test.cpp b/test/swarmkv_scalability_test.cpp index 75deb06..29f6251 100644 --- a/test/swarmkv_scalability_test.cpp +++ b/test/swarmkv_scalability_test.cpp @@ -111,6 +111,7 @@ void *scalable_worker_thread(void *thread_arg) g_signal=cmd_exec_arg_new(); void cmd_exec_arg_set_cb(struct cmd_exec_arg *arg, proc_result_callback_t *cb, void* cb_arg); + swarmkv_register_thread(db); gettimeofday(&global_start, NULL); // while(cmd_cnt<SCALABLE_MAX_KEY) while(1) @@ -121,6 +122,7 @@ void *scalable_worker_thread(void *thread_arg) swarmkv_scalable_expect_cstring(scalable_ctx, value); swarmkv_async_command(db, scalable_gather_result_callback, scalable_ctx, "GET scalable-key-%lld", cmd_cnt%SCALABLE_MAX_KEY); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); cmd_cnt++; cmd_cnt_of_this_second++; if(cmd_cnt_of_this_second==cmd_rate) @@ -173,6 +175,7 @@ TEST(Scalability, MultiThreads) swarmkv_options_set_health_check_port(opts, 6212); swarmkv_options_set_logger(opts, logger); swarmkv_options_set_worker_thread_number(opts, 2); + swarmkv_options_set_caller_thread_number(opts, 1); swarmkv_options_set_cluster_timeout_us(opts, 500*1000); struct swarmkv *db=swarmkv_open(opts, cluster_name, &err); if(err) diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c index 9bfbe62..46d85c8 100644 --- a/tools/swarmkv_cli.c +++ b/tools/swarmkv_cli.c @@ -638,6 +638,7 @@ void exec_cmd_crdt_join(struct swarmkv *db, struct cluster_sanity_ctx *ctx, stru { ctx->reply_num++; swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, keyspace_addr->node.addr, "CRDT JOIN %s %s", key, crdt_addr->node.addr); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); } } @@ -723,7 +724,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s int timed_wait_rv; struct swarmkv_reply *keyspace_keys_reply=NULL; - struct swarmkv_reply *nodes_reply=NULL, *reply=NULL; + struct swarmkv_reply *nodes_reply=NULL, *reply=NULL, *thread_reply=NULL; if(argc < 3) { @@ -754,13 +755,18 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s //Step1 Build the keyspace replica list for(size_t i=0; i<n_active_node; i++) { - keyspace_keys_reply=swarmkv_command_on(db, active_nodes[i].addr, "keyspace keys *"); - swarmkv_reply_merge_array(&reply, keyspace_keys_reply); - if(keyspace_keys_reply->type==SWARMKV_REPLY_NIL) - { - swarmkv_reply_free(keyspace_keys_reply); - } - keyspace_keys_reply=NULL; + thread_reply=swarmkv_command_on(db, active_nodes[i].addr, "info threads"); + for(int j=0; j<atoll(thread_reply->str); j++) + { + keyspace_keys_reply=swarmkv_command_on(db, active_nodes[i].addr, "keyspace keys %d *", j); + swarmkv_reply_merge_array(&reply, keyspace_keys_reply); + if(keyspace_keys_reply->type==SWARMKV_REPLY_NIL) + { + swarmkv_reply_free(keyspace_keys_reply); + } + keyspace_keys_reply=NULL; + } + swarmkv_reply_free(thread_reply); } if(reply) @@ -779,20 +785,26 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s keysapce_replica_list->len=reply->elements[i]->len; HASH_ADD_KEYPTR(hh, ctx->keyspace_replica_hash, keysapce_replica_list->key, keysapce_replica_list->len, keysapce_replica_list); swarmkv_async_command_on(db, build_keyspace_replica_list, keysapce_replica_list, NULL, "keyspace rlist %s", keysapce_replica_list->key); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); } } if(reply) { swarmkv_reply_free(reply); } - + //Step2 Build the crdt replica list for(size_t i=0; i<n_active_node; i++) { - reply=swarmkv_command_on(db, active_nodes[i].addr, "crdt keys *"); - build_crdt_replica_list(reply, ctx, active_nodes+i); - swarmkv_reply_free(reply); - reply=NULL; + thread_reply=swarmkv_command_on(db, active_nodes[i].addr, "info threads"); + for(int j=0; j<atoll(thread_reply->str); j++) + { + reply=swarmkv_command_on(db, active_nodes[i].addr, "crdt keys 0 *"); + build_crdt_replica_list(reply, ctx, active_nodes+i); + swarmkv_reply_free(reply); + reply=NULL; + } + swarmkv_reply_free(thread_reply); } swarmkv_reply_free(nodes_reply); @@ -824,6 +836,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s atomic_inc(&ctx->reply_num); ctx->heal_replica++; swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, NULL, "keyspace rdel %s %s", keyspace_list->key, keyspace_node_addr->node.addr); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); } } @@ -843,6 +856,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s atomic_inc(&ctx->reply_num); ctx->heal_replica++; swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, NULL, "keyspace rdel %s %s", keyspace_list->key, keyspace_node_addr->node.addr); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); } } //Step3.2 foreach crdt list, if crdt node not in keyspace list, keyspace radd node @@ -859,6 +873,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s atomic_inc(&ctx->reply_num); ctx->heal_replica++; swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, NULL, "keyspace radd %s %s", crdt_list->key, crdt_node_addr->node.addr); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); exec_cmd_crdt_join(db,ctx, keyspace_list, crdt_node_addr, crdt_list->key); } } @@ -881,6 +896,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s atomic_inc(&ctx->reply_num); ctx->heal_replica++; swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, NULL, "keyspace radd %s %s", crdt_list->key, crdt_node_addr->node.addr); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); } } } diff --git a/tools/swarmkv_simple_node.cpp b/tools/swarmkv_simple_node.cpp index 095f6a4..541899c 100644 --- a/tools/swarmkv_simple_node.cpp +++ b/tools/swarmkv_simple_node.cpp @@ -121,6 +121,7 @@ int main(int argc, char ** argv) swarmkv_options_set_health_check_announce_port(opts, healthcheck_announce_port); swarmkv_options_set_worker_thread_number(opts, worker_thread_number); + swarmkv_options_set_caller_thread_number(opts, 1); swarmkv_options_set_cluster_timeout_us(opts, 500000); swarmkv_options_set_sync_interval_us(opts, 500); //swarmkv_options_set_run_for_leader_enabled(opts); @@ -132,6 +133,8 @@ int main(int argc, char ** argv) err=NULL; return -1; } + swarmkv_register_thread(db); + struct timeval start, end; long long eplapsed_second=0; gettimeofday(&start, NULL); |
