diff options
| author | fengweihao <[email protected]> | 2023-12-26 17:35:52 +0800 |
|---|---|---|
| committer | fengweihao <[email protected]> | 2023-12-26 17:35:52 +0800 |
| commit | 89e0a6aa0d83754afb1844507ab60278068d56b1 (patch) | |
| tree | 94043e503f21f96618c7143cf7991e84c99cb397 | |
| parent | 7332c66226edb71bfa210428571bb7ad49bcb2e6 (diff) | |
Fixing some bugs in the CLUSTER SANITY heal commandv4.0.4
| -rw-r--r-- | tools/swarmkv_cli.c | 4 | ||||
| -rw-r--r-- | tools/swarmkv_simple_node.cpp | 112 |
2 files changed, 111 insertions, 5 deletions
diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c index 5bf8e38..6907749 100644 --- a/tools/swarmkv_cli.c +++ b/tools/swarmkv_cli.c @@ -637,7 +637,7 @@ void exec_cmd_crdt_join(struct swarmkv *db, struct cluster_sanity_ctx *ctx, stru HASH_ITER(hh, keyspace_list->replica_node_hash, keyspace_addr, tmp) { ctx->reply_num++; - swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, keyspace_addr->node.addr, "CRDT JOIN %s %s", key, crdt_addr->node.addr); + swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, keyspace_addr->node.addr, "CRDT ADD %s %s", key, crdt_addr->node.addr); swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); } } @@ -799,7 +799,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s thread_reply=swarmkv_command_on(db, active_nodes[i].addr, "info threads"); for(int j=0; j<atoll(thread_reply->str); j++) { - reply=swarmkv_command_on(db, active_nodes[i].addr, "crdt keys 0 *"); + reply=swarmkv_command_on(db, active_nodes[i].addr, "crdt keys %d *", j); build_crdt_replica_list(reply, ctx, active_nodes+i); swarmkv_reply_free(reply); reply=NULL; diff --git a/tools/swarmkv_simple_node.cpp b/tools/swarmkv_simple_node.cpp index aec702f..6275c9f 100644 --- a/tools/swarmkv_simple_node.cpp +++ b/tools/swarmkv_simple_node.cpp @@ -28,6 +28,7 @@ static void help() " -r <caller thread number>\n" " -l <consul heal check port>\n" " -k <key number>\n" + " -w <weight>\n" " --node-number <node number>\n" " --node-id <node id>\n" " --pending-commands <number>\n" @@ -39,7 +40,8 @@ static void help() "e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 2 -r 2\n" "e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 1 -r 1 -k 10000 --dryrun 1 --exec batch set command\n" "e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 2 -r 1 --exec batch xxcfg command\n" - "e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 2 --exec as node pod\n"); + "e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 2 --exec as node pod\n" + "e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 1 -w 1 --exec mult ftcfg command\n"); exit(1); } @@ -51,10 +53,11 @@ struct swarmkv_node_config char consul_host[64]; unsigned int consul_port; unsigned int consul_heal_check_port; - size_t pending_commands; + long long pending_commands; int cluster_node_id; int cluster_node_number; long long key_number; + int weight; int caller_thread_number; int worker_thread_number; int forever; @@ -79,6 +82,7 @@ struct async_exec_globals struct swarmkv *db; int cmd_inprogress; int on_fly_reply_cnt; + long long consumed; double total_exec_time_ms; }; struct async_exec_ctx @@ -587,6 +591,97 @@ int batch_set_command(struct swarmkv *db, int cluster_node_id) return BATCH_SET_CMD; } +struct consumer_member +{ + int member_num; + int not_enough_tokens; + long long timedout_reply_cnt; + long long acquire_consumed; + long long consumed; +}; + +void mult_ftcfg_on_reply_cb(const struct swarmkv_reply *reply, void * arg) +{ + struct consumer_member *consumer = (struct consumer_member *)arg; + + if(reply->type == SWARMKV_REPLY_ERROR) + { + consumer->timedout_reply_cnt++; + } + if(reply->type == SWARMKV_REPLY_INTEGER) + { + if(reply->integer==0) + { + consumer->not_enough_tokens++; + } + consumer->acquire_consumed=reply->integer; + } +} + +void batch_ftcfg_command(struct swarmkv *db) +{ + struct swarmkv_reply *reply=NULL; + reply=swarmkv_command_on(db, NULL, "FTCFG ftb-token 15000000 15000000 256"); + swarmkv_reply_free(reply); + return; +} + +void *start_mult_ftcfg_caller_thread(void *thread_arg) +{ + struct swarmkv *db=g_node_config.db; + int i=0; + int key_number=100*1000, round=key_number; + + swarmkv_register_thread(db); + + const char *member[]= {"192.168.58.221", "192.168.58.162", "192.168.58.103"}; + const char ftb_token[256]="ftb-token"; + long long weight=1; + long long token=12112; + wake_up_caller_thread(db); + + struct consumer_member consumer[3]={0}; + + for(i=0; i<round; i++) + { + if(consumer[i%3].acquire_consumed > token) + { + consumer[i%3].acquire_consumed -= token; + consumer[i%3].consumed+=token; + continue; + } + consumer[i%3].member_num=i; + swarmkv_ftconsume(db, ftb_token, strlen(ftb_token), member[i%3], strlen(member[i%3]), weight, token*10, mult_ftcfg_on_reply_cb, &consumer[i%3]); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + usleep(10); + } + + for(i=0; i<3; i++) + { + printf("%s Consumed Token:%llu, Wasted loop %d\n", member[i], consumer[i].consumed, consumer[i].not_enough_tokens); + } + + return NULL; +} + +int mult_ftcfg_command(struct swarmkv *db, int cluster_node_id) +{ + int caller_thread_number = g_node_config.caller_thread_number; + pthread_t threads[caller_thread_number]; + for(int i=0; i<caller_thread_number; i++) + { + pthread_create(&(threads[i]), NULL, start_mult_ftcfg_caller_thread, &g_node_config); + } + + int thread_ret=0; + for(int i=0; i<caller_thread_number; i++) + { + pthread_join(threads[i], (void**)&thread_ret); + } + + return BATCH_XXXCFG_CMD; +} + int as_node_pod(struct swarmkv *db, int cluster_node_id) { if(g_node_config.key_number > 0) @@ -594,6 +689,12 @@ int as_node_pod(struct swarmkv *db, int cluster_node_id) swarmkv_register_thread(db); batch_set_command(db, cluster_node_id); } + if(g_node_config.key_number == 0) + { + swarmkv_register_thread(db); + batch_ftcfg_command(db); + } + return AS_NODE_POD; } @@ -603,6 +704,7 @@ struct batch_cmd_spec batch_cmds[]={ {"as node pod", as_node_pod}, {"batch hash test", bath_hash_test}, {"batch del command", batch_del_command}, + {"mult ftcfg command", mult_ftcfg_command} }; int batch_command_argv(struct swarmkv *db, size_t argc, char **argv) @@ -683,6 +785,10 @@ int main(int argc, char **argv) { sscanf(argv[++i], "%lld", &g_node_config.key_number); } + else if(!strcmp(argv[i], "-w") && !lastarg) + { + sscanf(argv[++i], "%d", &g_node_config.weight); + } else if(!strcmp(argv[i], "--cluster-announce-create") && !lastarg) { sscanf(argv[++i], "%[^:]:%u", cluster_announce_ip, &cluster_announce_port); @@ -701,7 +807,7 @@ int main(int argc, char **argv) } else if(!strcmp(argv[i], "--pending-commands") && !lastarg) { - sscanf(argv[++i], "%zu", &g_node_config.pending_commands); + sscanf(argv[++i], "%lld", &g_node_config.pending_commands); } else if(!strcmp(argv[i], "--dryrun") && !lastarg) { |
