summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfengweihao <[email protected]>2023-08-21 17:58:13 +0800
committer郑超 <[email protected]>2023-08-22 07:59:26 +0000
commite32938f7469cf6b0797807873a6faff081382d35 (patch)
tree6cb207505639c7751edefcef5c58aeec996442a8
parent17df52cc3fd0c60460e1dea0fabc9c9897554686 (diff)
fix swarmkv sanity cmd test case
-rw-r--r--test/swarmkv_cli_test.cpp13
-rw-r--r--test/swarmkv_scalability_test.cpp3
-rw-r--r--tools/swarmkv_cli.c42
-rw-r--r--tools/swarmkv_simple_node.cpp3
4 files changed, 46 insertions, 15 deletions
diff --git a/test/swarmkv_cli_test.cpp b/test/swarmkv_cli_test.cpp
index e3f5f8e..b5c8843 100644
--- a/test/swarmkv_cli_test.cpp
+++ b/test/swarmkv_cli_test.cpp
@@ -175,12 +175,14 @@ protected:
swarmkv_options_set_cluster_port(opts, 5210);
swarmkv_options_set_health_check_port(opts, 6210);
swarmkv_options_set_logger(opts, logger);
+ swarmkv_options_set_caller_thread_number(opts, 1);
db=swarmkv_open(opts, cluster_name, &err);
if(err)
{
printf("swarmkv_open failed: %s.\n", err);
free(err);
}
+ swarmkv_register_thread(db);
}
static void TearDownTestCase()
{
@@ -201,6 +203,7 @@ TEST_F(SwarmkvCliNodes, Basic)
struct cmd_exec_arg* reply_arg=NULL;
reply_arg=cmd_exec_arg_new();
+ int thread_id=0;
swarmkv_cli_set_db("swarmkv-cli-nodes");
cmd_exec_arg_expect_OK(reply_arg);
@@ -240,7 +243,7 @@ TEST_F(SwarmkvCliNodes, Basic)
swarmkv_cli_attach("127.0.0.1", 5210);
cmd_exec_arg_expect_cstring(reply_arg, "1) \"zhangsan\"");
- swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "CRDT KEYS %s", "zhangsan");
+ swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "CRDT KEYS %d %s", thread_id, "zhangsan");
cmd_exec_arg_clear(reply_arg);
cmd_exec_arg_expect_integer(reply_arg, 1);
@@ -248,7 +251,7 @@ TEST_F(SwarmkvCliNodes, Basic)
cmd_exec_arg_clear(reply_arg);
cmd_exec_arg_expect_NIL(reply_arg);
- swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_nil, "CRDT KEYS %s", "lisi");
+ swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_nil, "CRDT KEYS %d %s", thread_id, "lisi");
cmd_exec_arg_clear(reply_arg);
swarmkv_cli_detach();
@@ -278,6 +281,8 @@ TEST_F(SwarmkvCliNodes, ReplicaDel)
struct cmd_exec_arg* reply_arg=NULL;
reply_arg=cmd_exec_arg_new();
+ swarmkv_cli_set_db("swarmkv-cli-nodes");
+
cmd_exec_arg_expect_OK(reply_arg);
swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "set %s %s", "xiaolv", "CQ");
cmd_exec_arg_clear(reply_arg);
@@ -546,6 +551,7 @@ protected:
swarmkv_options_set_cluster_port(opts1, 5210);
swarmkv_options_set_health_check_port(opts1, 6210);
swarmkv_options_set_logger(opts1, logger);
+ swarmkv_options_set_caller_thread_number(opts1, 1);
db1=swarmkv_open(opts1, cluster_name, &err);
if(err)
{
@@ -553,11 +559,13 @@ protected:
free(err);
err=NULL;
}
+ swarmkv_register_thread(db1);
struct swarmkv_options* opts2=swarmkv_options_new();
swarmkv_options_set_cluster_port(opts2, 5211);
swarmkv_options_set_health_check_port(opts2, 6211);
swarmkv_options_set_logger(opts2, logger);
+ swarmkv_options_set_caller_thread_number(opts2, 1);
db2=swarmkv_open(opts2, cluster_name, &err);
if(err)
{
@@ -565,6 +573,7 @@ protected:
free(err);
err=NULL;
}
+ swarmkv_register_thread(db2);
}
static void TearDownTestCase()
diff --git a/test/swarmkv_scalability_test.cpp b/test/swarmkv_scalability_test.cpp
index 75deb06..29f6251 100644
--- a/test/swarmkv_scalability_test.cpp
+++ b/test/swarmkv_scalability_test.cpp
@@ -111,6 +111,7 @@ void *scalable_worker_thread(void *thread_arg)
g_signal=cmd_exec_arg_new();
void cmd_exec_arg_set_cb(struct cmd_exec_arg *arg, proc_result_callback_t *cb, void* cb_arg);
+ swarmkv_register_thread(db);
gettimeofday(&global_start, NULL);
// while(cmd_cnt<SCALABLE_MAX_KEY)
while(1)
@@ -121,6 +122,7 @@ void *scalable_worker_thread(void *thread_arg)
swarmkv_scalable_expect_cstring(scalable_ctx, value);
swarmkv_async_command(db, scalable_gather_result_callback, scalable_ctx,
"GET scalable-key-%lld", cmd_cnt%SCALABLE_MAX_KEY);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
cmd_cnt++;
cmd_cnt_of_this_second++;
if(cmd_cnt_of_this_second==cmd_rate)
@@ -173,6 +175,7 @@ TEST(Scalability, MultiThreads)
swarmkv_options_set_health_check_port(opts, 6212);
swarmkv_options_set_logger(opts, logger);
swarmkv_options_set_worker_thread_number(opts, 2);
+ swarmkv_options_set_caller_thread_number(opts, 1);
swarmkv_options_set_cluster_timeout_us(opts, 500*1000);
struct swarmkv *db=swarmkv_open(opts, cluster_name, &err);
if(err)
diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c
index 9bfbe62..46d85c8 100644
--- a/tools/swarmkv_cli.c
+++ b/tools/swarmkv_cli.c
@@ -638,6 +638,7 @@ void exec_cmd_crdt_join(struct swarmkv *db, struct cluster_sanity_ctx *ctx, stru
{
ctx->reply_num++;
swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, keyspace_addr->node.addr, "CRDT JOIN %s %s", key, crdt_addr->node.addr);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
}
}
@@ -723,7 +724,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s
int timed_wait_rv;
struct swarmkv_reply *keyspace_keys_reply=NULL;
- struct swarmkv_reply *nodes_reply=NULL, *reply=NULL;
+ struct swarmkv_reply *nodes_reply=NULL, *reply=NULL, *thread_reply=NULL;
if(argc < 3)
{
@@ -754,13 +755,18 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s
//Step1 Build the keyspace replica list
for(size_t i=0; i<n_active_node; i++)
{
- keyspace_keys_reply=swarmkv_command_on(db, active_nodes[i].addr, "keyspace keys *");
- swarmkv_reply_merge_array(&reply, keyspace_keys_reply);
- if(keyspace_keys_reply->type==SWARMKV_REPLY_NIL)
- {
- swarmkv_reply_free(keyspace_keys_reply);
- }
- keyspace_keys_reply=NULL;
+ thread_reply=swarmkv_command_on(db, active_nodes[i].addr, "info threads");
+ for(int j=0; j<atoll(thread_reply->str); j++)
+ {
+ keyspace_keys_reply=swarmkv_command_on(db, active_nodes[i].addr, "keyspace keys %d *", j);
+ swarmkv_reply_merge_array(&reply, keyspace_keys_reply);
+ if(keyspace_keys_reply->type==SWARMKV_REPLY_NIL)
+ {
+ swarmkv_reply_free(keyspace_keys_reply);
+ }
+ keyspace_keys_reply=NULL;
+ }
+ swarmkv_reply_free(thread_reply);
}
if(reply)
@@ -779,20 +785,26 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s
keysapce_replica_list->len=reply->elements[i]->len;
HASH_ADD_KEYPTR(hh, ctx->keyspace_replica_hash, keysapce_replica_list->key, keysapce_replica_list->len, keysapce_replica_list);
swarmkv_async_command_on(db, build_keyspace_replica_list, keysapce_replica_list, NULL, "keyspace rlist %s", keysapce_replica_list->key);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
}
}
if(reply)
{
swarmkv_reply_free(reply);
}
-
+
//Step2 Build the crdt replica list
for(size_t i=0; i<n_active_node; i++)
{
- reply=swarmkv_command_on(db, active_nodes[i].addr, "crdt keys *");
- build_crdt_replica_list(reply, ctx, active_nodes+i);
- swarmkv_reply_free(reply);
- reply=NULL;
+ thread_reply=swarmkv_command_on(db, active_nodes[i].addr, "info threads");
+ for(int j=0; j<atoll(thread_reply->str); j++)
+ {
+ reply=swarmkv_command_on(db, active_nodes[i].addr, "crdt keys 0 *");
+ build_crdt_replica_list(reply, ctx, active_nodes+i);
+ swarmkv_reply_free(reply);
+ reply=NULL;
+ }
+ swarmkv_reply_free(thread_reply);
}
swarmkv_reply_free(nodes_reply);
@@ -824,6 +836,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s
atomic_inc(&ctx->reply_num);
ctx->heal_replica++;
swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, NULL, "keyspace rdel %s %s", keyspace_list->key, keyspace_node_addr->node.addr);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
}
}
@@ -843,6 +856,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s
atomic_inc(&ctx->reply_num);
ctx->heal_replica++;
swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, NULL, "keyspace rdel %s %s", keyspace_list->key, keyspace_node_addr->node.addr);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
}
}
//Step3.2 foreach crdt list, if crdt node not in keyspace list, keyspace radd node
@@ -859,6 +873,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s
atomic_inc(&ctx->reply_num);
ctx->heal_replica++;
swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, NULL, "keyspace radd %s %s", crdt_list->key, crdt_node_addr->node.addr);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
exec_cmd_crdt_join(db,ctx, keyspace_list, crdt_node_addr, crdt_list->key);
}
}
@@ -881,6 +896,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s
atomic_inc(&ctx->reply_num);
ctx->heal_replica++;
swarmkv_async_command_on(db, exec_cmd_generic_callback, ctx, NULL, "keyspace radd %s %s", crdt_list->key, crdt_node_addr->node.addr);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
}
}
}
diff --git a/tools/swarmkv_simple_node.cpp b/tools/swarmkv_simple_node.cpp
index 095f6a4..541899c 100644
--- a/tools/swarmkv_simple_node.cpp
+++ b/tools/swarmkv_simple_node.cpp
@@ -121,6 +121,7 @@ int main(int argc, char ** argv)
swarmkv_options_set_health_check_announce_port(opts, healthcheck_announce_port);
swarmkv_options_set_worker_thread_number(opts, worker_thread_number);
+ swarmkv_options_set_caller_thread_number(opts, 1);
swarmkv_options_set_cluster_timeout_us(opts, 500000);
swarmkv_options_set_sync_interval_us(opts, 500);
//swarmkv_options_set_run_for_leader_enabled(opts);
@@ -132,6 +133,8 @@ int main(int argc, char ** argv)
err=NULL;
return -1;
}
+ swarmkv_register_thread(db);
+
struct timeval start, end;
long long eplapsed_second=0;
gettimeofday(&start, NULL);