summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfengweihao <[email protected]>2023-12-26 17:35:52 +0800
committerfengweihao <[email protected]>2023-12-26 17:35:52 +0800
commit89e0a6aa0d83754afb1844507ab60278068d56b1 (patch)
tree94043e503f21f96618c7143cf7991e84c99cb397
parent7332c66226edb71bfa210428571bb7ad49bcb2e6 (diff)
Fixing some bugs in the CLUSTER SANITY heal commandv4.0.4
-rw-r--r--tools/swarmkv_cli.c4
-rw-r--r--tools/swarmkv_simple_node.cpp112
2 files changed, 111 insertions, 5 deletions
diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c
index 5bf8e38..6907749 100644
--- a/tools/swarmkv_cli.c
+++ b/tools/swarmkv_cli.c
@@ -637,7 +637,7 @@ void exec_cmd_crdt_join(struct swarmkv *db, struct cluster_sanity_ctx *ctx, stru
HASH_ITER(hh, keyspace_list->replica_node_hash, keyspace_addr, tmp)
{
ctx->reply_num++;
- swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, keyspace_addr->node.addr, "CRDT JOIN %s %s", key, crdt_addr->node.addr);
+ swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, keyspace_addr->node.addr, "CRDT ADD %s %s", key, crdt_addr->node.addr);
swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
}
}
@@ -799,7 +799,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s
thread_reply=swarmkv_command_on(db, active_nodes[i].addr, "info threads");
for(int j=0; j<atoll(thread_reply->str); j++)
{
- reply=swarmkv_command_on(db, active_nodes[i].addr, "crdt keys 0 *");
+ reply=swarmkv_command_on(db, active_nodes[i].addr, "crdt keys %d *", j);
build_crdt_replica_list(reply, ctx, active_nodes+i);
swarmkv_reply_free(reply);
reply=NULL;
diff --git a/tools/swarmkv_simple_node.cpp b/tools/swarmkv_simple_node.cpp
index aec702f..6275c9f 100644
--- a/tools/swarmkv_simple_node.cpp
+++ b/tools/swarmkv_simple_node.cpp
@@ -28,6 +28,7 @@ static void help()
" -r <caller thread number>\n"
" -l <consul heal check port>\n"
" -k <key number>\n"
+ " -w <weight>\n"
" --node-number <node number>\n"
" --node-id <node id>\n"
" --pending-commands <number>\n"
@@ -39,7 +40,8 @@ static void help()
"e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 2 -r 2\n"
"e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 1 -r 1 -k 10000 --dryrun 1 --exec batch set command\n"
"e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 2 -r 1 --exec batch xxcfg command\n"
- "e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 2 --exec as node pod\n");
+ "e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 2 --exec as node pod\n"
+ "e.g.:./simple_node -n demo -h 127.0.0.1:5210 -t 1 -w 1 --exec mult ftcfg command\n");
exit(1);
}
@@ -51,10 +53,11 @@ struct swarmkv_node_config
char consul_host[64];
unsigned int consul_port;
unsigned int consul_heal_check_port;
- size_t pending_commands;
+ long long pending_commands;
int cluster_node_id;
int cluster_node_number;
long long key_number;
+ int weight;
int caller_thread_number;
int worker_thread_number;
int forever;
@@ -79,6 +82,7 @@ struct async_exec_globals
struct swarmkv *db;
int cmd_inprogress;
int on_fly_reply_cnt;
+ long long consumed;
double total_exec_time_ms;
};
struct async_exec_ctx
@@ -587,6 +591,97 @@ int batch_set_command(struct swarmkv *db, int cluster_node_id)
return BATCH_SET_CMD;
}
+struct consumer_member
+{
+ int member_num;
+ int not_enough_tokens;
+ long long timedout_reply_cnt;
+ long long acquire_consumed;
+ long long consumed;
+};
+
+void mult_ftcfg_on_reply_cb(const struct swarmkv_reply *reply, void * arg)
+{
+ struct consumer_member *consumer = (struct consumer_member *)arg;
+
+ if(reply->type == SWARMKV_REPLY_ERROR)
+ {
+ consumer->timedout_reply_cnt++;
+ }
+ if(reply->type == SWARMKV_REPLY_INTEGER)
+ {
+ if(reply->integer==0)
+ {
+ consumer->not_enough_tokens++;
+ }
+ consumer->acquire_consumed=reply->integer;
+ }
+}
+
+void batch_ftcfg_command(struct swarmkv *db)
+{
+ struct swarmkv_reply *reply=NULL;
+ reply=swarmkv_command_on(db, NULL, "FTCFG ftb-token 15000000 15000000 256");
+ swarmkv_reply_free(reply);
+ return;
+}
+
+void *start_mult_ftcfg_caller_thread(void *thread_arg)
+{
+ struct swarmkv *db=g_node_config.db;
+ int i=0;
+ int key_number=100*1000, round=key_number;
+
+ swarmkv_register_thread(db);
+
+ const char *member[]= {"192.168.58.221", "192.168.58.162", "192.168.58.103"};
+ const char ftb_token[256]="ftb-token";
+ long long weight=1;
+ long long token=12112;
+ wake_up_caller_thread(db);
+
+ struct consumer_member consumer[3]={0};
+
+ for(i=0; i<round; i++)
+ {
+ if(consumer[i%3].acquire_consumed > token)
+ {
+ consumer[i%3].acquire_consumed -= token;
+ consumer[i%3].consumed+=token;
+ continue;
+ }
+ consumer[i%3].member_num=i;
+ swarmkv_ftconsume(db, ftb_token, strlen(ftb_token), member[i%3], strlen(member[i%3]), weight, token*10, mult_ftcfg_on_reply_cb, &consumer[i%3]);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
+ usleep(10);
+ }
+
+ for(i=0; i<3; i++)
+ {
+ printf("%s Consumed Token:%llu, Wasted loop %d\n", member[i], consumer[i].consumed, consumer[i].not_enough_tokens);
+ }
+
+ return NULL;
+}
+
+int mult_ftcfg_command(struct swarmkv *db, int cluster_node_id)
+{
+ int caller_thread_number = g_node_config.caller_thread_number;
+ pthread_t threads[caller_thread_number];
+ for(int i=0; i<caller_thread_number; i++)
+ {
+ pthread_create(&(threads[i]), NULL, start_mult_ftcfg_caller_thread, &g_node_config);
+ }
+
+ int thread_ret=0;
+ for(int i=0; i<caller_thread_number; i++)
+ {
+ pthread_join(threads[i], (void**)&thread_ret);
+ }
+
+ return BATCH_XXXCFG_CMD;
+}
+
int as_node_pod(struct swarmkv *db, int cluster_node_id)
{
if(g_node_config.key_number > 0)
@@ -594,6 +689,12 @@ int as_node_pod(struct swarmkv *db, int cluster_node_id)
swarmkv_register_thread(db);
batch_set_command(db, cluster_node_id);
}
+ if(g_node_config.key_number == 0)
+ {
+ swarmkv_register_thread(db);
+ batch_ftcfg_command(db);
+ }
+
return AS_NODE_POD;
}
@@ -603,6 +704,7 @@ struct batch_cmd_spec batch_cmds[]={
{"as node pod", as_node_pod},
{"batch hash test", bath_hash_test},
{"batch del command", batch_del_command},
+ {"mult ftcfg command", mult_ftcfg_command}
};
int batch_command_argv(struct swarmkv *db, size_t argc, char **argv)
@@ -683,6 +785,10 @@ int main(int argc, char **argv)
{
sscanf(argv[++i], "%lld", &g_node_config.key_number);
}
+ else if(!strcmp(argv[i], "-w") && !lastarg)
+ {
+ sscanf(argv[++i], "%d", &g_node_config.weight);
+ }
else if(!strcmp(argv[i], "--cluster-announce-create") && !lastarg)
{
sscanf(argv[++i], "%[^:]:%u", cluster_announce_ip, &cluster_announce_port);
@@ -701,7 +807,7 @@ int main(int argc, char **argv)
}
else if(!strcmp(argv[i], "--pending-commands") && !lastarg)
{
- sscanf(argv[++i], "%zu", &g_node_config.pending_commands);
+ sscanf(argv[++i], "%lld", &g_node_config.pending_commands);
}
else if(!strcmp(argv[i], "--dryrun") && !lastarg)
{