summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-07-29 03:25:53 +0800
committerZheng Chao <[email protected]>2023-07-29 03:25:53 +0800
commit523621088abc7f3a701646a7360ff015f9e5c025 (patch)
treec051d5367f3d1c71a99e1ddefa3b1645f4e8a4d4
parent6b50898b3bebc730979e74982fcf4c6645fb7f21 (diff)
WIP: fix swarmkv_net_send() bugoptimize-multithread-lockfree-ringbuf
-rw-r--r--docs/commands.md4
-rw-r--r--include/swarmkv/swarmkv.h8
-rw-r--r--src/swarmkv.c15
-rw-r--r--src/swarmkv_api.c10
-rw-r--r--src/swarmkv_net.c17
-rw-r--r--src/swarmkv_store.c2
-rw-r--r--src/swarmkv_sync.c2
-rw-r--r--test/swarmkv_gtest.cpp512
8 files changed, 235 insertions, 335 deletions
diff --git a/docs/commands.md b/docs/commands.md
index f1d9df7..dd57658 100644
--- a/docs/commands.md
+++ b/docs/commands.md
@@ -766,8 +766,8 @@ The pattern is exactly same as Redis https://redis.io/commands/keys/ .
| KEYSPACE KEYS | IP port pattern | |
| KEYSPACE COUNTKEYSINSLOT | slot | |
| CRDT DEL | key | Integer reply: The number of cached keys that were removed. |
-| CRDT PUSH | key blob | Simple string reply: OK if PUSH was executed correctly. |
-| CRDT PULL | key | Blog reply: a blob of state-based CRDT |
+| CRDT MERGE | key blob | Simple string reply: OK if MERGE was executed correctly. |
+| CRDT GET | key | Blog reply: a blob of state-based CRDT |
| CLUSTER KEYS | pattern | Array reply: list of keys matching pattern. |
| CLUSTER NODES | | |
| CLUSTER SLOTS | | |
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h
index 285cef6..3623474 100644
--- a/include/swarmkv/swarmkv.h
+++ b/include/swarmkv/swarmkv.h
@@ -61,7 +61,7 @@ int swarmkv_options_set_logger(struct swarmkv_options *opts, void *logger);
int swarmkv_options_set_log_level(struct swarmkv_options *opts, int loglevel);
int swarmkv_options_set_log_path(struct swarmkv_options *opts, const char *logpath);
int swarmkv_options_set_consul_port(struct swarmkv_options *opts, unsigned int consul_port);
-int swarmkv_options_set_consul_host(struct swarmkv_options *opts, const char* ip_addr);
+int swarmkv_options_set_consul_host(struct swarmkv_options *opts, const char *ip_addr);
int swarmkv_options_set_dryrun(struct swarmkv_options *opts);
int swarmkv_options_set_disable_run_for_leader(struct swarmkv_options *opts);
int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads);
@@ -83,8 +83,8 @@ void swarmkv_caller_loop(struct swarmkv *db, struct timeval *tv);
void swarmkv_caller_loop_break(struct swarmkv *db);
//Blocking function
-struct swarmkv_reply *swarmkv_command(struct swarmkv *db,const char *format, ...);
-struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...);
+struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...)__attribute__ ((format (printf, 2, 3)));
+struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...)__attribute__ ((format (printf, 3, 4)));
//Non-blocking function
typedef void swarmkv_on_reply_callback_t(const struct swarmkv_reply *reply, void * arg);
@@ -119,7 +119,7 @@ size_t swarmkv_get_possible_command_name(struct swarmkv *db, const char *prefix,
char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name);
-
+const char *swarmkv_self_address(const struct swarmkv *db);
#ifdef __cplusplus
} /* end extern "C" */
#endif
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 719d8df..947f542 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -172,9 +172,10 @@ static void remoter_caller_ctx_send_reply(struct remote_caller_ctx *ctx, const s
struct swarmkv *db = ctx->db;
struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->remote, ctx->remote_tid, ctx->sequence);
int cur_tid=__gettid(db);
- assert(cur_tid != msg->caller_tid);
+
if(0==node_compare(&db->self, &msg->caller))
{
+ assert(cur_tid != msg->caller_tid);
swarmkv_mesh_send(db->mesh, cur_tid, msg->caller_tid, msg);
}
else
@@ -760,10 +761,9 @@ static void peer_exec_on_success(void *result, void *user)
cmd_ctx_free(ctx);
}
}
-struct swarmkv_cmd *make_crdt_add_cmd(enum cmd_key_flag flag, const sds key, node_t replica[], size_t n_replica)
+struct swarmkv_cmd *make_crdt_add_cmd(const sds key, node_t replica[], size_t n_replica)
{
struct swarmkv_cmd *crdt_add_cmd=NULL;
- assert(flag==CMD_KEY_OW);
crdt_add_cmd=swarmkv_cmd_new(3+n_replica);
crdt_add_cmd->argv[0]=sdsnew("crdt");
crdt_add_cmd->argv[1]=sdsnew("add");
@@ -818,7 +818,7 @@ static void key_route_on_success(void *result, void *user)
if(self_is_a_replica)
{
struct cmd_ctx *crdt_add_ctx=NULL;
- struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(spec->flag, key, replica_nodes, n_replica_node);
+ struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(key, replica_nodes, n_replica_node);
crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller);
crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx);
__exec_cmd(ctx->db, NULL, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine);
@@ -920,7 +920,6 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
return;
}
-
enum cmd_exec_result exec_ret=FINISHED;
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC_COARSE, &start);
@@ -934,6 +933,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
struct promise *p=future_to_promise(future_of_caller);
promise_success(p, reply);
swarmkv_reply_free(reply);
+ return;
}
switch(exec_ret)
{
@@ -1491,6 +1491,9 @@ void swarmkv_close(struct swarmkv * db)
free(db);
return;
}
-
+const char *swarmkv_self_address(const struct swarmkv *db)
+{
+ return db->self.addr;
+}
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index cc2a21d..af8d6b7 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -70,9 +70,9 @@ int swarmkv_options_set_health_check_announce_port(struct swarmkv_options *opts,
opts->health_check_announce_port=health_check_announce_port;
return 0;
}
-int swarmkv_options_set_cluster_timeout_us(struct swarmkv_options *opts, unsigned int timeout_ms)
+int swarmkv_options_set_cluster_timeout_us(struct swarmkv_options *opts, unsigned int timeout_us)
{
- opts->cluster_timeout_us=timeout_ms;
+ opts->cluster_timeout_us=timeout_us;
return 0;
}
int swarmkv_options_set_sync_interval_us(struct swarmkv_options *opts, unsigned int interval_us)
@@ -335,6 +335,7 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta
node_t target_node;
memset(&target_node, 0, sizeof(node_t));
ctx.db=db;
+ ctx.reply=NULL;
cmd=swarmkv_cmd_new(argc);
for(int i=0; i<argc; i++)
{
@@ -350,7 +351,10 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta
node_init_from_sds(&target_node, target);
exec_for_local(db, cmd, &target_node, blocking_query_cb, &ctx);
}
- swarmkv_caller_loop(db, NULL);
+ if(ctx.reply==NULL)
+ {
+ swarmkv_caller_loop(db, NULL);
+ }
assert(ctx.reply!=NULL);
reply=ctx.reply;
ctx.reply=NULL;
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index ad70dc9..74cf7db 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -110,7 +110,8 @@ struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const nod
struct snet_conn* conn=ALLOC(struct snet_conn, 1);
conn->buff_for_sending=evbuffer_new();
-
+ conn->fd=-1;
+
//http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html
conn->bev=bufferevent_socket_new(base, -1, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS|
bufferevent_setcb(conn->bev, NULL, NULL, connect_peer_eventcb, conn);
@@ -373,6 +374,7 @@ void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg)
int yes=1;
evutil_socket_t fd=bufferevent_getfd(conn->bev);
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&yes, sizeof(yes));
+ conn->fd=fd;
evutil_make_socket_closeonexec(fd);
bufferevent_write_buffer(conn->bev, conn->buff_for_sending);
bufferevent_setcb(conn->bev, peer_conn_read_cb, peer_conn_write_cb, peer_conn_event_cb, conn);
@@ -629,8 +631,17 @@ void swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmk
char *data=NULL;
size_t size=0;
swarmkv_msg_serialize(msg, &data, &size);
- swarmkv_msg_free(msg);
- evbuffer_add(conn->buff_for_sending, data, size);
+ swarmkv_msg_free(msg);
+ struct evbuffer* output_buffer=NULL;
+ output_buffer=bufferevent_get_output(conn->bev);
+ if(output_buffer)
+ {
+ evbuffer_add(output_buffer, data, size);
+ }
+ else
+ {
+ evbuffer_add(conn->buff_for_sending, data, size);
+ }
free(data);
thr->stat.output_bytes += size;
thr->stat.output_msgs++;
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 0a2fec1..00ae03c 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -537,7 +537,7 @@ static void crdt_generic_on_fail(enum e_future_error err, const char * what, voi
void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struct swarmkv_cmd *cmd, const node_t *peer)
{
- const char *symbol[]={"crdt_pull", "crdt_push", "crdt_join"};
+ const char *symbol[]={"crdt_get", "crdt_merge", "crdt_join"};
struct crdt_generic_ctx *ctx=NULL;
ctx=ALLOC(struct crdt_generic_ctx, 1);
ctx->op=op;
diff --git a/src/swarmkv_sync.c b/src/swarmkv_sync.c
index 3170f1e..17dffe9 100644
--- a/src/swarmkv_sync.c
+++ b/src/swarmkv_sync.c
@@ -93,7 +93,7 @@ int sync_master_get_cmd(struct sync_master *master, node_t *peer, struct swarmkv
n_data=utarray_len(task->sync_data_list);
c=swarmkv_cmd_new(2+n_data*2);
c->argv[0]=sdsnew("crdt");
- c->argv[1]=sdsnew("push");
+ c->argv[1]=sdsnew("merge");
for(size_t i=0; i<n_data; i++)
{
p=utarray_eltptr(task->sync_data_list, i);
diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp
index e738ad2..8dfb4f2 100644
--- a/test/swarmkv_gtest.cpp
+++ b/test/swarmkv_gtest.cpp
@@ -558,6 +558,9 @@ protected:
swarmkv_options_set_cluster_port(opts1, 5210);
swarmkv_options_set_health_check_port(opts1, 6210);
swarmkv_options_set_logger(opts1, logger);
+ swarmkv_options_set_cluster_timeout_us(opts1, 600*1000*1000);
+ swarmkv_options_set_worker_thread_number(opts1, 1);
+ swarmkv_options_set_caller_thread_number(opts1, 1);
db1=swarmkv_open(opts1, cluster_name, &err);
if(err)
{
@@ -570,6 +573,9 @@ protected:
swarmkv_options_set_cluster_port(opts2, 5211);
swarmkv_options_set_health_check_port(opts2, 6211);
swarmkv_options_set_logger(opts2, logger);
+ swarmkv_options_set_cluster_timeout_us(opts2, 600*1000*1000);
+ swarmkv_options_set_worker_thread_number(opts2, 1);
+ swarmkv_options_set_caller_thread_number(opts2, 1);
db2=swarmkv_open(opts2, cluster_name, &err);
if(err)
{
@@ -594,504 +600,380 @@ struct swarmkv* SwarmkvTwoNodes::db1;
struct swarmkv* SwarmkvTwoNodes::db2;
TEST_F(SwarmkvTwoNodes, SET_GET)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db1=SwarmkvTwoNodes::db1;
struct swarmkv *db2=SwarmkvTwoNodes::db2;
const char *key="id001";
const char *val1="zhangsan", *val2="lisi";
- int exec_successful=0;
+ struct swarmkv_reply *reply=NULL;
- arg=cmd_exec_arg_new();
-
- cmd_exec_arg_expect_OK(arg);
- swarmkv_set(db1, key, strlen(key), val1, strlen(val1), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db1, "SET %s %s", key, val1);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
- cmd_exec_arg_expect_cstring(arg, val1);
- swarmkv_get(db2, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db2, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, val1);
+ swarmkv_reply_free(reply);
- cmd_exec_arg_expect_OK(arg);
- swarmkv_set(db2, key, strlen(key), val2, strlen(val2), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db2, "SET %s %s", key, val2);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db2, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, val2);
+ swarmkv_reply_free(reply);
wait_for_sync();
- cmd_exec_arg_expect_cstring(arg, val2);
- swarmkv_get(db1, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db1, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, val2);
+ swarmkv_reply_free(reply);
- cmd_exec_arg_free(arg);
- arg=NULL;
}
TEST_F(SwarmkvTwoNodes, SET1kString)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db1=SwarmkvTwoNodes::db1;
struct swarmkv *db2=SwarmkvTwoNodes::db2;
struct swarmkv* tmp_db=NULL;
const char *key_prefix="test-1k-string";
const char *val_prefix="value-xx";
char key[128]="", val[128]="";
- int exec_successful=0, i=0, round=1000, n_success=0;
+ int i=0, round=1000;
+ struct swarmkv_reply *reply=NULL;
+
- arg=cmd_exec_arg_new();
- cmd_exec_arg_expect_OK(arg);
- n_success=0;
for(i=0; i<round; i++)
{
snprintf(key, sizeof(key), "%s-%d", key_prefix, i);
snprintf(val, sizeof(val), "%s-%d", val_prefix, i);
tmp_db=i%2?db1:db2;
- swarmkv_set(tmp_db, key, strlen(key), val, strlen(val), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- if(exec_successful) n_success++;
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(tmp_db, "SET %s %s", key, val);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
}
- EXPECT_EQ(round, n_success);
- n_success=0;
+
for(i=0; i<round; i++)
{
snprintf(key, sizeof(key), "%s-%d", key_prefix, i);
snprintf(val, sizeof(val), "%s-%d", val_prefix, i);
- cmd_exec_arg_expect_cstring(arg, val);
tmp_db=i%2?db2:db1;
- swarmkv_get(tmp_db, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- if(exec_successful) n_success++;
- cmd_exec_arg_clear(arg);
- }
- EXPECT_EQ(round, n_success);
+ reply=swarmkv_command(tmp_db, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, val);
+ swarmkv_reply_free(reply);
- cmd_exec_arg_free(arg);
+ }
}
-TEST_F(SwarmkvTwoNodes, INCRYBY1kInteger)
+TEST_F(SwarmkvTwoNodes, INCRBY1kInteger)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db1=SwarmkvTwoNodes::db1;
struct swarmkv *db2=SwarmkvTwoNodes::db2;
struct swarmkv* tmp_db=NULL;
const char *key_prefix="incrby-1k-integer";
- char key[128]="", val[128]="";
- int exec_successful=0, i=0, round=1000, n_success=0;
-
- arg=cmd_exec_arg_new();
+ char key[128]="";
+ int i=0, round=1000;
+ struct swarmkv_reply *reply=NULL;
- n_success=0;
int increment=-3000000;
for(i=0; i<round; i++)
- {
+ {
+ tmp_db=i%2?db1:db2;
snprintf(key, sizeof(key), "%s-%d", key_prefix, i);
- cmd_exec_arg_expect_integer(arg, i+increment);
-
- tmp_db=i%2?db2:db1;
- swarmkv_incrby(tmp_db, key, strlen(key), i+increment, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- if(exec_successful) n_success++;
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(tmp_db, "INCRBY %s %d", key, i+increment);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, i+increment);
+ swarmkv_reply_free(reply);
}
- EXPECT_EQ(round, n_success);
-
- cmd_exec_arg_expect_OK(arg);
- n_success=0;
for(i=0; i<round; i++)
- {
+ {
+ tmp_db=(i+1)%2?db1:db2;
snprintf(key, sizeof(key), "%s-%d", key_prefix, i);
- snprintf(val, sizeof(val), "0");
+ reply=swarmkv_command(tmp_db, "INCRBY %s %d", key, 0);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, i+increment);
+ swarmkv_reply_free(reply);
+ }
+ for(i=0; i<round; i++)
+ {
tmp_db=i%2?db1:db2;
- swarmkv_set(tmp_db, key, strlen(key), val, strlen(val), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- if(exec_successful) n_success++;
- cmd_exec_arg_clear(arg);
+ snprintf(key, sizeof(key), "%s-%d", key_prefix, i);
+ reply=swarmkv_command(tmp_db, "SET %s 0", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
}
- EXPECT_EQ(round, n_success);
-
- cmd_exec_arg_expect_cstring(arg, "0");
- n_success=0;
for(i=0; i<round; i++)
{
- snprintf(key, sizeof(key), "%s-%d", key_prefix, i);
- tmp_db=i%2?db2:db1;
- swarmkv_get(tmp_db, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- if(exec_successful) n_success++;
- cmd_exec_arg_clear(arg);
+ tmp_db=(i+1)%2?db1:db2;
+ snprintf(key, sizeof(key), "%s-%d", key_prefix, i);
+ reply=swarmkv_command(tmp_db, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, "0");
+ swarmkv_reply_free(reply);
}
- EXPECT_EQ(round, n_success);
-
- cmd_exec_arg_free(arg);
}
-TEST_F(SwarmkvTwoNodes, HINCRYBY5K)
+TEST_F(SwarmkvTwoNodes, HINCRBY5K)
{
struct swarmkv *db1=SwarmkvTwoNodes::db1;
struct swarmkv *db2=SwarmkvTwoNodes::db2;
const char *key="myhash";
const char *field="priority-0";
- int i=0, round=5000;
struct swarmkv_reply *reply=NULL;
+ int i=0, round=5000;
+
for(i=0; i<round; i++)
{
reply=swarmkv_command(db1, "HINCRBY %s %s 1", key, field);
- EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
swarmkv_reply_free(reply);
+
reply=swarmkv_command(db2, "HINCRBY %s %s -1", key, field);
- EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
swarmkv_reply_free(reply);
}
wait_for_sync();
reply=swarmkv_command(db2, "HINCRBY %s %s 0", key, field);
- EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
EXPECT_EQ(reply->integer, 0);
- swarmkv_reply_free(reply);
+ swarmkv_reply_free(reply);
}
TEST_F(SwarmkvTwoNodes, DEL)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db1=SwarmkvTwoNodes::db1;
struct swarmkv *db2=SwarmkvTwoNodes::db2;
const char* key="id002";
- const char* val="to-be-deleted";
- int exec_successful=0;
- arg=cmd_exec_arg_new();
-
- //db1: SET key value
- cmd_exec_arg_expect_OK(arg);
- swarmkv_set(db1, key, strlen(key), val, strlen(val), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
-
- //db2: DEL key
- cmd_exec_arg_expect_integer(arg, 1);
- swarmkv_del(db2, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ const char* val="to-be-deleted";
+ struct swarmkv_reply *reply=NULL;
- wait_for_sync();
+ reply=swarmkv_command(db1, "SET %s %s", key, val);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
- //db1: GET key
- cmd_exec_arg_expect_NIL(arg);
- swarmkv_get(db1, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db2, "DEL %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
- //db2: GET key
- cmd_exec_arg_expect_NIL(arg);
- swarmkv_get(db2, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ wait_for_sync();
- cmd_exec_arg_free(arg);
- arg=NULL;
+ reply=swarmkv_command(db1, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL);
+ swarmkv_reply_free(reply);
+ reply=swarmkv_command(db2, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL);
+ swarmkv_reply_free(reply);
}
TEST_F(SwarmkvTwoNodes, INCRBY)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db1=SwarmkvTwoNodes::db1;
struct swarmkv *db2=SwarmkvTwoNodes::db2;
const char* key="id003";
- long long val=10000;
- int exec_successful=0;
- arg=cmd_exec_arg_new();
-
- //INCRYBY key 10000
- cmd_exec_arg_expect_integer(arg, val);
- swarmkv_incrby(db1, key, strlen(key), val, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
-
- //INCRYBY key 100
- cmd_exec_arg_expect_integer(arg, val+100);
- swarmkv_incrby(db2, key, strlen(key), 100, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ long long val=10000;
- //INCRYBY key -200
- cmd_exec_arg_expect_integer(arg, val+100-200);
- swarmkv_incrby(db1, key, strlen(key), -200, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ struct swarmkv_reply *reply=NULL;
+
+ reply=swarmkv_command(db1, "INCRBY %s %lld", key, val);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, val);
+ swarmkv_reply_free(reply);
- cmd_exec_arg_free(arg);
- arg=NULL;
+ reply=swarmkv_command(db2, "INCRBY %s 100", key);
+ val+=100;
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, val);
+ swarmkv_reply_free(reply);
+ reply=swarmkv_command(db2, "INCRBY %s -200", key);
+ val+=(-200);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, val);
+ swarmkv_reply_free(reply);
}
TEST_F(SwarmkvTwoNodes, EXPIRE)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db1=SwarmkvTwoNodes::db1;
struct swarmkv *db2=SwarmkvTwoNodes::db2;
struct swarmkv* tmp_db=NULL;
char key[128]="", val[128]="";
- int exec_successful=0, i=0, round=128, n_success=0;
+ int i=0, round=128;
int max_timeout_seconds=10, min_timeout_seconds=2;
int seconds=0;
- arg=cmd_exec_arg_new();
- /*
- * SET expiring-key-%d value-xx-%d
- */
- cmd_exec_arg_expect_OK(arg);
- n_success=0;
+ struct swarmkv_reply *reply=NULL;
+
for(i=0; i<round; i++)
{
snprintf(key, sizeof(key), "expiring-key-%d", i);
snprintf(val, sizeof(val), "value-xx-%d", i);
tmp_db=i%2?db1:db2;
- swarmkv_set(tmp_db, key, strlen(key), val, strlen(val), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- if(exec_successful) n_success++;
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(tmp_db, "SET %s %s", key, val);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
}
- EXPECT_EQ(round, n_success);
-
- /*
- * EXPIRE expiring-key-%d [min_timeout_seconds, max_timeout_seconds)
- * Expect integer 1 reply
- */
-
- n_success=0;
for(i=0; i<round; i++)
{
snprintf(key, sizeof(key), "expiring-key-%d", i);
tmp_db=i%2?db2:db1;
- cmd_exec_arg_expect_integer(arg, 1);
seconds=min_timeout_seconds+i%(max_timeout_seconds-min_timeout_seconds);
- swarmkv_expire(tmp_db, key, strlen(key), seconds, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- if(exec_successful) n_success++;
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(tmp_db, "EXPIRE %s %d", key, seconds);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
}
- EXPECT_EQ(round, n_success);
sleep(max_timeout_seconds+1);
- /*
- * GET expiring-key-%d
- * Expect NIL reply
- */
- n_success=0;
for(i=0; i<round; i++)
{
snprintf(key, sizeof(key), "expiring-key-%d", i);
tmp_db=i%2?db2:db1;
- cmd_exec_arg_expect_NIL(arg);
- seconds=min_timeout_seconds+i%(max_timeout_seconds-min_timeout_seconds);
- swarmkv_get(tmp_db, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- if(exec_successful) n_success++;
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(tmp_db, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL);
+ swarmkv_reply_free(reply);
}
- EXPECT_EQ(round, n_success);
-
/*
* KEYSPACE RLIST expiring-key-%d
* Expect NIL reply, make sure that expired keys are removed from keyspace.
*/
- n_success=0;
- struct swarmkv_reply *reply=NULL;
for(i=0; i<round; i++)
{
snprintf(key, sizeof(key), "expiring-key-%d", i);
tmp_db=i%2?db2:db1;
- seconds=min_timeout_seconds+i%(max_timeout_seconds-min_timeout_seconds);
- reply=swarmkv_command_on(tmp_db, NULL, "KEYSPACE RLIST %s", key);
- if(reply->type==SWARMKV_REPLY_NIL)
- {
- n_success++;
- }
+ reply=swarmkv_command(tmp_db, "KEYSPACE RLIST %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL);
swarmkv_reply_free(reply);
}
- EXPECT_EQ(round, n_success);
-
- cmd_exec_arg_free(arg);
- arg=NULL;
}
TEST_F(SwarmkvTwoNodes, TTL)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db1=SwarmkvTwoNodes::db1;
struct swarmkv *db2=SwarmkvTwoNodes::db2;
const char *key="ttl-key-001";
const char *value="hello-world";
- int exec_successful=0;
int seconds=3;
- arg=cmd_exec_arg_new();
-
- // db1: SET key value
-
- cmd_exec_arg_expect_OK(arg);
- swarmkv_set(db1, key, strlen(key), value, strlen(value), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
-
- // db2: EXPIRE key seconds
- cmd_exec_arg_expect_integer(arg, 1);
- swarmkv_expire(db2, key, strlen(key), seconds, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
-
- //db1: TTL key
- cmd_exec_arg_expect_integer(arg, seconds);
- swarmkv_ttl(db1, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ struct swarmkv_reply *reply=NULL;
+ reply=swarmkv_command(db1, "SET %s %s", key, value);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
- //db2: TTL key
- cmd_exec_arg_expect_integer(arg, seconds);
- swarmkv_ttl(db2, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db2, "EXPIRE %s %d", key, seconds);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
- cmd_exec_arg_free(arg);
+ reply=swarmkv_command(db1, "TTL %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, seconds);
+ swarmkv_reply_free(reply);
+ reply=swarmkv_command(db2, "TTL %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, seconds);
+ swarmkv_reply_free(reply);
}
TEST_F(SwarmkvTwoNodes, PERSIST)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db1=SwarmkvTwoNodes::db1;
struct swarmkv *db2=SwarmkvTwoNodes::db2;
const char *key="persit-key-001";
const char *value="hello-world";
- int exec_successful=0;
int seconds=3;
- arg=cmd_exec_arg_new();
-
- // db1: SET key value
+ struct swarmkv_reply *reply=NULL;
- cmd_exec_arg_expect_OK(arg);
- swarmkv_set(db1, key, strlen(key), value, strlen(value), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
-
- // db1: EXPIRE key seconds
- cmd_exec_arg_expect_integer(arg, 1);
- swarmkv_expire(db1, key, strlen(key), seconds, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
-
- //db2: TTL key
- cmd_exec_arg_expect_integer(arg, seconds);
- swarmkv_ttl(db2, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db1, "SET %s %s", key, value);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
+ reply=swarmkv_command(db1, "EXPIRE %s %d", key, seconds);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
- //db1: PERSIT key
- cmd_exec_arg_expect_integer(arg, 1);
- swarmkv_persist(db1, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db2, "TTL %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, seconds);
+ swarmkv_reply_free(reply);
+ reply=swarmkv_command(db1, "PERSIST %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
- //db2: TTL key
- cmd_exec_arg_expect_integer(arg, -1);
- swarmkv_ttl(db2, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db2, "TTL %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, -1);
+ swarmkv_reply_free(reply);
sleep(seconds+1);
- //db1: GET key
- cmd_exec_arg_expect_cstring(arg, value);
- swarmkv_get(db1, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
-
- cmd_exec_arg_free(arg);
+ reply=swarmkv_command(db1, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, value);
+ swarmkv_reply_free(reply);
}
TEST_F(SwarmkvTwoNodes, FromLocalReplica)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db1=SwarmkvTwoNodes::db1;
struct swarmkv *db2=SwarmkvTwoNodes::db2;
- const char* key="id004";
- int exec_successful=0;
- arg=cmd_exec_arg_new();
-
-
-
- //db1: SET id003 lisi
- const char* val1="lisi";
-
- cmd_exec_arg_expect_OK(arg);
- swarmkv_set(db1, key, strlen(key), val1, strlen(val1), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ const char *key="id004";
+ const char *val1="lisi", *val2="wang2mazi";
+ struct swarmkv_reply *reply=NULL;
- //db2: GET id003
- cmd_exec_arg_expect_cstring(arg, val1);
- swarmkv_get(db2, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- EXPECT_EQ(arg->is_sync_callback, 0);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db1, "SET %s %s", key, val1);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db2, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, val1);
+ swarmkv_reply_free(reply);
wait_for_sync();
- //db2: SET id003 wang2mazi
- const char* val2="wang2mazi";
-
- cmd_exec_arg_expect_OK(arg);
- swarmkv_set(db2, key, strlen(key), val2, strlen(val2), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- EXPECT_EQ(arg->is_sync_callback, 1);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db2, "SET %s %s", key, val2);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
wait_for_sync();
- //db1: GET id003
- cmd_exec_arg_expect_cstring(arg, val2);
- swarmkv_get(db1, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- EXPECT_EQ(arg->is_sync_callback, 1);
- cmd_exec_arg_clear(arg);
-
- cmd_exec_arg_free(arg);
+ reply=swarmkv_command(db1, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, val2);
+ swarmkv_reply_free(reply);
+ reply=swarmkv_command_on(db1, swarmkv_self_address(db1), "CRDT EXISTS %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command_on(db2, swarmkv_self_address(db2), "CRDT EXISTS %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
}
TEST_F(SwarmkvTwoNodes, TypeSet)
{