summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-07-24 03:24:31 +0800
committerZheng Chao <[email protected]>2023-07-24 03:24:31 +0800
commit49c33b4687c065ba2003a512aad67c24388b3d8e (patch)
tree305d54705a861f5b6f49783d77ae5e81b4af3b3c
parent27da04165b6e36d78dd2a4dfd2bb4962af919d6b (diff)
WIP: It works.
-rw-r--r--include/swarmkv/swarmkv.h10
-rw-r--r--src/inc_internal/swarmkv_common.h1
-rw-r--r--src/inc_internal/swarmkv_core.h6
-rw-r--r--src/inc_internal/swarmkv_mesh.h2
-rw-r--r--src/inc_internal/swarmkv_message.h8
-rw-r--r--src/inc_internal/swarmkv_store.h2
-rw-r--r--src/swarmkv.c196
-rw-r--r--src/swarmkv_api.c35
-rw-r--r--src/swarmkv_keyspace.c2
-rw-r--r--src/swarmkv_mesh.c8
-rw-r--r--src/swarmkv_message.c16
-rw-r--r--src/swarmkv_net.c9
-rw-r--r--src/swarmkv_rpc.c7
-rw-r--r--src/swarmkv_store.c3
-rw-r--r--test/swarmkv_gtest.cpp299
-rw-r--r--test/test_utils.c2
16 files changed, 315 insertions, 291 deletions
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h
index ce41260..285cef6 100644
--- a/include/swarmkv/swarmkv.h
+++ b/include/swarmkv/swarmkv.h
@@ -11,6 +11,8 @@
#pragma once
#include <stddef.h>
#include <stdio.h>
+#include <sys/time.h>
+
#ifdef __cplusplus
extern "C"
{
@@ -62,7 +64,8 @@ int swarmkv_options_set_consul_port(struct swarmkv_options *opts, unsigned int c
int swarmkv_options_set_consul_host(struct swarmkv_options *opts, const char* ip_addr);
int swarmkv_options_set_dryrun(struct swarmkv_options *opts);
int swarmkv_options_set_disable_run_for_leader(struct swarmkv_options *opts);
-int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, size_t nr_worker_threads);
+int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads);
+int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, int nr_worker_threads);
int swarmkv_options_set_cluster_announce_ip(struct swarmkv_options *opts, const char *ip_addr);
int swarmkv_options_set_cluster_announce_port(struct swarmkv_options *opts, unsigned int cluster_announce_port);
@@ -74,6 +77,11 @@ struct swarmkv;
struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *cluster_name, char **err);
void swarmkv_close(struct swarmkv * db);
+
+void swarmkv_register_thread(struct swarmkv *db);
+void swarmkv_caller_loop(struct swarmkv *db, struct timeval *tv);
+void swarmkv_caller_loop_break(struct swarmkv *db);
+
//Blocking function
struct swarmkv_reply *swarmkv_command(struct swarmkv *db,const char *format, ...);
struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...);
diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h
index 3e9e11a..063f0c1 100644
--- a/src/inc_internal/swarmkv_common.h
+++ b/src/inc_internal/swarmkv_common.h
@@ -75,6 +75,7 @@ struct swarmkv_options
uuid_t bin_uuid;
int nr_worker_threads;
int nr_caller_threads;
+ int total_threads;
int is_assigned_to_db;
};
diff --git a/src/inc_internal/swarmkv_core.h b/src/inc_internal/swarmkv_core.h
new file mode 100644
index 0000000..365851d
--- /dev/null
+++ b/src/inc_internal/swarmkv_core.h
@@ -0,0 +1,6 @@
+#pragma once
+
+#include "swarmkv.h"
+#include "swarmkv_common.h"
+void exec_for_local(struct swarmkv *db, const struct swarmkv_cmd *cmd, const node_t *target_node, swarmkv_on_reply_callback_t * cb, void * cb_arg);
+struct event_base *swarmkv_get_event_base(struct swarmkv *db); \ No newline at end of file
diff --git a/src/inc_internal/swarmkv_mesh.h b/src/inc_internal/swarmkv_mesh.h
index 4a6f04c..c9efcae 100644
--- a/src/inc_internal/swarmkv_mesh.h
+++ b/src/inc_internal/swarmkv_mesh.h
@@ -3,6 +3,6 @@
struct swarmkv_mesh;
int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest_thread_id, struct swarmkv_msg *msg);
-void swarmkv_mesh_set_on_msg_recv_cb(struct swarmkv_mesh *mesh, on_msg_callback_t *cb_func, void *cb_arg);
+void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t *cb_func, void *cb_arg);
struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger);
void swarmkv_mesh_free(struct swarmkv_mesh *mesh); \ No newline at end of file
diff --git a/src/inc_internal/swarmkv_message.h b/src/inc_internal/swarmkv_message.h
index c1a69ae..3fbb7c4 100644
--- a/src/inc_internal/swarmkv_message.h
+++ b/src/inc_internal/swarmkv_message.h
@@ -13,9 +13,9 @@ struct swarmkv_msg
unsigned int magic;
enum MSG_TYPE type;
long long sequence;
- int thread_id;
+ int caller_tid;
unsigned int payload_len;
- node_t source;
+ node_t caller;
union
{
struct swarmkv_cmd *cmd;
@@ -25,8 +25,8 @@ struct swarmkv_msg
};
#define SWARMKV_MSG_HDR_SIZE offsetof(struct swarmkv_msg, payload)
-struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *source, int thread_id, long long sequence);
-struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *source, int thread_id, long long sequence);
+struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, long long sequence);
+struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, long long sequence);
void swarmkv_msg_free(struct swarmkv_msg *msg);
//on_msg_callback_t transfers ownership of msg to the callback function
diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h
index 241168d..f25ff5f 100644
--- a/src/inc_internal/swarmkv_store.h
+++ b/src/inc_internal/swarmkv_store.h
@@ -66,7 +66,7 @@ enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_re
void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid);
struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key);
-
+enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_merge_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
diff --git a/src/swarmkv.c b/src/swarmkv.c
index f552a29..70d6e4f 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -50,6 +50,7 @@ struct swarmkv_thread_ctx
int sys_tid;
int thread_id;
struct swarmkv *db;
+ struct event_base *evbase;
int is_dispatching;
time_t lastime;
struct event *store_periodic_ev;
@@ -64,9 +65,9 @@ struct swarmkv
struct swarmkv_options *opts;
int thread_counter;
- struct swarmkv_thread_ctx *worker_threads;
+ struct swarmkv_thread_ctx *threads;
- struct event_base **evbases;
+ struct event_base **ref_evbases;
node_t self;
struct swarmkv_rpc_mgr *rpc_mgr;
@@ -94,8 +95,9 @@ struct swarmkv *module2db(struct swarmkv_module *module)
void swarmkv_register_thread(struct swarmkv *db)
{
int thread_id = atomic_fetch_add(&db->thread_counter, 1);
- db->worker_threads[thread_id].sys_tid=syscall(SYS_gettid);
- db->worker_threads[thread_id].thread_id=thread_id;
+ assert(thread_id < db->opts->nr_worker_threads + db->opts->nr_caller_threads);
+ db->threads[thread_id].sys_tid=syscall(SYS_gettid);
+ db->threads[thread_id].thread_id=thread_id;
return;
}
int __gettid(struct swarmkv *db)
@@ -103,9 +105,9 @@ int __gettid(struct swarmkv *db)
int sys_tid=syscall(SYS_gettid);
for(int i=0; i<db->opts->nr_worker_threads + db->opts->nr_caller_threads; i++)
{
- if(db->worker_threads[i].sys_tid==sys_tid)
+ if(db->threads[i].sys_tid==sys_tid)
{
- return db->worker_threads[i].thread_id;
+ return db->threads[i].thread_id;
}
}
assert(0);
@@ -154,7 +156,7 @@ struct remote_caller_ctx
struct future *my_future;
struct swarmkv *db;
long long sequence;
- int thread_id;
+ int remote_tid;
node_t remote;
};
void remote_calller_ctx_free(struct remote_caller_ctx *ctx)
@@ -165,12 +167,27 @@ void remote_calller_ctx_free(struct remote_caller_ctx *ctx)
free(ctx);
return;
}
+static void remoter_caller_ctx_send_reply(struct remote_caller_ctx *ctx, const struct swarmkv_reply *reply)
+{
+ struct swarmkv *db = ctx->db;
+ struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->remote, ctx->remote_tid, ctx->sequence);
+ int cur_tid=__gettid(db);
+ assert(cur_tid != msg->caller_tid);
+ if(0==node_compare(&db->self, &msg->caller))
+ {
+ swarmkv_mesh_send(db->mesh, cur_tid, msg->caller_tid, msg);
+ }
+ else
+ {
+ swarmkv_net_send(ctx->db->net, &ctx->remote, msg);
+ }
+ return;
+}
static void remote_caller_on_success(void *result, void *user)
{
struct remote_caller_ctx *ctx=(struct remote_caller_ctx*)user;
const struct swarmkv_reply *reply=(const struct swarmkv_reply*) result;
- struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->db->self, ctx->thread_id, ctx->sequence);
- swarmkv_net_send(ctx->db->net, &ctx->remote, msg);
+ remoter_caller_ctx_send_reply(ctx, reply);
remote_calller_ctx_free(ctx);
return;
}
@@ -178,8 +195,7 @@ static void remote_caller_on_fail(enum e_future_error err, const char * what, vo
{
struct remote_caller_ctx *ctx=(struct remote_caller_ctx*)user;
struct swarmkv_reply *reply=swarmkv_reply_new_error(what);
- struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->db->self, ctx->thread_id, ctx->sequence);
- swarmkv_net_send(ctx->db->net, &ctx->remote, msg);
+ remoter_caller_ctx_send_reply(ctx, reply);
remote_calller_ctx_free(ctx);
return;
}
@@ -188,13 +204,13 @@ void exec_for_remote(struct swarmkv *db, const struct swarmkv_msg *msg)
struct remote_caller_ctx *ctx=ALLOC(struct remote_caller_ctx, 1);
ctx->db=db;
ctx->sequence=msg->sequence;
- ctx->thread_id=msg->thread_id;
- node_copy(&ctx->remote, &msg->source);
+ ctx->remote_tid=msg->caller_tid;
+ node_copy(&ctx->remote, &msg->caller);
ctx->my_future=future_create("for_remote",
remote_caller_on_success,
remote_caller_on_fail,
ctx);
- __exec_cmd(db, &msg->source, NULL, msg->cmd, ctx->my_future);
+ __exec_cmd(db, &msg->caller, NULL, msg->cmd, ctx->my_future);
return;
}
enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
@@ -518,6 +534,12 @@ static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
{
}
+void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg)
+{
+ struct swarmkv_thread_ctx *thread=(struct swarmkv_thread_ctx *)arg;
+ swarmkv_store_periodic(thread->db->mod_store, thread->thread_id);
+ swarmkv_keyspace_periodic(thread->db->mod_keyspace, thread->thread_id);
+}
void *swarmkv_worker_thread(void *arg)
{
struct swarmkv_thread_ctx* ctx = (struct swarmkv_thread_ctx *) arg;
@@ -527,17 +549,20 @@ void *swarmkv_worker_thread(void *arg)
snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", ctx->thread_id);
prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
- struct event * ev = event_new(db->evbases[ctx->thread_id], -1, EV_PERSIST, __dummy_event_handler, db);
- struct timeval timer_delay = {2, 0};
+ struct timeval sync_interval = {db->opts->sync_interval_us/(1000*1000), db->opts->sync_interval_us%(1000*1000)};
+ struct event * periodic_ev=event_new(ctx->evbase, -1, EV_PERSIST, __swarmkv_periodic, ctx);
+ evtimer_add(periodic_ev, &sync_interval);
+
ctx->is_dispatching=1;
- evtimer_add(ev, &timer_delay);
+
- int ret=event_base_dispatch(db->evbases[ctx->thread_id]);
- event_del(ev);
- event_free(ev);
+ int ret=event_base_dispatch(ctx->evbase);
+ event_del(periodic_ev);
+ event_free(periodic_ev);
+
if(ctx->is_dispatching)
{
- log_fatal(db->logger, MODULE_SWAMRKV_CORE, "event_base_dispatch() exit abnormally, ret=%d", ret);
+ log_fatal(db->logger, MODULE_SWAMRKV_CORE, "worker thread event_base_dispatch() exit abnormally, ret=%d", ret);
}
else
{
@@ -551,12 +576,22 @@ void swarmkv_threads_run(struct swarmkv *db)
int i = 0, ret=0;
for (i = 0; i < db->opts->nr_worker_threads; i++)
{
- ret=pthread_create(&db->worker_threads[i].thr, NULL, swarmkv_worker_thread, (void *) (db->worker_threads+i));
+ ret=pthread_create(&db->threads[i].thr, NULL, swarmkv_worker_thread, (void *) (db->threads+i));
if(ret !=0 )//error
{
log_fatal(db->logger, MODULE_SWAMRKV_CORE, "pthread_create() error %d", ret);
}
}
+ int running_threads=0;
+ while(running_threads < db->opts->nr_worker_threads)
+ {
+ running_threads=0;
+ for (i = 0; i < db->opts->nr_worker_threads; i++)
+ {
+ running_threads += db->threads[i].is_dispatching;
+ }
+ usleep(10);
+ }
return;
}
@@ -741,7 +776,9 @@ struct swarmkv_cmd *make_crdt_add_cmd(enum cmd_key_flag flag, const sds key, nod
}
static void crdt_add_on_success(void *result, void *user)
{
+ struct swarmkv_reply *reply=(struct swarmkv_reply*)result;
struct cmd_ctx *ctx = (struct cmd_ctx*)user;
+ assert(reply->type==SWARMKV_REPLY_STATUS);
if(ctx->future_of_caller)
{
__exec_cmd(ctx->db, NULL, NULL, ctx->cmd, ctx->future_of_caller);
@@ -773,7 +810,6 @@ static void key_route_on_success(void *result, void *user)
if(self_is_a_replica)
{
n_replica_node-=node_list_remove(replica_nodes, n_replica_node, &ctx->db->self);
-// swarmkv_store_add_key(ctx->db->mod_store, key, replica_nodes, n_replica_node);
}
if(n_replica_node>0)
{
@@ -785,8 +821,7 @@ static void key_route_on_success(void *result, void *user)
struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(spec->flag, key, replica_nodes, n_replica_node);
crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller);
crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx);
- __exec_cmd(ctx->db, NULL, NULL, crdt_add_cmd, crdt_add_ctx->future_of_mine);
-
+ __exec_cmd(ctx->db, NULL, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine);
}
free(replica_nodes);
}
@@ -813,20 +848,21 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *
void on_msg_callback(struct swarmkv_msg *msg, void *arg)
{
struct swarmkv *db = (struct swarmkv *)arg;
- int thread_id=__gettid(db);
+ int cur_tid=__gettid(db);
if(msg->type==MSG_TYPE_CMD)
{
+ assert(cur_tid < db->opts->nr_worker_threads);
exec_for_remote(db, msg);
}
else
{
- if(msg->thread_id==thread_id)
+ if(msg->caller_tid!=cur_tid)
{
- swarmkv_mesh_send(db->mesh, thread_id, msg->thread_id, msg);
+ swarmkv_mesh_send(db->mesh, cur_tid, msg->caller_tid, msg);
}
else
{
- swarmkv_rpc_complete(db->rpc_mgr, thread_id, msg->sequence, msg->reply);
+ swarmkv_rpc_complete(db->rpc_mgr, cur_tid, msg->sequence, msg->reply);
}
}
}
@@ -835,7 +871,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
struct swarmkv_cmd_spec *spec=NULL;
struct swarmkv_reply *reply=NULL;
struct promise *p=NULL;
- int current_thread_id=__gettid(db);
+ int cur_tid=__gettid(db);
node_t peer;
spec=get_spec_by_argv(db, cmd->argc, cmd->argv);
if(!spec)
@@ -863,23 +899,23 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
swarmkv_reply_free(reply);
return;
}
- int target_thread_id=spec_gettid(spec, cmd, db->opts->nr_worker_threads);
+ int target_tid=spec_gettid(spec, cmd, db->opts->nr_worker_threads);
struct swarmkv_msg *msg=NULL;
if(target_node && node_compare(&db->self, target_node))
{
//cmd is executed in target node's on_msg_callback
- long long sequence=swarmkv_rpc_launch(db->rpc_mgr, current_thread_id, future_of_caller);
- msg=swarmkv_msg_new_by_cmd(cmd, target_node, current_thread_id, sequence);
+ long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller);
+ msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, sequence);
swarmkv_net_send(db->net, target_node, msg);
return;
}
- if(current_thread_id != target_thread_id)
+ if(cur_tid != target_tid)
{
//cmd is executed in target thread's on_msg_callback
- long long sequence=swarmkv_rpc_launch(db->rpc_mgr, current_thread_id, future_of_caller);
- msg=swarmkv_msg_new_by_cmd(cmd, &db->self, current_thread_id, sequence);
- swarmkv_mesh_send(db->mesh, current_thread_id, target_thread_id, msg);
+ long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller);
+ msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, sequence);
+ swarmkv_mesh_send(db->mesh, cur_tid, target_tid, msg);
return;
}
@@ -892,7 +928,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end));
- if(accessing_node)//Remote call, non-recursive exec
+ if(accessing_node && node_compare(&db->self, accessing_node))//Remote call, non-recursive exec
{
struct promise *p=future_to_promise(future_of_caller);
promise_success(p, reply);
@@ -1088,6 +1124,9 @@ void command_spec_init(struct swarmkv *db)
latency_command, db->mod_monitor);
/* low-level state-based CRDT synchronization commands*/
+ command_register(&(db->command_table), "CRDT ADD", "key [IP:port ...]",
+ 1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
+ crdt_add_command, db->mod_store);
command_register(&(db->command_table), "CRDT GET", "key",
1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
crdt_get_command, db->mod_store);
@@ -1245,19 +1284,45 @@ char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name)
sdsfreesplitres(argv,argc);
return NULL;
}
-void swarmkv_dispatch(struct swarmkv *db)
+void timeout_cb(evutil_socket_t fd, short event, void *arg)
+{
+ struct swarmkv_thread_ctx *ctx = (struct swarmkv_thread_ctx *)arg;
+ event_base_loopbreak(ctx->evbase);
+}
+void swarmkv_caller_loop(struct swarmkv *db, struct timeval *tv)
{
int tid=__gettid(db);
//must initiate from caller threads, and caller thread ID is larger than worker thread ID
assert(tid >= db->opts->nr_worker_threads);
- event_base_loop(db->evbases[tid], EVLOOP_ONCE|EVLOOP_NONBLOCK);
+ struct swarmkv_thread_ctx *ctx=db->threads+tid;
+ struct event *timeout_event = NULL;
+ if(tv)
+ {
+ timeout_event = event_new(ctx->evbase, -1, 0, timeout_cb, ctx);
+ evtimer_add(timeout_event, tv);
+ event_base_loop(ctx->evbase, EVLOOP_ONCE);
+ event_free(timeout_event);
+ }
+ else
+ {
+ event_base_loop(ctx->evbase, EVLOOP_ONCE);
+ }
+
return;
}
-void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg)
+void swarmkv_caller_loop_break(struct swarmkv *db)
{
- struct swarmkv_thread_ctx *thread=(struct swarmkv_thread_ctx *)arg;
- swarmkv_store_periodic(thread->db->mod_store, thread->thread_id);
- swarmkv_keyspace_periodic(thread->db->mod_keyspace, thread->thread_id);
+ int tid=__gettid(db);
+ //must initiate from caller threads, and caller thread ID is larger than worker thread ID
+ assert(tid >= db->opts->nr_worker_threads);
+ event_base_loopbreak(db->ref_evbases[tid]);
+}
+struct event_base *swarmkv_get_event_base(struct swarmkv *db)
+{
+ int tid=__gettid(db);
+ //must initiate from caller threads, and caller thread ID is larger than worker thread ID
+ assert(tid >= db->opts->nr_worker_threads);
+ return db->threads[tid].evbase;
}
struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, char **err)
{
@@ -1265,6 +1330,8 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
// event_set_log_callback(libevent_log_cb);
db=ALLOC(struct swarmkv, 1);
strncpy(db->db_name, db_name, sizeof(db->db_name));
+ opts->total_threads=opts->nr_caller_threads+opts->nr_worker_threads;
+ db->threads=ALLOC(struct swarmkv_thread_ctx,opts->total_threads);
/* adds locking, only required if accessed from separate threads */
evthread_use_pthreads();
@@ -1295,17 +1362,20 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
if(opts->dryrun)
{
}
- db->evbases=ALLOC(struct event_base*, db->opts->nr_worker_threads);
- for(int i=0; i<db->opts->nr_worker_threads + db->opts->nr_caller_threads; i++)
+ db->ref_evbases=ALLOC(struct event_base*, opts->total_threads);
+ for(int i=0; i<opts->total_threads; i++)
{
- db->evbases[i]=event_base_new();
+ db->threads[i].evbase=event_base_new();
+ db->threads[i].db=db;
+ db->ref_evbases[i]=db->threads[i].evbase;
}
- db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->evbases, db->opts->nr_worker_threads);
- db->mesh=swarmkv_mesh_new(db->evbases, db->opts->nr_worker_threads + db->opts->nr_caller_threads, db->logger);
+ db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads);
+ db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger);
+ swarmkv_mesh_set_on_msg_cb(db->mesh, on_msg_callback, db);
db->mod_monitor=swarmkv_monitor_new(db->opts);
//Note: if the cluster_port is 0, swarmkv_net_new updates db->self.cluster_port.
- db->net=swarmkv_net_new(db->evbases, db->opts->nr_worker_threads, opts, db->logger, err);
+ db->net=swarmkv_net_new(db->ref_evbases, db->opts->nr_worker_threads, opts, db->logger, err);
if(*err)
{
goto error_out;
@@ -1328,12 +1398,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
db->mod_store=swarmkv_store_new(db->opts, __exec_cmd, db);
swarmkv_store_set_monitor_handle(db->mod_store, db->mod_monitor);
- struct timeval sync_interval = {db->opts->sync_interval_us/(1000*1000), db->opts->sync_interval_us%(1000*1000)};
- for(int i=0; i<db->opts->nr_worker_threads; i++)
- {
- db->worker_threads[i].store_periodic_ev=event_new(db->evbases[i], -1, EV_PERSIST, __swarmkv_periodic, db->worker_threads+i);
- evtimer_add(db->worker_threads[i].store_periodic_ev, &sync_interval);
- }
+
command_spec_init(db);
@@ -1370,16 +1435,16 @@ error_out:
}
void swarmkv_close(struct swarmkv * db)
{
- for(size_t i=0; i<db->opts->nr_worker_threads; i++)
+ for(size_t i=0; i<db->opts->total_threads; i++)
{
- event_base_loopexit(db->evbases[i], NULL);
- db->worker_threads[i].is_dispatching=0;
+ event_base_loopexit(db->threads[i].evbase, NULL);
+ db->threads[i].is_dispatching=0;
}
for(size_t i=0; i<db->opts->nr_worker_threads; i++)
{
- pthread_join(db->worker_threads[i].thr, NULL);
+ pthread_join(db->threads[i].thr, NULL);
}
- free(db->worker_threads);
+
swarmkv_store_free(db->mod_store);
db->mod_store=NULL;
swarmkv_keyspace_free(db->mod_keyspace);
@@ -1407,9 +1472,18 @@ void swarmkv_close(struct swarmkv * db)
log_handle_destroy(db->logger);
db->logger=NULL;
}
+
+ for(size_t i=0; i<db->opts->total_threads; i++)
+ {
+ event_base_free(db->threads[i].evbase);
+ }
+ free(db->threads);
+ db->threads=NULL;
+
db->opts->is_assigned_to_db=0;
swarmkv_options_free(db->opts);
db->opts=NULL;
+
free(db);
return;
}
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index 06ea6db..0d310b1 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -126,7 +126,12 @@ int swarmkv_options_set_dryrun(struct swarmkv_options *opts)
opts->cluster_port=15210;//overrides default p2p port
return 0;
}
-int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, size_t nr_worker_threads)
+int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads)
+{
+ opts->nr_caller_threads=nr_caller_threads;
+ return 0;
+}
+int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, int nr_worker_threads)
{
opts->nr_worker_threads=nr_worker_threads;
return 0;
@@ -311,15 +316,14 @@ void swarmkv_btconsume(struct swarmkv * db, const char * key, size_t keylen, con
}
struct blocking_query_ctx
{
- pthread_cond_t cond;
- pthread_mutex_t mutex;
struct swarmkv_reply *reply;
+ struct swarmkv *db;
};
void blocking_query_cb(const struct swarmkv_reply *reply, void * arg)
{
struct blocking_query_ctx *ctx=(struct blocking_query_ctx*) arg;
ctx->reply=swarmkv_reply_dup(reply);
- pthread_cond_signal(&ctx->cond);
+ swarmkv_caller_loop_break(ctx->db);
return;
}
struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv)
@@ -330,8 +334,7 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta
memset(&ctx, 0, sizeof(ctx));
node_t target_node;
memset(&target_node, 0, sizeof(node_t));
- pthread_cond_init(&ctx.cond, NULL);
- pthread_mutex_init(&ctx.mutex, NULL);
+ ctx.db=db;
cmd=swarmkv_cmd_new(argc);
for(int i=0; i<argc; i++)
{
@@ -347,24 +350,10 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta
node_init_from_sds(&target_node, target);
exec_for_local(db, cmd, &target_node, blocking_query_cb, &ctx);
}
-
- struct timespec max_wait = {0, 0};
- clock_gettime(CLOCK_REALTIME, &max_wait);
- max_wait.tv_sec+=100;
- int timed_wait_rv=0;
- if(NULL==ctx.reply)
- {
- pthread_mutex_lock(&ctx.mutex);
- timed_wait_rv=pthread_cond_timedwait(&ctx.cond, &ctx.mutex, &max_wait);
- pthread_mutex_unlock(&ctx.mutex);
- if(timed_wait_rv)
- {
- printf("%s\n", swarmkv_util_pthread_cond_timedwait_error_to_string(timed_wait_rv));
- ctx.reply=swarmkv_reply_new_error("timeout");
- }
- }
+ swarmkv_caller_loop(db, NULL);
reply=ctx.reply;
ctx.reply=NULL;
+
swarmkv_cmd_free(cmd);
return reply;
}
@@ -395,7 +384,7 @@ struct swarmkv_reply *swarmkv_command(struct swarmkv *db,const char *format, ...
{
char *cmd_str=NULL;
va_list ap;
- va_start(ap,format);
+ va_start(ap, format);
vasprintf(&cmd_str, format, ap);
va_end(ap);
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index e00a973..7c00c18 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -1991,7 +1991,7 @@ void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_i
if(i%ks->opts->nr_worker_threads!=thread_id) continue;
slot_rt=ks->slot_rts+i;
if(!slot_rt->I_am_owner) continue;
- if(pthread_mutex_trylock(&slot_rt->sanity_lock))
+ if(0==pthread_mutex_trylock(&slot_rt->sanity_lock))
{
pthread_mutex_unlock(&slot_rt->sanity_lock);
}
diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c
index 9ee02b5..0a24c3a 100644
--- a/src/swarmkv_mesh.c
+++ b/src/swarmkv_mesh.c
@@ -19,6 +19,7 @@ struct swarmkv_mesh_thread
{
int thread_id;
int efd;
+ struct event * ev;
ringbuf_t *ring;
char *buff;
ringbuf_worker_t **workers;
@@ -65,7 +66,7 @@ error_out:
swarmkv_msg_free(msg);
return -1;
}
-void swarmkv_mesh_set_on_msg_recv_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_func, void *cb_arg)
+void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_func, void *cb_arg)
{
mesh->on_msg_recv=cb_func;
mesh->msg_recv_arg=cb_arg;
@@ -80,6 +81,8 @@ void swarmkv_mesh_free(struct swarmkv_mesh *mesh)
free(mesh->threads[i].ring);
free(mesh->threads[i].workers);
free(mesh->threads[i].buff);
+ event_del(mesh->threads[i].ev);
+ event_free(mesh->threads[i].ev);
}
free(mesh->threads);
free(mesh);
@@ -134,7 +137,8 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads,
log_fatal(mesh->ref_logger, MODULE_SWAMRKV_MESH, "eventfd() failed: %s", strerror(errno));
assert(0);
}
- event_new(evbase[i],mesh->threads[i].efd, EV_READ|EV_PERSIST, swarmkv_mesh_on_event, mesh->threads+i);
+ mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, swarmkv_mesh_on_event, mesh->threads+i);
+ event_add(mesh->threads[i].ev, NULL);
mesh->threads[i].ref_mesh=mesh;
}
return mesh;
diff --git a/src/swarmkv_message.c b/src/swarmkv_message.c
index 73e1469..a1bad9d 100644
--- a/src/swarmkv_message.c
+++ b/src/swarmkv_message.c
@@ -183,25 +183,25 @@ static void swarmkv_cmd_serialize(const struct swarmkv_cmd *cmd, char **blob, si
*blob_sz=root_mpack_sz;
return;
}
-struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *source, int thread_id, long long sequence)
+struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, long long sequence)
{
struct swarmkv_msg *msg=ALLOC(struct swarmkv_msg, 1);
msg->magic=SWARMKV_MSG_MAGIC;
msg->type=MSG_TYPE_CMD;
- msg->thread_id=thread_id;
+ msg->caller_tid=caller_tid;
msg->sequence=sequence;
- memcpy(&msg->source, &source, sizeof(node_t));
+ node_copy(&msg->caller, caller);
msg->cmd=swarmkv_cmd_dup(cmd);
return msg;
}
-struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *source, int thread_id, long long sequence)
+struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, long long sequence)
{
struct swarmkv_msg *msg=ALLOC(struct swarmkv_msg, 1);
msg->magic=SWARMKV_MSG_MAGIC;
msg->type=MSG_TYPE_REPLY;
- msg->thread_id=thread_id;
+ msg->caller_tid=caller_tid;
msg->sequence=sequence;
- memcpy(&msg->source, &source, sizeof(node_t));
+ node_copy(&msg->caller, caller);
msg->reply=swarmkv_reply_dup(reply);
return msg;
}
@@ -220,7 +220,7 @@ void swarmkv_msg_free(struct swarmkv_msg *msg)
}
void swarmkv_msg_serialize(const struct swarmkv_msg *msg, char **blob, size_t *blob_sz)
{
-
+ struct swarmkv_msg *pseudo_hdr=NULL;
char *payload=NULL;
size_t payload_len=0;
if(msg->type==MSG_TYPE_CMD)
@@ -233,6 +233,8 @@ void swarmkv_msg_serialize(const struct swarmkv_msg *msg, char **blob, size_t *b
}
*blob=ALLOC(char, payload_len+SWARMKV_MSG_HDR_SIZE);
memcpy(*blob, msg, SWARMKV_MSG_HDR_SIZE);
+ pseudo_hdr=(struct swarmkv_msg *)*blob;
+ pseudo_hdr->payload_len=payload_len;
memcpy(*blob+SWARMKV_MSG_HDR_SIZE, payload, payload_len);
*blob_sz=payload_len+SWARMKV_MSG_HDR_SIZE;
free(payload);
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index 58822ca..fee1fb2 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -280,6 +280,7 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg)
assert(bev==conn->bev);
snet_thread_buffer_usage_stat(thr);
assert(conn->ref_thr == conn->ref_thr->ref_net->threads+conn->thread_id);
+ struct swarmkv_msg *msg=NULL;
if(conn->need_to_be_kill)
{
log_fatal(thr->ref_logger, MODULE_SWAMRKV_NET, "connection %s is killed",
@@ -316,7 +317,7 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg)
}
if(!conn->is_in_conn_table && !conn->is_duplicated)
{
- snet_conn_set_peer(conn, &hdr->source);
+ snet_conn_set_peer(conn, &hdr->caller);
snet_conn_table_add(thr, conn);
}
conn->recv_state=RECEIVING_PAYLOAD;
@@ -332,7 +333,10 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg)
conn->recv_state=RECEIVING_PAYLOAD;
break;
}
- swarmkv_msg_deserialize(recv_buff, msg_sz);
+ msg=swarmkv_msg_deserialize(recv_buff, msg_sz);
+ thr->ref_net->on_msg_cb(msg, thr->ref_net->on_msg_cb_arg);
+ swarmkv_msg_free(msg);
+ msg=NULL;
evbuffer_drain(input, msg_sz);
thr->stat.input_bytes += msg_sz;
conn->recv_state=RECEIVING_HDR;
@@ -600,7 +604,6 @@ void swarmkv_net_set_on_msg_callback(struct swarmkv_net *net, on_msg_callback_t
void swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv_msg *msg)
{
int tid=__gettid((struct swarmkv *)(net->on_msg_cb_arg));
- assert(tid==msg->thread_id);
struct snet_thread *thr=net->threads+tid;
struct snet_conn *conn=NULL;
HASH_FIND(hh, thr->conn_table, dest, sizeof(node_t), conn);
diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c
index f94cd10..9390aff 100644
--- a/src/swarmkv_rpc.c
+++ b/src/swarmkv_rpc.c
@@ -30,13 +30,13 @@ struct swarmkv_rpc_mgr
long long timed_out_rpcs;
long long unknown_sequence;
};
-struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_worker_threads)
+struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_threads)
{
struct swarmkv_rpc_mgr *mgr=ALLOC(struct swarmkv_rpc_mgr, 1);
mgr->seq_generator=0;
- mgr->nr_worker_threads=nr_worker_threads;
+ mgr->nr_worker_threads=nr_threads;
mgr->evbases=evbases;
- mgr->rpc_table=ALLOC(struct swarmkv_rpc *, nr_worker_threads);
+ mgr->rpc_table=ALLOC(struct swarmkv_rpc *, nr_threads);
mgr->timeout_us=opts->cluster_timeout_us;
return mgr;
}
@@ -58,6 +58,7 @@ static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg)
rpc->ref_mgr->timed_out_rpcs++;
struct promise *p=future_to_promise(rpc->f);
promise_failed(p, FUTURE_ERROR_TIMEOUT, "rpc timed out");
+ swarmkv_rpc_free(rpc);
}
long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f)
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 18b0257..bef45ee 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -675,7 +675,7 @@ enum cmd_exec_result type_command(struct swarmkv_module *mod_store, const struct
}
enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
{
-/*CRDT ADD key IP:port [IP:port ...]*/
+/*CRDT ADD key [IP:port ...]*/
struct scontainer *ctr=NULL;
struct swarmkv_store *store=module2store(mod_store);
size_t max_pull_node_num=4;
@@ -735,6 +735,7 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st
swarmkv_cmd_free(crdt_get_cmd);
crdt_get_cmd=NULL;
free(replica_nodes);
+ *reply=swarmkv_reply_new_status("OK");
return FINISHED;
}
enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp
index 721d309..50b034d 100644
--- a/test/swarmkv_gtest.cpp
+++ b/test/swarmkv_gtest.cpp
@@ -11,7 +11,7 @@
#include <sys/time.h>
#define CMD_EXEC_TIMEOUT_MS 1000*2
-
+struct timeval g_exec_timeout={CMD_EXEC_TIMEOUT_MS/1000, (CMD_EXEC_TIMEOUT_MS%1000)*1000};
void generic_callback(const struct swarmkv_reply *reply, void * cb_arg)
{
struct cmd_exec_arg *arg=(struct cmd_exec_arg*)cb_arg;
@@ -41,12 +41,15 @@ protected:
swarmkv_options_set_cluster_port(opts, 5210);
swarmkv_options_set_health_check_port(opts, 6210);
swarmkv_options_set_logger(opts, logger);
+ swarmkv_options_set_worker_thread_number(opts, 1);
+ swarmkv_options_set_caller_thread_number(opts, 1);
db=swarmkv_open(opts, cluster_name, &err);
if(err)
{
printf("swarmkv_open failed: %s.\n", err);
free(err);
}
+ swarmkv_register_thread(db);
}
static void TearDownTestCase()
{
@@ -62,28 +65,21 @@ struct swarmkv *SwarmkvBasicTest::db;
struct log_handle *SwarmkvBasicTest::logger;
TEST_F(SwarmkvBasicTest, TypeString)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db=SwarmkvBasicTest::db;
const char *key="name";
const char *val="zhangsan";
- int exec_successful=0;
- arg=cmd_exec_arg_new();
-
- cmd_exec_arg_expect_OK(arg);
- swarmkv_set(db, key, strlen(key), val, strlen(val), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ struct swarmkv_reply *reply=NULL;
+ reply=swarmkv_command(db, "SET %s %s", key, val);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, val);
+ swarmkv_reply_free(reply);
- cmd_exec_arg_expect_cstring(arg, val);
- swarmkv_get(db, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
-
- cmd_exec_arg_free(arg);
- arg=NULL;
}
TEST_F(SwarmkvBasicTest, TypeInteger)
{
@@ -122,26 +118,21 @@ TEST_F(SwarmkvBasicTest, TypeInteger)
}
TEST_F(SwarmkvBasicTest, GenericDEL)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db=SwarmkvBasicTest::db;
const char* key="name2";
const char* val="zhangsan";
- int exec_successful=0;
- arg=cmd_exec_arg_new();
- cmd_exec_arg_expect_OK(arg);
- swarmkv_set(db, key, strlen(key), val, strlen(val), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ struct swarmkv_reply *reply=NULL;
-
- cmd_exec_arg_expect_integer(arg, 1);
- swarmkv_del(db, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_free(arg);
- arg=NULL;
+ reply=swarmkv_command(db, "SET %s %s", key, val);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db, "DEL %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
}
TEST_F(SwarmkvBasicTest, GenericTYPE)
{
@@ -217,109 +208,79 @@ TEST_F(SwarmkvBasicTest, GenericTYPE)
}
TEST_F(SwarmkvBasicTest, TypeSet)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db=SwarmkvBasicTest::db;
- const char* key="Jack's friends";
- const char* member[]={"zhangsan", "lisi", "王二麻子", "Tom", "مرحبا"};
- size_t member_len[32];
- size_t n_member=0, i=0;
- n_member=sizeof(member)/sizeof(const char*);
- for(i=0; i<n_member; i++)
- {
- member_len[i]=strlen(member[i])+1;
- }
-
- int exec_successful=0;
-
- arg=cmd_exec_arg_new();
-
- //SADD key m0 m2 ... m4
- cmd_exec_arg_expect_integer(arg, n_member);
- swarmkv_sadd(db, key, strlen(key)+1, member, member_len, n_member, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
-
- //SMEMEBERS key
- cmd_exec_arg_expect_array(arg, n_member);
- swarmkv_smembers(db, key, strlen(key)+1, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ const char *key="friends_of_Jack";
+ const char *member[]={"zhangsan", "lisi", "王二麻子", "Tom", "مرحبا"};
+ size_t i=0;
+ struct swarmkv_reply *reply=NULL;
+ reply=swarmkv_command(db, "SADD %s %s %s %s %s %s", key, member[0], member[1], member[2], member[3], member[4]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 5);
+ swarmkv_reply_free(reply);
- //SREM key m0 m1
- cmd_exec_arg_expect_integer(arg, 2);
- swarmkv_srem(db, key, strlen(key)+1, member, member_len, 2, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db, "SMEMBERS %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ EXPECT_EQ(reply->n_element, 5);
+ for(i=0; i<reply->n_element; i++)
+ {
+ EXPECT_EQ(reply->elements[i]->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->elements[i]->str, member[i]);
+ }
+ swarmkv_reply_free(reply);
- //SISMEMBER key m0
- cmd_exec_arg_expect_integer(arg, 0);
- swarmkv_sismember(db, key, strlen(key)+1, member[0], member_len[0], generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db, "SREM %s %s %s", key, member[0], member[1]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 2);
+ swarmkv_reply_free(reply);
- //SISMEMBER key m2
- cmd_exec_arg_expect_integer(arg, 1);
- swarmkv_sismember(db, key, strlen(key)+1, member[2], member_len[2], generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db, "SISMEMBER %s %s", key, member[0]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 0);
+ swarmkv_reply_free(reply);
+ reply=swarmkv_command(db, "SISMEMBER %s %s", key, member[2]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
- //SCARD key
- cmd_exec_arg_expect_integer(arg, 3);
- swarmkv_scard(db, key, strlen(key)+1, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db, "SCARD %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 3);
+ swarmkv_reply_free(reply);
- //DEL key
- cmd_exec_arg_expect_integer(arg, 1);
- swarmkv_del(db, key, strlen(key)+1, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db, "DEL %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
- //SISMEMBER key m2
- cmd_exec_arg_expect_integer(arg, 0);
- swarmkv_sismember(db, key, strlen(key)+1, member[2], member_len[2], generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db, "SISMEMBER %s %s", key, member[2]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 0);
+ swarmkv_reply_free(reply);
- //SCARD key
- cmd_exec_arg_expect_integer(arg, 0);
- swarmkv_scard(db, key, strlen(key)+1, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db, "SCARD %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 0);
+ swarmkv_reply_free(reply);
-
- cmd_exec_arg_free(arg);
- arg=NULL;
}
TEST_F(SwarmkvBasicTest, TypeTokenBucket)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db=SwarmkvBasicTest::db;
const char *key="tb-192.168.0.1";
- int exec_successful=0;
- arg=cmd_exec_arg_new();
+ struct swarmkv_reply *reply=NULL;
+
long long capacity=1024*4, rate=1024*2, request_tokens=0, allocated_tokens=0;
- cmd_exec_arg_expect_OK(arg);
- swarmkv_async_command(db, generic_callback, arg, "TCFG %s %lld %lld", key, rate, capacity);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- cmd_exec_arg_clear(arg);
- EXPECT_TRUE(exec_successful);
+ reply=swarmkv_command(db, "TCFG %s %lld %lld", key, rate, capacity);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+
struct timeval start, now;
gettimeofday(&start, NULL);
gettimeofday(&now, NULL);
srand(171);
- struct swarmkv_reply *reply=NULL;
+
int i=0;
while(now.tv_sec - start.tv_sec<3)
{
@@ -334,7 +295,6 @@ TEST_F(SwarmkvBasicTest, TypeTokenBucket)
i++;
}
EXPECT_LE(allocated_tokens, (now.tv_sec -start.tv_sec)*rate+capacity);
- cmd_exec_arg_free(arg);
//Infinite tokens
reply=swarmkv_command(db, "TCFG %s 0 0", key);
@@ -355,22 +315,19 @@ TEST_F(SwarmkvBasicTest, TypeTokenBucket)
}
TEST_F(SwarmkvBasicTest, TypeFairTokenBucket)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db=SwarmkvBasicTest::db;
const char *key="3-floor-bandwidth-100Mbps";
- int exec_successful=0;
- arg=cmd_exec_arg_new();
long long capacity=200*1024*1024, rate=100*1024*1024, request_tokens=0, allocated_tokens=0;
- cmd_exec_arg_expect_OK(arg);
- swarmkv_async_command(db, generic_callback, arg, "FTCFG %s %lld %lld 128", key, rate, capacity);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- cmd_exec_arg_clear(arg);
- EXPECT_TRUE(exec_successful);
+
+ struct swarmkv_reply *reply=NULL;
+ reply=swarmkv_command(db, "FTCFG %s %lld %lld 128", key, rate, capacity);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+
struct timeval start, now;
gettimeofday(&start, NULL);
gettimeofday(&now, NULL);
srand(171);
- struct swarmkv_reply *reply=NULL;
int i=0;
while(now.tv_sec - start.tv_sec<3)
{
@@ -385,7 +342,7 @@ TEST_F(SwarmkvBasicTest, TypeFairTokenBucket)
i++;
}
EXPECT_LE(allocated_tokens, (now.tv_sec -start.tv_sec)*rate+capacity);
- cmd_exec_arg_free(arg);
+
//Infinite tokens
reply=swarmkv_command(db, "FTCFG %s 0 0 256", key);
@@ -406,22 +363,20 @@ TEST_F(SwarmkvBasicTest, TypeFairTokenBucket)
}
TEST_F(SwarmkvBasicTest, TypeBulkTokenBucket)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db=SwarmkvBasicTest::db;
const char *key="everyone-has-1Mbps";
- int exec_successful=0;
- arg=cmd_exec_arg_new();
+
long long capacity=2*1024*1024, rate=1*1024*1024, request_tokens=0, allocated_tokens=0;
- cmd_exec_arg_expect_OK(arg);
- swarmkv_async_command(db, generic_callback, arg, "BTCFG %s %lld %lld 128", key, rate, capacity);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- cmd_exec_arg_clear(arg);
- EXPECT_TRUE(exec_successful);
+
+ struct swarmkv_reply *reply=NULL;
+ reply=swarmkv_command(db, "BTCFG %s %lld %lld 128", key, rate, capacity);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+
struct timeval start, now;
gettimeofday(&start, NULL);
gettimeofday(&now, NULL);
srand(171);
- struct swarmkv_reply *reply=NULL;
int i=0, n_member=120;
while(now.tv_sec - start.tv_sec<3)
{
@@ -436,7 +391,6 @@ TEST_F(SwarmkvBasicTest, TypeBulkTokenBucket)
i++;
}
EXPECT_LE(allocated_tokens/n_member, (now.tv_sec -start.tv_sec)*rate+capacity);
- cmd_exec_arg_free(arg);
//Infinite tokens
reply=swarmkv_command(db, "BTCFG %s 0 0 256", key);
@@ -531,61 +485,42 @@ TEST_F(SwarmkvBasicTest, TypeHash)
}
TEST_F(SwarmkvBasicTest, EXPIRE_TTL)
{
- struct cmd_exec_arg *arg=NULL;
struct swarmkv *db=SwarmkvBasicTest::db;
- const char* key="quarantine";
- const char* val="wuhan-江夏-如家";
- int exec_successful=0;
+ const char *key="quarantine";
+ const char *val="wuhan-江夏-如家";
int seconds=3;
-
- //SET key value
- arg=cmd_exec_arg_new();
- cmd_exec_arg_expect_OK(arg);
- swarmkv_set(db, key, strlen(key), val, strlen(val), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ struct swarmkv_reply *reply=NULL;
- //EXPIRE key seconds
- cmd_exec_arg_expect_integer(arg, 1);
- swarmkv_expire(db, key, strlen(key), seconds, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db, "SET %s %s", key, val);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
- //TTL key
- cmd_exec_arg_expect_integer(arg, seconds);
- swarmkv_ttl(db, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db, "EXPIRE %s %d", key, seconds);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
- sleep(seconds+2);
+ reply=swarmkv_command(db, "TTL %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, seconds);
+ swarmkv_reply_free(reply);
- //GET key
- cmd_exec_arg_expect_NIL(arg);
- swarmkv_get(db, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ sleep(seconds+2);
+ reply=swarmkv_command(db, "GET %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL);
+ swarmkv_reply_free(reply);
- //EXPIRE key seconds
- cmd_exec_arg_expect_integer(arg, 0);
- swarmkv_expire(db, key, strlen(key), seconds, generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
+ reply=swarmkv_command(db, "EXPIRE %s %d", key, seconds);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 0);
+ swarmkv_reply_free(reply);
- //TTL key
- cmd_exec_arg_expect_integer(arg, -2);
- swarmkv_ttl(db, key, strlen(key), generic_callback, arg);
- exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS);
- EXPECT_TRUE(exec_successful);
- cmd_exec_arg_clear(arg);
-
- cmd_exec_arg_free(arg);
- arg=NULL;
+ reply=swarmkv_command(db, "TTL %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, -2);
+ swarmkv_reply_free(reply);
}
TEST_F(SwarmkvBasicTest, HashTags)
{
diff --git a/test/test_utils.c b/test/test_utils.c
index c4c6cac..a03f4b5 100644
--- a/test/test_utils.c
+++ b/test/test_utils.c
@@ -165,7 +165,7 @@ void error_pthread_cond_timedwait(const int timed_wait_rv)
}
}
-int cmd_exec_arg_wait(struct cmd_exec_arg* arg, long timeout_ms)
+int cmd_exec_arg_wait(struct cmd_exec_arg *arg, long timeout_ms)
{
struct timespec max_wait = {0, 0};
clock_gettime(CLOCK_REALTIME, &max_wait);