diff options
| author | Zheng Chao <[email protected]> | 2023-07-29 03:25:53 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-07-29 03:25:53 +0800 |
| commit | 523621088abc7f3a701646a7360ff015f9e5c025 (patch) | |
| tree | c051d5367f3d1c71a99e1ddefa3b1645f4e8a4d4 | |
| parent | 6b50898b3bebc730979e74982fcf4c6645fb7f21 (diff) | |
WIP: fix swarmkv_net_send() bugoptimize-multithread-lockfree-ringbuf
| -rw-r--r-- | docs/commands.md | 4 | ||||
| -rw-r--r-- | include/swarmkv/swarmkv.h | 8 | ||||
| -rw-r--r-- | src/swarmkv.c | 15 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 10 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 17 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 2 | ||||
| -rw-r--r-- | src/swarmkv_sync.c | 2 | ||||
| -rw-r--r-- | test/swarmkv_gtest.cpp | 512 |
8 files changed, 235 insertions, 335 deletions
diff --git a/docs/commands.md b/docs/commands.md index f1d9df7..dd57658 100644 --- a/docs/commands.md +++ b/docs/commands.md @@ -766,8 +766,8 @@ The pattern is exactly same as Redis https://redis.io/commands/keys/ . | KEYSPACE KEYS | IP port pattern | | | KEYSPACE COUNTKEYSINSLOT | slot | | | CRDT DEL | key | Integer reply: The number of cached keys that were removed. | -| CRDT PUSH | key blob | Simple string reply: OK if PUSH was executed correctly. | -| CRDT PULL | key | Blog reply: a blob of state-based CRDT | +| CRDT MERGE | key blob | Simple string reply: OK if MERGE was executed correctly. | +| CRDT GET | key | Blog reply: a blob of state-based CRDT | | CLUSTER KEYS | pattern | Array reply: list of keys matching pattern. | | CLUSTER NODES | | | | CLUSTER SLOTS | | | diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h index 285cef6..3623474 100644 --- a/include/swarmkv/swarmkv.h +++ b/include/swarmkv/swarmkv.h @@ -61,7 +61,7 @@ int swarmkv_options_set_logger(struct swarmkv_options *opts, void *logger); int swarmkv_options_set_log_level(struct swarmkv_options *opts, int loglevel); int swarmkv_options_set_log_path(struct swarmkv_options *opts, const char *logpath); int swarmkv_options_set_consul_port(struct swarmkv_options *opts, unsigned int consul_port); -int swarmkv_options_set_consul_host(struct swarmkv_options *opts, const char* ip_addr); +int swarmkv_options_set_consul_host(struct swarmkv_options *opts, const char *ip_addr); int swarmkv_options_set_dryrun(struct swarmkv_options *opts); int swarmkv_options_set_disable_run_for_leader(struct swarmkv_options *opts); int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads); @@ -83,8 +83,8 @@ void swarmkv_caller_loop(struct swarmkv *db, struct timeval *tv); void swarmkv_caller_loop_break(struct swarmkv *db); //Blocking function -struct swarmkv_reply *swarmkv_command(struct swarmkv *db,const char *format, ...); -struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...); +struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...)__attribute__ ((format (printf, 2, 3))); +struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...)__attribute__ ((format (printf, 3, 4))); //Non-blocking function typedef void swarmkv_on_reply_callback_t(const struct swarmkv_reply *reply, void * arg); @@ -119,7 +119,7 @@ size_t swarmkv_get_possible_command_name(struct swarmkv *db, const char *prefix, char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name); - +const char *swarmkv_self_address(const struct swarmkv *db); #ifdef __cplusplus } /* end extern "C" */ #endif diff --git a/src/swarmkv.c b/src/swarmkv.c index 719d8df..947f542 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -172,9 +172,10 @@ static void remoter_caller_ctx_send_reply(struct remote_caller_ctx *ctx, const s struct swarmkv *db = ctx->db; struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->remote, ctx->remote_tid, ctx->sequence); int cur_tid=__gettid(db); - assert(cur_tid != msg->caller_tid); + if(0==node_compare(&db->self, &msg->caller)) { + assert(cur_tid != msg->caller_tid); swarmkv_mesh_send(db->mesh, cur_tid, msg->caller_tid, msg); } else @@ -760,10 +761,9 @@ static void peer_exec_on_success(void *result, void *user) cmd_ctx_free(ctx); } } -struct swarmkv_cmd *make_crdt_add_cmd(enum cmd_key_flag flag, const sds key, node_t replica[], size_t n_replica) +struct swarmkv_cmd *make_crdt_add_cmd(const sds key, node_t replica[], size_t n_replica) { struct swarmkv_cmd *crdt_add_cmd=NULL; - assert(flag==CMD_KEY_OW); crdt_add_cmd=swarmkv_cmd_new(3+n_replica); crdt_add_cmd->argv[0]=sdsnew("crdt"); crdt_add_cmd->argv[1]=sdsnew("add"); @@ -818,7 +818,7 @@ static void key_route_on_success(void *result, void *user) if(self_is_a_replica) { struct cmd_ctx *crdt_add_ctx=NULL; - struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(spec->flag, key, replica_nodes, n_replica_node); + struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(key, replica_nodes, n_replica_node); crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller); crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx); __exec_cmd(ctx->db, NULL, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine); @@ -920,7 +920,6 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * return; } - enum cmd_exec_result exec_ret=FINISHED; struct timespec start, end; clock_gettime(CLOCK_MONOTONIC_COARSE, &start); @@ -934,6 +933,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * struct promise *p=future_to_promise(future_of_caller); promise_success(p, reply); swarmkv_reply_free(reply); + return; } switch(exec_ret) { @@ -1491,6 +1491,9 @@ void swarmkv_close(struct swarmkv * db) free(db); return; } - +const char *swarmkv_self_address(const struct swarmkv *db) +{ + return db->self.addr; +} diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index cc2a21d..af8d6b7 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -70,9 +70,9 @@ int swarmkv_options_set_health_check_announce_port(struct swarmkv_options *opts, opts->health_check_announce_port=health_check_announce_port; return 0; } -int swarmkv_options_set_cluster_timeout_us(struct swarmkv_options *opts, unsigned int timeout_ms) +int swarmkv_options_set_cluster_timeout_us(struct swarmkv_options *opts, unsigned int timeout_us) { - opts->cluster_timeout_us=timeout_ms; + opts->cluster_timeout_us=timeout_us; return 0; } int swarmkv_options_set_sync_interval_us(struct swarmkv_options *opts, unsigned int interval_us) @@ -335,6 +335,7 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta node_t target_node; memset(&target_node, 0, sizeof(node_t)); ctx.db=db; + ctx.reply=NULL; cmd=swarmkv_cmd_new(argc); for(int i=0; i<argc; i++) { @@ -350,7 +351,10 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta node_init_from_sds(&target_node, target); exec_for_local(db, cmd, &target_node, blocking_query_cb, &ctx); } - swarmkv_caller_loop(db, NULL); + if(ctx.reply==NULL) + { + swarmkv_caller_loop(db, NULL); + } assert(ctx.reply!=NULL); reply=ctx.reply; ctx.reply=NULL; diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index ad70dc9..74cf7db 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -110,7 +110,8 @@ struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const nod struct snet_conn* conn=ALLOC(struct snet_conn, 1); conn->buff_for_sending=evbuffer_new(); - + conn->fd=-1; + //http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html conn->bev=bufferevent_socket_new(base, -1, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS| bufferevent_setcb(conn->bev, NULL, NULL, connect_peer_eventcb, conn); @@ -373,6 +374,7 @@ void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg) int yes=1; evutil_socket_t fd=bufferevent_getfd(conn->bev); setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&yes, sizeof(yes)); + conn->fd=fd; evutil_make_socket_closeonexec(fd); bufferevent_write_buffer(conn->bev, conn->buff_for_sending); bufferevent_setcb(conn->bev, peer_conn_read_cb, peer_conn_write_cb, peer_conn_event_cb, conn); @@ -629,8 +631,17 @@ void swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmk char *data=NULL; size_t size=0; swarmkv_msg_serialize(msg, &data, &size); - swarmkv_msg_free(msg); - evbuffer_add(conn->buff_for_sending, data, size); + swarmkv_msg_free(msg); + struct evbuffer* output_buffer=NULL; + output_buffer=bufferevent_get_output(conn->bev); + if(output_buffer) + { + evbuffer_add(output_buffer, data, size); + } + else + { + evbuffer_add(conn->buff_for_sending, data, size); + } free(data); thr->stat.output_bytes += size; thr->stat.output_msgs++; diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 0a2fec1..00ae03c 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -537,7 +537,7 @@ static void crdt_generic_on_fail(enum e_future_error err, const char * what, voi void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struct swarmkv_cmd *cmd, const node_t *peer) { - const char *symbol[]={"crdt_pull", "crdt_push", "crdt_join"}; + const char *symbol[]={"crdt_get", "crdt_merge", "crdt_join"}; struct crdt_generic_ctx *ctx=NULL; ctx=ALLOC(struct crdt_generic_ctx, 1); ctx->op=op; diff --git a/src/swarmkv_sync.c b/src/swarmkv_sync.c index 3170f1e..17dffe9 100644 --- a/src/swarmkv_sync.c +++ b/src/swarmkv_sync.c @@ -93,7 +93,7 @@ int sync_master_get_cmd(struct sync_master *master, node_t *peer, struct swarmkv n_data=utarray_len(task->sync_data_list); c=swarmkv_cmd_new(2+n_data*2); c->argv[0]=sdsnew("crdt"); - c->argv[1]=sdsnew("push"); + c->argv[1]=sdsnew("merge"); for(size_t i=0; i<n_data; i++) { p=utarray_eltptr(task->sync_data_list, i); diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp index e738ad2..8dfb4f2 100644 --- a/test/swarmkv_gtest.cpp +++ b/test/swarmkv_gtest.cpp @@ -558,6 +558,9 @@ protected: swarmkv_options_set_cluster_port(opts1, 5210); swarmkv_options_set_health_check_port(opts1, 6210); swarmkv_options_set_logger(opts1, logger); + swarmkv_options_set_cluster_timeout_us(opts1, 600*1000*1000); + swarmkv_options_set_worker_thread_number(opts1, 1); + swarmkv_options_set_caller_thread_number(opts1, 1); db1=swarmkv_open(opts1, cluster_name, &err); if(err) { @@ -570,6 +573,9 @@ protected: swarmkv_options_set_cluster_port(opts2, 5211); swarmkv_options_set_health_check_port(opts2, 6211); swarmkv_options_set_logger(opts2, logger); + swarmkv_options_set_cluster_timeout_us(opts2, 600*1000*1000); + swarmkv_options_set_worker_thread_number(opts2, 1); + swarmkv_options_set_caller_thread_number(opts2, 1); db2=swarmkv_open(opts2, cluster_name, &err); if(err) { @@ -594,504 +600,380 @@ struct swarmkv* SwarmkvTwoNodes::db1; struct swarmkv* SwarmkvTwoNodes::db2; TEST_F(SwarmkvTwoNodes, SET_GET) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db1=SwarmkvTwoNodes::db1; struct swarmkv *db2=SwarmkvTwoNodes::db2; const char *key="id001"; const char *val1="zhangsan", *val2="lisi"; - int exec_successful=0; + struct swarmkv_reply *reply=NULL; - arg=cmd_exec_arg_new(); - - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db1, key, strlen(key), val1, strlen(val1), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db1, "SET %s %s", key, val1); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); - cmd_exec_arg_expect_cstring(arg, val1); - swarmkv_get(db2, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db2, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, val1); + swarmkv_reply_free(reply); - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db2, key, strlen(key), val2, strlen(val2), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db2, "SET %s %s", key, val2); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); + + reply=swarmkv_command(db2, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, val2); + swarmkv_reply_free(reply); wait_for_sync(); - cmd_exec_arg_expect_cstring(arg, val2); - swarmkv_get(db1, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db1, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, val2); + swarmkv_reply_free(reply); - cmd_exec_arg_free(arg); - arg=NULL; } TEST_F(SwarmkvTwoNodes, SET1kString) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db1=SwarmkvTwoNodes::db1; struct swarmkv *db2=SwarmkvTwoNodes::db2; struct swarmkv* tmp_db=NULL; const char *key_prefix="test-1k-string"; const char *val_prefix="value-xx"; char key[128]="", val[128]=""; - int exec_successful=0, i=0, round=1000, n_success=0; + int i=0, round=1000; + struct swarmkv_reply *reply=NULL; + - arg=cmd_exec_arg_new(); - cmd_exec_arg_expect_OK(arg); - n_success=0; for(i=0; i<round; i++) { snprintf(key, sizeof(key), "%s-%d", key_prefix, i); snprintf(val, sizeof(val), "%s-%d", val_prefix, i); tmp_db=i%2?db1:db2; - swarmkv_set(tmp_db, key, strlen(key), val, strlen(val), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - if(exec_successful) n_success++; - cmd_exec_arg_clear(arg); + reply=swarmkv_command(tmp_db, "SET %s %s", key, val); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); } - EXPECT_EQ(round, n_success); - n_success=0; + for(i=0; i<round; i++) { snprintf(key, sizeof(key), "%s-%d", key_prefix, i); snprintf(val, sizeof(val), "%s-%d", val_prefix, i); - cmd_exec_arg_expect_cstring(arg, val); tmp_db=i%2?db2:db1; - swarmkv_get(tmp_db, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - if(exec_successful) n_success++; - cmd_exec_arg_clear(arg); - } - EXPECT_EQ(round, n_success); + reply=swarmkv_command(tmp_db, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, val); + swarmkv_reply_free(reply); - cmd_exec_arg_free(arg); + } } -TEST_F(SwarmkvTwoNodes, INCRYBY1kInteger) +TEST_F(SwarmkvTwoNodes, INCRBY1kInteger) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db1=SwarmkvTwoNodes::db1; struct swarmkv *db2=SwarmkvTwoNodes::db2; struct swarmkv* tmp_db=NULL; const char *key_prefix="incrby-1k-integer"; - char key[128]="", val[128]=""; - int exec_successful=0, i=0, round=1000, n_success=0; - - arg=cmd_exec_arg_new(); + char key[128]=""; + int i=0, round=1000; + struct swarmkv_reply *reply=NULL; - n_success=0; int increment=-3000000; for(i=0; i<round; i++) - { + { + tmp_db=i%2?db1:db2; snprintf(key, sizeof(key), "%s-%d", key_prefix, i); - cmd_exec_arg_expect_integer(arg, i+increment); - - tmp_db=i%2?db2:db1; - swarmkv_incrby(tmp_db, key, strlen(key), i+increment, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - if(exec_successful) n_success++; - cmd_exec_arg_clear(arg); + reply=swarmkv_command(tmp_db, "INCRBY %s %d", key, i+increment); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, i+increment); + swarmkv_reply_free(reply); } - EXPECT_EQ(round, n_success); - - cmd_exec_arg_expect_OK(arg); - n_success=0; for(i=0; i<round; i++) - { + { + tmp_db=(i+1)%2?db1:db2; snprintf(key, sizeof(key), "%s-%d", key_prefix, i); - snprintf(val, sizeof(val), "0"); + reply=swarmkv_command(tmp_db, "INCRBY %s %d", key, 0); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, i+increment); + swarmkv_reply_free(reply); + } + for(i=0; i<round; i++) + { tmp_db=i%2?db1:db2; - swarmkv_set(tmp_db, key, strlen(key), val, strlen(val), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - if(exec_successful) n_success++; - cmd_exec_arg_clear(arg); + snprintf(key, sizeof(key), "%s-%d", key_prefix, i); + reply=swarmkv_command(tmp_db, "SET %s 0", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); } - EXPECT_EQ(round, n_success); - - cmd_exec_arg_expect_cstring(arg, "0"); - n_success=0; for(i=0; i<round; i++) { - snprintf(key, sizeof(key), "%s-%d", key_prefix, i); - tmp_db=i%2?db2:db1; - swarmkv_get(tmp_db, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - if(exec_successful) n_success++; - cmd_exec_arg_clear(arg); + tmp_db=(i+1)%2?db1:db2; + snprintf(key, sizeof(key), "%s-%d", key_prefix, i); + reply=swarmkv_command(tmp_db, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, "0"); + swarmkv_reply_free(reply); } - EXPECT_EQ(round, n_success); - - cmd_exec_arg_free(arg); } -TEST_F(SwarmkvTwoNodes, HINCRYBY5K) +TEST_F(SwarmkvTwoNodes, HINCRBY5K) { struct swarmkv *db1=SwarmkvTwoNodes::db1; struct swarmkv *db2=SwarmkvTwoNodes::db2; const char *key="myhash"; const char *field="priority-0"; - int i=0, round=5000; struct swarmkv_reply *reply=NULL; + int i=0, round=5000; + for(i=0; i<round; i++) { reply=swarmkv_command(db1, "HINCRBY %s %s 1", key, field); - EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); swarmkv_reply_free(reply); + reply=swarmkv_command(db2, "HINCRBY %s %s -1", key, field); - EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); swarmkv_reply_free(reply); } wait_for_sync(); reply=swarmkv_command(db2, "HINCRBY %s %s 0", key, field); - EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); EXPECT_EQ(reply->integer, 0); - swarmkv_reply_free(reply); + swarmkv_reply_free(reply); } TEST_F(SwarmkvTwoNodes, DEL) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db1=SwarmkvTwoNodes::db1; struct swarmkv *db2=SwarmkvTwoNodes::db2; const char* key="id002"; - const char* val="to-be-deleted"; - int exec_successful=0; - arg=cmd_exec_arg_new(); - - //db1: SET key value - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db1, key, strlen(key), val, strlen(val), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); - - //db2: DEL key - cmd_exec_arg_expect_integer(arg, 1); - swarmkv_del(db2, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + const char* val="to-be-deleted"; + struct swarmkv_reply *reply=NULL; - wait_for_sync(); + reply=swarmkv_command(db1, "SET %s %s", key, val); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); - //db1: GET key - cmd_exec_arg_expect_NIL(arg); - swarmkv_get(db1, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db2, "DEL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); - //db2: GET key - cmd_exec_arg_expect_NIL(arg); - swarmkv_get(db2, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + wait_for_sync(); - cmd_exec_arg_free(arg); - arg=NULL; + reply=swarmkv_command(db1, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL); + swarmkv_reply_free(reply); + reply=swarmkv_command(db2, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL); + swarmkv_reply_free(reply); } TEST_F(SwarmkvTwoNodes, INCRBY) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db1=SwarmkvTwoNodes::db1; struct swarmkv *db2=SwarmkvTwoNodes::db2; const char* key="id003"; - long long val=10000; - int exec_successful=0; - arg=cmd_exec_arg_new(); - - //INCRYBY key 10000 - cmd_exec_arg_expect_integer(arg, val); - swarmkv_incrby(db1, key, strlen(key), val, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); - - //INCRYBY key 100 - cmd_exec_arg_expect_integer(arg, val+100); - swarmkv_incrby(db2, key, strlen(key), 100, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + long long val=10000; - //INCRYBY key -200 - cmd_exec_arg_expect_integer(arg, val+100-200); - swarmkv_incrby(db1, key, strlen(key), -200, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + struct swarmkv_reply *reply=NULL; + + reply=swarmkv_command(db1, "INCRBY %s %lld", key, val); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, val); + swarmkv_reply_free(reply); - cmd_exec_arg_free(arg); - arg=NULL; + reply=swarmkv_command(db2, "INCRBY %s 100", key); + val+=100; + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, val); + swarmkv_reply_free(reply); + reply=swarmkv_command(db2, "INCRBY %s -200", key); + val+=(-200); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, val); + swarmkv_reply_free(reply); } TEST_F(SwarmkvTwoNodes, EXPIRE) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db1=SwarmkvTwoNodes::db1; struct swarmkv *db2=SwarmkvTwoNodes::db2; struct swarmkv* tmp_db=NULL; char key[128]="", val[128]=""; - int exec_successful=0, i=0, round=128, n_success=0; + int i=0, round=128; int max_timeout_seconds=10, min_timeout_seconds=2; int seconds=0; - arg=cmd_exec_arg_new(); - /* - * SET expiring-key-%d value-xx-%d - */ - cmd_exec_arg_expect_OK(arg); - n_success=0; + struct swarmkv_reply *reply=NULL; + for(i=0; i<round; i++) { snprintf(key, sizeof(key), "expiring-key-%d", i); snprintf(val, sizeof(val), "value-xx-%d", i); tmp_db=i%2?db1:db2; - swarmkv_set(tmp_db, key, strlen(key), val, strlen(val), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - if(exec_successful) n_success++; - cmd_exec_arg_clear(arg); + reply=swarmkv_command(tmp_db, "SET %s %s", key, val); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); } - EXPECT_EQ(round, n_success); - - /* - * EXPIRE expiring-key-%d [min_timeout_seconds, max_timeout_seconds) - * Expect integer 1 reply - */ - - n_success=0; for(i=0; i<round; i++) { snprintf(key, sizeof(key), "expiring-key-%d", i); tmp_db=i%2?db2:db1; - cmd_exec_arg_expect_integer(arg, 1); seconds=min_timeout_seconds+i%(max_timeout_seconds-min_timeout_seconds); - swarmkv_expire(tmp_db, key, strlen(key), seconds, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - if(exec_successful) n_success++; - cmd_exec_arg_clear(arg); + reply=swarmkv_command(tmp_db, "EXPIRE %s %d", key, seconds); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); } - EXPECT_EQ(round, n_success); sleep(max_timeout_seconds+1); - /* - * GET expiring-key-%d - * Expect NIL reply - */ - n_success=0; for(i=0; i<round; i++) { snprintf(key, sizeof(key), "expiring-key-%d", i); tmp_db=i%2?db2:db1; - cmd_exec_arg_expect_NIL(arg); - seconds=min_timeout_seconds+i%(max_timeout_seconds-min_timeout_seconds); - swarmkv_get(tmp_db, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - if(exec_successful) n_success++; - cmd_exec_arg_clear(arg); + reply=swarmkv_command(tmp_db, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL); + swarmkv_reply_free(reply); } - EXPECT_EQ(round, n_success); - /* * KEYSPACE RLIST expiring-key-%d * Expect NIL reply, make sure that expired keys are removed from keyspace. */ - n_success=0; - struct swarmkv_reply *reply=NULL; for(i=0; i<round; i++) { snprintf(key, sizeof(key), "expiring-key-%d", i); tmp_db=i%2?db2:db1; - seconds=min_timeout_seconds+i%(max_timeout_seconds-min_timeout_seconds); - reply=swarmkv_command_on(tmp_db, NULL, "KEYSPACE RLIST %s", key); - if(reply->type==SWARMKV_REPLY_NIL) - { - n_success++; - } + reply=swarmkv_command(tmp_db, "KEYSPACE RLIST %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL); swarmkv_reply_free(reply); } - EXPECT_EQ(round, n_success); - - cmd_exec_arg_free(arg); - arg=NULL; } TEST_F(SwarmkvTwoNodes, TTL) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db1=SwarmkvTwoNodes::db1; struct swarmkv *db2=SwarmkvTwoNodes::db2; const char *key="ttl-key-001"; const char *value="hello-world"; - int exec_successful=0; int seconds=3; - arg=cmd_exec_arg_new(); - - // db1: SET key value - - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db1, key, strlen(key), value, strlen(value), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); - - // db2: EXPIRE key seconds - cmd_exec_arg_expect_integer(arg, 1); - swarmkv_expire(db2, key, strlen(key), seconds, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); - - //db1: TTL key - cmd_exec_arg_expect_integer(arg, seconds); - swarmkv_ttl(db1, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + struct swarmkv_reply *reply=NULL; + reply=swarmkv_command(db1, "SET %s %s", key, value); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); - //db2: TTL key - cmd_exec_arg_expect_integer(arg, seconds); - swarmkv_ttl(db2, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db2, "EXPIRE %s %d", key, seconds); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); - cmd_exec_arg_free(arg); + reply=swarmkv_command(db1, "TTL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, seconds); + swarmkv_reply_free(reply); + reply=swarmkv_command(db2, "TTL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, seconds); + swarmkv_reply_free(reply); } TEST_F(SwarmkvTwoNodes, PERSIST) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db1=SwarmkvTwoNodes::db1; struct swarmkv *db2=SwarmkvTwoNodes::db2; const char *key="persit-key-001"; const char *value="hello-world"; - int exec_successful=0; int seconds=3; - arg=cmd_exec_arg_new(); - - // db1: SET key value + struct swarmkv_reply *reply=NULL; - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db1, key, strlen(key), value, strlen(value), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); - - // db1: EXPIRE key seconds - cmd_exec_arg_expect_integer(arg, 1); - swarmkv_expire(db1, key, strlen(key), seconds, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); - - //db2: TTL key - cmd_exec_arg_expect_integer(arg, seconds); - swarmkv_ttl(db2, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db1, "SET %s %s", key, value); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); + reply=swarmkv_command(db1, "EXPIRE %s %d", key, seconds); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); - //db1: PERSIT key - cmd_exec_arg_expect_integer(arg, 1); - swarmkv_persist(db1, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db2, "TTL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, seconds); + swarmkv_reply_free(reply); + reply=swarmkv_command(db1, "PERSIST %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); - //db2: TTL key - cmd_exec_arg_expect_integer(arg, -1); - swarmkv_ttl(db2, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db2, "TTL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, -1); + swarmkv_reply_free(reply); sleep(seconds+1); - //db1: GET key - cmd_exec_arg_expect_cstring(arg, value); - swarmkv_get(db1, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); - - cmd_exec_arg_free(arg); + reply=swarmkv_command(db1, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, value); + swarmkv_reply_free(reply); } TEST_F(SwarmkvTwoNodes, FromLocalReplica) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db1=SwarmkvTwoNodes::db1; struct swarmkv *db2=SwarmkvTwoNodes::db2; - const char* key="id004"; - int exec_successful=0; - arg=cmd_exec_arg_new(); - - - - //db1: SET id003 lisi - const char* val1="lisi"; - - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db1, key, strlen(key), val1, strlen(val1), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + const char *key="id004"; + const char *val1="lisi", *val2="wang2mazi"; + struct swarmkv_reply *reply=NULL; - //db2: GET id003 - cmd_exec_arg_expect_cstring(arg, val1); - swarmkv_get(db2, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - EXPECT_EQ(arg->is_sync_callback, 0); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db1, "SET %s %s", key, val1); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); + + reply=swarmkv_command(db2, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, val1); + swarmkv_reply_free(reply); wait_for_sync(); - //db2: SET id003 wang2mazi - const char* val2="wang2mazi"; - - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db2, key, strlen(key), val2, strlen(val2), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - EXPECT_EQ(arg->is_sync_callback, 1); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db2, "SET %s %s", key, val2); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); wait_for_sync(); - //db1: GET id003 - cmd_exec_arg_expect_cstring(arg, val2); - swarmkv_get(db1, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - EXPECT_EQ(arg->is_sync_callback, 1); - cmd_exec_arg_clear(arg); - - cmd_exec_arg_free(arg); + reply=swarmkv_command(db1, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, val2); + swarmkv_reply_free(reply); + reply=swarmkv_command_on(db1, swarmkv_self_address(db1), "CRDT EXISTS %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); + + reply=swarmkv_command_on(db2, swarmkv_self_address(db2), "CRDT EXISTS %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); } TEST_F(SwarmkvTwoNodes, TypeSet) { |
