diff options
| author | Zheng Chao <[email protected]> | 2023-07-24 03:24:31 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-07-24 03:24:31 +0800 |
| commit | 49c33b4687c065ba2003a512aad67c24388b3d8e (patch) | |
| tree | 305d54705a861f5b6f49783d77ae5e81b4af3b3c | |
| parent | 27da04165b6e36d78dd2a4dfd2bb4962af919d6b (diff) | |
WIP: It works.
| -rw-r--r-- | include/swarmkv/swarmkv.h | 10 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_common.h | 1 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_core.h | 6 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_mesh.h | 2 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_message.h | 8 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_store.h | 2 | ||||
| -rw-r--r-- | src/swarmkv.c | 196 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 35 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 2 | ||||
| -rw-r--r-- | src/swarmkv_mesh.c | 8 | ||||
| -rw-r--r-- | src/swarmkv_message.c | 16 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 9 | ||||
| -rw-r--r-- | src/swarmkv_rpc.c | 7 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 3 | ||||
| -rw-r--r-- | test/swarmkv_gtest.cpp | 299 | ||||
| -rw-r--r-- | test/test_utils.c | 2 |
16 files changed, 315 insertions, 291 deletions
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h index ce41260..285cef6 100644 --- a/include/swarmkv/swarmkv.h +++ b/include/swarmkv/swarmkv.h @@ -11,6 +11,8 @@ #pragma once #include <stddef.h> #include <stdio.h> +#include <sys/time.h> + #ifdef __cplusplus extern "C" { @@ -62,7 +64,8 @@ int swarmkv_options_set_consul_port(struct swarmkv_options *opts, unsigned int c int swarmkv_options_set_consul_host(struct swarmkv_options *opts, const char* ip_addr); int swarmkv_options_set_dryrun(struct swarmkv_options *opts); int swarmkv_options_set_disable_run_for_leader(struct swarmkv_options *opts); -int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, size_t nr_worker_threads); +int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads); +int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, int nr_worker_threads); int swarmkv_options_set_cluster_announce_ip(struct swarmkv_options *opts, const char *ip_addr); int swarmkv_options_set_cluster_announce_port(struct swarmkv_options *opts, unsigned int cluster_announce_port); @@ -74,6 +77,11 @@ struct swarmkv; struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *cluster_name, char **err); void swarmkv_close(struct swarmkv * db); + +void swarmkv_register_thread(struct swarmkv *db); +void swarmkv_caller_loop(struct swarmkv *db, struct timeval *tv); +void swarmkv_caller_loop_break(struct swarmkv *db); + //Blocking function struct swarmkv_reply *swarmkv_command(struct swarmkv *db,const char *format, ...); struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...); diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h index 3e9e11a..063f0c1 100644 --- a/src/inc_internal/swarmkv_common.h +++ b/src/inc_internal/swarmkv_common.h @@ -75,6 +75,7 @@ struct swarmkv_options uuid_t bin_uuid; int nr_worker_threads; int nr_caller_threads; + int total_threads; int is_assigned_to_db; }; diff --git a/src/inc_internal/swarmkv_core.h b/src/inc_internal/swarmkv_core.h new file mode 100644 index 0000000..365851d --- /dev/null +++ b/src/inc_internal/swarmkv_core.h @@ -0,0 +1,6 @@ +#pragma once + +#include "swarmkv.h" +#include "swarmkv_common.h" +void exec_for_local(struct swarmkv *db, const struct swarmkv_cmd *cmd, const node_t *target_node, swarmkv_on_reply_callback_t * cb, void * cb_arg); +struct event_base *swarmkv_get_event_base(struct swarmkv *db);
\ No newline at end of file diff --git a/src/inc_internal/swarmkv_mesh.h b/src/inc_internal/swarmkv_mesh.h index 4a6f04c..c9efcae 100644 --- a/src/inc_internal/swarmkv_mesh.h +++ b/src/inc_internal/swarmkv_mesh.h @@ -3,6 +3,6 @@ struct swarmkv_mesh; int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest_thread_id, struct swarmkv_msg *msg); -void swarmkv_mesh_set_on_msg_recv_cb(struct swarmkv_mesh *mesh, on_msg_callback_t *cb_func, void *cb_arg); +void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t *cb_func, void *cb_arg); struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger); void swarmkv_mesh_free(struct swarmkv_mesh *mesh);
\ No newline at end of file diff --git a/src/inc_internal/swarmkv_message.h b/src/inc_internal/swarmkv_message.h index c1a69ae..3fbb7c4 100644 --- a/src/inc_internal/swarmkv_message.h +++ b/src/inc_internal/swarmkv_message.h @@ -13,9 +13,9 @@ struct swarmkv_msg unsigned int magic; enum MSG_TYPE type; long long sequence; - int thread_id; + int caller_tid; unsigned int payload_len; - node_t source; + node_t caller; union { struct swarmkv_cmd *cmd; @@ -25,8 +25,8 @@ struct swarmkv_msg }; #define SWARMKV_MSG_HDR_SIZE offsetof(struct swarmkv_msg, payload) -struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *source, int thread_id, long long sequence); -struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *source, int thread_id, long long sequence); +struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, long long sequence); +struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, long long sequence); void swarmkv_msg_free(struct swarmkv_msg *msg); //on_msg_callback_t transfers ownership of msg to the callback function diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h index 241168d..f25ff5f 100644 --- a/src/inc_internal/swarmkv_store.h +++ b/src/inc_internal/swarmkv_store.h @@ -66,7 +66,7 @@ enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_re void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid); struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key); - +enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_merge_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); diff --git a/src/swarmkv.c b/src/swarmkv.c index f552a29..70d6e4f 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -50,6 +50,7 @@ struct swarmkv_thread_ctx int sys_tid; int thread_id; struct swarmkv *db; + struct event_base *evbase; int is_dispatching; time_t lastime; struct event *store_periodic_ev; @@ -64,9 +65,9 @@ struct swarmkv struct swarmkv_options *opts; int thread_counter; - struct swarmkv_thread_ctx *worker_threads; + struct swarmkv_thread_ctx *threads; - struct event_base **evbases; + struct event_base **ref_evbases; node_t self; struct swarmkv_rpc_mgr *rpc_mgr; @@ -94,8 +95,9 @@ struct swarmkv *module2db(struct swarmkv_module *module) void swarmkv_register_thread(struct swarmkv *db) { int thread_id = atomic_fetch_add(&db->thread_counter, 1); - db->worker_threads[thread_id].sys_tid=syscall(SYS_gettid); - db->worker_threads[thread_id].thread_id=thread_id; + assert(thread_id < db->opts->nr_worker_threads + db->opts->nr_caller_threads); + db->threads[thread_id].sys_tid=syscall(SYS_gettid); + db->threads[thread_id].thread_id=thread_id; return; } int __gettid(struct swarmkv *db) @@ -103,9 +105,9 @@ int __gettid(struct swarmkv *db) int sys_tid=syscall(SYS_gettid); for(int i=0; i<db->opts->nr_worker_threads + db->opts->nr_caller_threads; i++) { - if(db->worker_threads[i].sys_tid==sys_tid) + if(db->threads[i].sys_tid==sys_tid) { - return db->worker_threads[i].thread_id; + return db->threads[i].thread_id; } } assert(0); @@ -154,7 +156,7 @@ struct remote_caller_ctx struct future *my_future; struct swarmkv *db; long long sequence; - int thread_id; + int remote_tid; node_t remote; }; void remote_calller_ctx_free(struct remote_caller_ctx *ctx) @@ -165,12 +167,27 @@ void remote_calller_ctx_free(struct remote_caller_ctx *ctx) free(ctx); return; } +static void remoter_caller_ctx_send_reply(struct remote_caller_ctx *ctx, const struct swarmkv_reply *reply) +{ + struct swarmkv *db = ctx->db; + struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->remote, ctx->remote_tid, ctx->sequence); + int cur_tid=__gettid(db); + assert(cur_tid != msg->caller_tid); + if(0==node_compare(&db->self, &msg->caller)) + { + swarmkv_mesh_send(db->mesh, cur_tid, msg->caller_tid, msg); + } + else + { + swarmkv_net_send(ctx->db->net, &ctx->remote, msg); + } + return; +} static void remote_caller_on_success(void *result, void *user) { struct remote_caller_ctx *ctx=(struct remote_caller_ctx*)user; const struct swarmkv_reply *reply=(const struct swarmkv_reply*) result; - struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->db->self, ctx->thread_id, ctx->sequence); - swarmkv_net_send(ctx->db->net, &ctx->remote, msg); + remoter_caller_ctx_send_reply(ctx, reply); remote_calller_ctx_free(ctx); return; } @@ -178,8 +195,7 @@ static void remote_caller_on_fail(enum e_future_error err, const char * what, vo { struct remote_caller_ctx *ctx=(struct remote_caller_ctx*)user; struct swarmkv_reply *reply=swarmkv_reply_new_error(what); - struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->db->self, ctx->thread_id, ctx->sequence); - swarmkv_net_send(ctx->db->net, &ctx->remote, msg); + remoter_caller_ctx_send_reply(ctx, reply); remote_calller_ctx_free(ctx); return; } @@ -188,13 +204,13 @@ void exec_for_remote(struct swarmkv *db, const struct swarmkv_msg *msg) struct remote_caller_ctx *ctx=ALLOC(struct remote_caller_ctx, 1); ctx->db=db; ctx->sequence=msg->sequence; - ctx->thread_id=msg->thread_id; - node_copy(&ctx->remote, &msg->source); + ctx->remote_tid=msg->caller_tid; + node_copy(&ctx->remote, &msg->caller); ctx->my_future=future_create("for_remote", remote_caller_on_success, remote_caller_on_fail, ctx); - __exec_cmd(db, &msg->source, NULL, msg->cmd, ctx->my_future); + __exec_cmd(db, &msg->caller, NULL, msg->cmd, ctx->my_future); return; } enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) @@ -518,6 +534,12 @@ static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) { } +void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg) +{ + struct swarmkv_thread_ctx *thread=(struct swarmkv_thread_ctx *)arg; + swarmkv_store_periodic(thread->db->mod_store, thread->thread_id); + swarmkv_keyspace_periodic(thread->db->mod_keyspace, thread->thread_id); +} void *swarmkv_worker_thread(void *arg) { struct swarmkv_thread_ctx* ctx = (struct swarmkv_thread_ctx *) arg; @@ -527,17 +549,20 @@ void *swarmkv_worker_thread(void *arg) snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", ctx->thread_id); prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); - struct event * ev = event_new(db->evbases[ctx->thread_id], -1, EV_PERSIST, __dummy_event_handler, db); - struct timeval timer_delay = {2, 0}; + struct timeval sync_interval = {db->opts->sync_interval_us/(1000*1000), db->opts->sync_interval_us%(1000*1000)}; + struct event * periodic_ev=event_new(ctx->evbase, -1, EV_PERSIST, __swarmkv_periodic, ctx); + evtimer_add(periodic_ev, &sync_interval); + ctx->is_dispatching=1; - evtimer_add(ev, &timer_delay); + - int ret=event_base_dispatch(db->evbases[ctx->thread_id]); - event_del(ev); - event_free(ev); + int ret=event_base_dispatch(ctx->evbase); + event_del(periodic_ev); + event_free(periodic_ev); + if(ctx->is_dispatching) { - log_fatal(db->logger, MODULE_SWAMRKV_CORE, "event_base_dispatch() exit abnormally, ret=%d", ret); + log_fatal(db->logger, MODULE_SWAMRKV_CORE, "worker thread event_base_dispatch() exit abnormally, ret=%d", ret); } else { @@ -551,12 +576,22 @@ void swarmkv_threads_run(struct swarmkv *db) int i = 0, ret=0; for (i = 0; i < db->opts->nr_worker_threads; i++) { - ret=pthread_create(&db->worker_threads[i].thr, NULL, swarmkv_worker_thread, (void *) (db->worker_threads+i)); + ret=pthread_create(&db->threads[i].thr, NULL, swarmkv_worker_thread, (void *) (db->threads+i)); if(ret !=0 )//error { log_fatal(db->logger, MODULE_SWAMRKV_CORE, "pthread_create() error %d", ret); } } + int running_threads=0; + while(running_threads < db->opts->nr_worker_threads) + { + running_threads=0; + for (i = 0; i < db->opts->nr_worker_threads; i++) + { + running_threads += db->threads[i].is_dispatching; + } + usleep(10); + } return; } @@ -741,7 +776,9 @@ struct swarmkv_cmd *make_crdt_add_cmd(enum cmd_key_flag flag, const sds key, nod } static void crdt_add_on_success(void *result, void *user) { + struct swarmkv_reply *reply=(struct swarmkv_reply*)result; struct cmd_ctx *ctx = (struct cmd_ctx*)user; + assert(reply->type==SWARMKV_REPLY_STATUS); if(ctx->future_of_caller) { __exec_cmd(ctx->db, NULL, NULL, ctx->cmd, ctx->future_of_caller); @@ -773,7 +810,6 @@ static void key_route_on_success(void *result, void *user) if(self_is_a_replica) { n_replica_node-=node_list_remove(replica_nodes, n_replica_node, &ctx->db->self); -// swarmkv_store_add_key(ctx->db->mod_store, key, replica_nodes, n_replica_node); } if(n_replica_node>0) { @@ -785,8 +821,7 @@ static void key_route_on_success(void *result, void *user) struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(spec->flag, key, replica_nodes, n_replica_node); crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller); crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx); - __exec_cmd(ctx->db, NULL, NULL, crdt_add_cmd, crdt_add_ctx->future_of_mine); - + __exec_cmd(ctx->db, NULL, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine); } free(replica_nodes); } @@ -813,20 +848,21 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd * void on_msg_callback(struct swarmkv_msg *msg, void *arg) { struct swarmkv *db = (struct swarmkv *)arg; - int thread_id=__gettid(db); + int cur_tid=__gettid(db); if(msg->type==MSG_TYPE_CMD) { + assert(cur_tid < db->opts->nr_worker_threads); exec_for_remote(db, msg); } else { - if(msg->thread_id==thread_id) + if(msg->caller_tid!=cur_tid) { - swarmkv_mesh_send(db->mesh, thread_id, msg->thread_id, msg); + swarmkv_mesh_send(db->mesh, cur_tid, msg->caller_tid, msg); } else { - swarmkv_rpc_complete(db->rpc_mgr, thread_id, msg->sequence, msg->reply); + swarmkv_rpc_complete(db->rpc_mgr, cur_tid, msg->sequence, msg->reply); } } } @@ -835,7 +871,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * struct swarmkv_cmd_spec *spec=NULL; struct swarmkv_reply *reply=NULL; struct promise *p=NULL; - int current_thread_id=__gettid(db); + int cur_tid=__gettid(db); node_t peer; spec=get_spec_by_argv(db, cmd->argc, cmd->argv); if(!spec) @@ -863,23 +899,23 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * swarmkv_reply_free(reply); return; } - int target_thread_id=spec_gettid(spec, cmd, db->opts->nr_worker_threads); + int target_tid=spec_gettid(spec, cmd, db->opts->nr_worker_threads); struct swarmkv_msg *msg=NULL; if(target_node && node_compare(&db->self, target_node)) { //cmd is executed in target node's on_msg_callback - long long sequence=swarmkv_rpc_launch(db->rpc_mgr, current_thread_id, future_of_caller); - msg=swarmkv_msg_new_by_cmd(cmd, target_node, current_thread_id, sequence); + long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller); + msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, sequence); swarmkv_net_send(db->net, target_node, msg); return; } - if(current_thread_id != target_thread_id) + if(cur_tid != target_tid) { //cmd is executed in target thread's on_msg_callback - long long sequence=swarmkv_rpc_launch(db->rpc_mgr, current_thread_id, future_of_caller); - msg=swarmkv_msg_new_by_cmd(cmd, &db->self, current_thread_id, sequence); - swarmkv_mesh_send(db->mesh, current_thread_id, target_thread_id, msg); + long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller); + msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, sequence); + swarmkv_mesh_send(db->mesh, cur_tid, target_tid, msg); return; } @@ -892,7 +928,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end)); - if(accessing_node)//Remote call, non-recursive exec + if(accessing_node && node_compare(&db->self, accessing_node))//Remote call, non-recursive exec { struct promise *p=future_to_promise(future_of_caller); promise_success(p, reply); @@ -1088,6 +1124,9 @@ void command_spec_init(struct swarmkv *db) latency_command, db->mod_monitor); /* low-level state-based CRDT synchronization commands*/ + command_register(&(db->command_table), "CRDT ADD", "key [IP:port ...]", + 1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, + crdt_add_command, db->mod_store); command_register(&(db->command_table), "CRDT GET", "key", 1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, crdt_get_command, db->mod_store); @@ -1245,19 +1284,45 @@ char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name) sdsfreesplitres(argv,argc); return NULL; } -void swarmkv_dispatch(struct swarmkv *db) +void timeout_cb(evutil_socket_t fd, short event, void *arg) +{ + struct swarmkv_thread_ctx *ctx = (struct swarmkv_thread_ctx *)arg; + event_base_loopbreak(ctx->evbase); +} +void swarmkv_caller_loop(struct swarmkv *db, struct timeval *tv) { int tid=__gettid(db); //must initiate from caller threads, and caller thread ID is larger than worker thread ID assert(tid >= db->opts->nr_worker_threads); - event_base_loop(db->evbases[tid], EVLOOP_ONCE|EVLOOP_NONBLOCK); + struct swarmkv_thread_ctx *ctx=db->threads+tid; + struct event *timeout_event = NULL; + if(tv) + { + timeout_event = event_new(ctx->evbase, -1, 0, timeout_cb, ctx); + evtimer_add(timeout_event, tv); + event_base_loop(ctx->evbase, EVLOOP_ONCE); + event_free(timeout_event); + } + else + { + event_base_loop(ctx->evbase, EVLOOP_ONCE); + } + return; } -void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg) +void swarmkv_caller_loop_break(struct swarmkv *db) { - struct swarmkv_thread_ctx *thread=(struct swarmkv_thread_ctx *)arg; - swarmkv_store_periodic(thread->db->mod_store, thread->thread_id); - swarmkv_keyspace_periodic(thread->db->mod_keyspace, thread->thread_id); + int tid=__gettid(db); + //must initiate from caller threads, and caller thread ID is larger than worker thread ID + assert(tid >= db->opts->nr_worker_threads); + event_base_loopbreak(db->ref_evbases[tid]); +} +struct event_base *swarmkv_get_event_base(struct swarmkv *db) +{ + int tid=__gettid(db); + //must initiate from caller threads, and caller thread ID is larger than worker thread ID + assert(tid >= db->opts->nr_worker_threads); + return db->threads[tid].evbase; } struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, char **err) { @@ -1265,6 +1330,8 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, // event_set_log_callback(libevent_log_cb); db=ALLOC(struct swarmkv, 1); strncpy(db->db_name, db_name, sizeof(db->db_name)); + opts->total_threads=opts->nr_caller_threads+opts->nr_worker_threads; + db->threads=ALLOC(struct swarmkv_thread_ctx,opts->total_threads); /* adds locking, only required if accessed from separate threads */ evthread_use_pthreads(); @@ -1295,17 +1362,20 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, if(opts->dryrun) { } - db->evbases=ALLOC(struct event_base*, db->opts->nr_worker_threads); - for(int i=0; i<db->opts->nr_worker_threads + db->opts->nr_caller_threads; i++) + db->ref_evbases=ALLOC(struct event_base*, opts->total_threads); + for(int i=0; i<opts->total_threads; i++) { - db->evbases[i]=event_base_new(); + db->threads[i].evbase=event_base_new(); + db->threads[i].db=db; + db->ref_evbases[i]=db->threads[i].evbase; } - db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->evbases, db->opts->nr_worker_threads); - db->mesh=swarmkv_mesh_new(db->evbases, db->opts->nr_worker_threads + db->opts->nr_caller_threads, db->logger); + db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads); + db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger); + swarmkv_mesh_set_on_msg_cb(db->mesh, on_msg_callback, db); db->mod_monitor=swarmkv_monitor_new(db->opts); //Note: if the cluster_port is 0, swarmkv_net_new updates db->self.cluster_port. - db->net=swarmkv_net_new(db->evbases, db->opts->nr_worker_threads, opts, db->logger, err); + db->net=swarmkv_net_new(db->ref_evbases, db->opts->nr_worker_threads, opts, db->logger, err); if(*err) { goto error_out; @@ -1328,12 +1398,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, db->mod_store=swarmkv_store_new(db->opts, __exec_cmd, db); swarmkv_store_set_monitor_handle(db->mod_store, db->mod_monitor); - struct timeval sync_interval = {db->opts->sync_interval_us/(1000*1000), db->opts->sync_interval_us%(1000*1000)}; - for(int i=0; i<db->opts->nr_worker_threads; i++) - { - db->worker_threads[i].store_periodic_ev=event_new(db->evbases[i], -1, EV_PERSIST, __swarmkv_periodic, db->worker_threads+i); - evtimer_add(db->worker_threads[i].store_periodic_ev, &sync_interval); - } + command_spec_init(db); @@ -1370,16 +1435,16 @@ error_out: } void swarmkv_close(struct swarmkv * db) { - for(size_t i=0; i<db->opts->nr_worker_threads; i++) + for(size_t i=0; i<db->opts->total_threads; i++) { - event_base_loopexit(db->evbases[i], NULL); - db->worker_threads[i].is_dispatching=0; + event_base_loopexit(db->threads[i].evbase, NULL); + db->threads[i].is_dispatching=0; } for(size_t i=0; i<db->opts->nr_worker_threads; i++) { - pthread_join(db->worker_threads[i].thr, NULL); + pthread_join(db->threads[i].thr, NULL); } - free(db->worker_threads); + swarmkv_store_free(db->mod_store); db->mod_store=NULL; swarmkv_keyspace_free(db->mod_keyspace); @@ -1407,9 +1472,18 @@ void swarmkv_close(struct swarmkv * db) log_handle_destroy(db->logger); db->logger=NULL; } + + for(size_t i=0; i<db->opts->total_threads; i++) + { + event_base_free(db->threads[i].evbase); + } + free(db->threads); + db->threads=NULL; + db->opts->is_assigned_to_db=0; swarmkv_options_free(db->opts); db->opts=NULL; + free(db); return; } diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index 06ea6db..0d310b1 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -126,7 +126,12 @@ int swarmkv_options_set_dryrun(struct swarmkv_options *opts) opts->cluster_port=15210;//overrides default p2p port return 0; } -int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, size_t nr_worker_threads) +int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads) +{ + opts->nr_caller_threads=nr_caller_threads; + return 0; +} +int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, int nr_worker_threads) { opts->nr_worker_threads=nr_worker_threads; return 0; @@ -311,15 +316,14 @@ void swarmkv_btconsume(struct swarmkv * db, const char * key, size_t keylen, con } struct blocking_query_ctx { - pthread_cond_t cond; - pthread_mutex_t mutex; struct swarmkv_reply *reply; + struct swarmkv *db; }; void blocking_query_cb(const struct swarmkv_reply *reply, void * arg) { struct blocking_query_ctx *ctx=(struct blocking_query_ctx*) arg; ctx->reply=swarmkv_reply_dup(reply); - pthread_cond_signal(&ctx->cond); + swarmkv_caller_loop_break(ctx->db); return; } struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv) @@ -330,8 +334,7 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta memset(&ctx, 0, sizeof(ctx)); node_t target_node; memset(&target_node, 0, sizeof(node_t)); - pthread_cond_init(&ctx.cond, NULL); - pthread_mutex_init(&ctx.mutex, NULL); + ctx.db=db; cmd=swarmkv_cmd_new(argc); for(int i=0; i<argc; i++) { @@ -347,24 +350,10 @@ struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *ta node_init_from_sds(&target_node, target); exec_for_local(db, cmd, &target_node, blocking_query_cb, &ctx); } - - struct timespec max_wait = {0, 0}; - clock_gettime(CLOCK_REALTIME, &max_wait); - max_wait.tv_sec+=100; - int timed_wait_rv=0; - if(NULL==ctx.reply) - { - pthread_mutex_lock(&ctx.mutex); - timed_wait_rv=pthread_cond_timedwait(&ctx.cond, &ctx.mutex, &max_wait); - pthread_mutex_unlock(&ctx.mutex); - if(timed_wait_rv) - { - printf("%s\n", swarmkv_util_pthread_cond_timedwait_error_to_string(timed_wait_rv)); - ctx.reply=swarmkv_reply_new_error("timeout"); - } - } + swarmkv_caller_loop(db, NULL); reply=ctx.reply; ctx.reply=NULL; + swarmkv_cmd_free(cmd); return reply; } @@ -395,7 +384,7 @@ struct swarmkv_reply *swarmkv_command(struct swarmkv *db,const char *format, ... { char *cmd_str=NULL; va_list ap; - va_start(ap,format); + va_start(ap, format); vasprintf(&cmd_str, format, ap); va_end(ap); diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index e00a973..7c00c18 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -1991,7 +1991,7 @@ void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_i if(i%ks->opts->nr_worker_threads!=thread_id) continue; slot_rt=ks->slot_rts+i; if(!slot_rt->I_am_owner) continue; - if(pthread_mutex_trylock(&slot_rt->sanity_lock)) + if(0==pthread_mutex_trylock(&slot_rt->sanity_lock)) { pthread_mutex_unlock(&slot_rt->sanity_lock); } diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c index 9ee02b5..0a24c3a 100644 --- a/src/swarmkv_mesh.c +++ b/src/swarmkv_mesh.c @@ -19,6 +19,7 @@ struct swarmkv_mesh_thread { int thread_id; int efd; + struct event * ev; ringbuf_t *ring; char *buff; ringbuf_worker_t **workers; @@ -65,7 +66,7 @@ error_out: swarmkv_msg_free(msg); return -1; } -void swarmkv_mesh_set_on_msg_recv_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_func, void *cb_arg) +void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_func, void *cb_arg) { mesh->on_msg_recv=cb_func; mesh->msg_recv_arg=cb_arg; @@ -80,6 +81,8 @@ void swarmkv_mesh_free(struct swarmkv_mesh *mesh) free(mesh->threads[i].ring); free(mesh->threads[i].workers); free(mesh->threads[i].buff); + event_del(mesh->threads[i].ev); + event_free(mesh->threads[i].ev); } free(mesh->threads); free(mesh); @@ -134,7 +137,8 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, log_fatal(mesh->ref_logger, MODULE_SWAMRKV_MESH, "eventfd() failed: %s", strerror(errno)); assert(0); } - event_new(evbase[i],mesh->threads[i].efd, EV_READ|EV_PERSIST, swarmkv_mesh_on_event, mesh->threads+i); + mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, swarmkv_mesh_on_event, mesh->threads+i); + event_add(mesh->threads[i].ev, NULL); mesh->threads[i].ref_mesh=mesh; } return mesh; diff --git a/src/swarmkv_message.c b/src/swarmkv_message.c index 73e1469..a1bad9d 100644 --- a/src/swarmkv_message.c +++ b/src/swarmkv_message.c @@ -183,25 +183,25 @@ static void swarmkv_cmd_serialize(const struct swarmkv_cmd *cmd, char **blob, si *blob_sz=root_mpack_sz; return; } -struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *source, int thread_id, long long sequence) +struct swarmkv_msg *swarmkv_msg_new_by_cmd(const struct swarmkv_cmd *cmd, const node_t *caller, int caller_tid, long long sequence) { struct swarmkv_msg *msg=ALLOC(struct swarmkv_msg, 1); msg->magic=SWARMKV_MSG_MAGIC; msg->type=MSG_TYPE_CMD; - msg->thread_id=thread_id; + msg->caller_tid=caller_tid; msg->sequence=sequence; - memcpy(&msg->source, &source, sizeof(node_t)); + node_copy(&msg->caller, caller); msg->cmd=swarmkv_cmd_dup(cmd); return msg; } -struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *source, int thread_id, long long sequence) +struct swarmkv_msg *swarmkv_msg_new_by_reply(const struct swarmkv_reply *reply, const node_t *caller, int caller_tid, long long sequence) { struct swarmkv_msg *msg=ALLOC(struct swarmkv_msg, 1); msg->magic=SWARMKV_MSG_MAGIC; msg->type=MSG_TYPE_REPLY; - msg->thread_id=thread_id; + msg->caller_tid=caller_tid; msg->sequence=sequence; - memcpy(&msg->source, &source, sizeof(node_t)); + node_copy(&msg->caller, caller); msg->reply=swarmkv_reply_dup(reply); return msg; } @@ -220,7 +220,7 @@ void swarmkv_msg_free(struct swarmkv_msg *msg) } void swarmkv_msg_serialize(const struct swarmkv_msg *msg, char **blob, size_t *blob_sz) { - + struct swarmkv_msg *pseudo_hdr=NULL; char *payload=NULL; size_t payload_len=0; if(msg->type==MSG_TYPE_CMD) @@ -233,6 +233,8 @@ void swarmkv_msg_serialize(const struct swarmkv_msg *msg, char **blob, size_t *b } *blob=ALLOC(char, payload_len+SWARMKV_MSG_HDR_SIZE); memcpy(*blob, msg, SWARMKV_MSG_HDR_SIZE); + pseudo_hdr=(struct swarmkv_msg *)*blob; + pseudo_hdr->payload_len=payload_len; memcpy(*blob+SWARMKV_MSG_HDR_SIZE, payload, payload_len); *blob_sz=payload_len+SWARMKV_MSG_HDR_SIZE; free(payload); diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index 58822ca..fee1fb2 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -280,6 +280,7 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg) assert(bev==conn->bev); snet_thread_buffer_usage_stat(thr); assert(conn->ref_thr == conn->ref_thr->ref_net->threads+conn->thread_id); + struct swarmkv_msg *msg=NULL; if(conn->need_to_be_kill) { log_fatal(thr->ref_logger, MODULE_SWAMRKV_NET, "connection %s is killed", @@ -316,7 +317,7 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg) } if(!conn->is_in_conn_table && !conn->is_duplicated) { - snet_conn_set_peer(conn, &hdr->source); + snet_conn_set_peer(conn, &hdr->caller); snet_conn_table_add(thr, conn); } conn->recv_state=RECEIVING_PAYLOAD; @@ -332,7 +333,10 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg) conn->recv_state=RECEIVING_PAYLOAD; break; } - swarmkv_msg_deserialize(recv_buff, msg_sz); + msg=swarmkv_msg_deserialize(recv_buff, msg_sz); + thr->ref_net->on_msg_cb(msg, thr->ref_net->on_msg_cb_arg); + swarmkv_msg_free(msg); + msg=NULL; evbuffer_drain(input, msg_sz); thr->stat.input_bytes += msg_sz; conn->recv_state=RECEIVING_HDR; @@ -600,7 +604,6 @@ void swarmkv_net_set_on_msg_callback(struct swarmkv_net *net, on_msg_callback_t void swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv_msg *msg) { int tid=__gettid((struct swarmkv *)(net->on_msg_cb_arg)); - assert(tid==msg->thread_id); struct snet_thread *thr=net->threads+tid; struct snet_conn *conn=NULL; HASH_FIND(hh, thr->conn_table, dest, sizeof(node_t), conn); diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c index f94cd10..9390aff 100644 --- a/src/swarmkv_rpc.c +++ b/src/swarmkv_rpc.c @@ -30,13 +30,13 @@ struct swarmkv_rpc_mgr long long timed_out_rpcs; long long unknown_sequence; }; -struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_worker_threads) +struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_threads) { struct swarmkv_rpc_mgr *mgr=ALLOC(struct swarmkv_rpc_mgr, 1); mgr->seq_generator=0; - mgr->nr_worker_threads=nr_worker_threads; + mgr->nr_worker_threads=nr_threads; mgr->evbases=evbases; - mgr->rpc_table=ALLOC(struct swarmkv_rpc *, nr_worker_threads); + mgr->rpc_table=ALLOC(struct swarmkv_rpc *, nr_threads); mgr->timeout_us=opts->cluster_timeout_us; return mgr; } @@ -58,6 +58,7 @@ static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg) rpc->ref_mgr->timed_out_rpcs++; struct promise *p=future_to_promise(rpc->f); promise_failed(p, FUTURE_ERROR_TIMEOUT, "rpc timed out"); + swarmkv_rpc_free(rpc); } long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f) diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 18b0257..bef45ee 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -675,7 +675,7 @@ enum cmd_exec_result type_command(struct swarmkv_module *mod_store, const struct } enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) { -/*CRDT ADD key IP:port [IP:port ...]*/ +/*CRDT ADD key [IP:port ...]*/ struct scontainer *ctr=NULL; struct swarmkv_store *store=module2store(mod_store); size_t max_pull_node_num=4; @@ -735,6 +735,7 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st swarmkv_cmd_free(crdt_get_cmd); crdt_get_cmd=NULL; free(replica_nodes); + *reply=swarmkv_reply_new_status("OK"); return FINISHED; } enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp index 721d309..50b034d 100644 --- a/test/swarmkv_gtest.cpp +++ b/test/swarmkv_gtest.cpp @@ -11,7 +11,7 @@ #include <sys/time.h> #define CMD_EXEC_TIMEOUT_MS 1000*2 - +struct timeval g_exec_timeout={CMD_EXEC_TIMEOUT_MS/1000, (CMD_EXEC_TIMEOUT_MS%1000)*1000}; void generic_callback(const struct swarmkv_reply *reply, void * cb_arg) { struct cmd_exec_arg *arg=(struct cmd_exec_arg*)cb_arg; @@ -41,12 +41,15 @@ protected: swarmkv_options_set_cluster_port(opts, 5210); swarmkv_options_set_health_check_port(opts, 6210); swarmkv_options_set_logger(opts, logger); + swarmkv_options_set_worker_thread_number(opts, 1); + swarmkv_options_set_caller_thread_number(opts, 1); db=swarmkv_open(opts, cluster_name, &err); if(err) { printf("swarmkv_open failed: %s.\n", err); free(err); } + swarmkv_register_thread(db); } static void TearDownTestCase() { @@ -62,28 +65,21 @@ struct swarmkv *SwarmkvBasicTest::db; struct log_handle *SwarmkvBasicTest::logger; TEST_F(SwarmkvBasicTest, TypeString) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db=SwarmkvBasicTest::db; const char *key="name"; const char *val="zhangsan"; - int exec_successful=0; - arg=cmd_exec_arg_new(); - - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db, key, strlen(key), val, strlen(val), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + struct swarmkv_reply *reply=NULL; + reply=swarmkv_command(db, "SET %s %s", key, val); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); + + reply=swarmkv_command(db, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, val); + swarmkv_reply_free(reply); - cmd_exec_arg_expect_cstring(arg, val); - swarmkv_get(db, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); - - cmd_exec_arg_free(arg); - arg=NULL; } TEST_F(SwarmkvBasicTest, TypeInteger) { @@ -122,26 +118,21 @@ TEST_F(SwarmkvBasicTest, TypeInteger) } TEST_F(SwarmkvBasicTest, GenericDEL) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db=SwarmkvBasicTest::db; const char* key="name2"; const char* val="zhangsan"; - int exec_successful=0; - arg=cmd_exec_arg_new(); - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db, key, strlen(key), val, strlen(val), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + struct swarmkv_reply *reply=NULL; - - cmd_exec_arg_expect_integer(arg, 1); - swarmkv_del(db, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_free(arg); - arg=NULL; + reply=swarmkv_command(db, "SET %s %s", key, val); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); + + reply=swarmkv_command(db, "DEL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); } TEST_F(SwarmkvBasicTest, GenericTYPE) { @@ -217,109 +208,79 @@ TEST_F(SwarmkvBasicTest, GenericTYPE) } TEST_F(SwarmkvBasicTest, TypeSet) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db=SwarmkvBasicTest::db; - const char* key="Jack's friends"; - const char* member[]={"zhangsan", "lisi", "王二麻子", "Tom", "مرحبا"}; - size_t member_len[32]; - size_t n_member=0, i=0; - n_member=sizeof(member)/sizeof(const char*); - for(i=0; i<n_member; i++) - { - member_len[i]=strlen(member[i])+1; - } - - int exec_successful=0; - - arg=cmd_exec_arg_new(); - - //SADD key m0 m2 ... m4 - cmd_exec_arg_expect_integer(arg, n_member); - swarmkv_sadd(db, key, strlen(key)+1, member, member_len, n_member, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); - - //SMEMEBERS key - cmd_exec_arg_expect_array(arg, n_member); - swarmkv_smembers(db, key, strlen(key)+1, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + const char *key="friends_of_Jack"; + const char *member[]={"zhangsan", "lisi", "王二麻子", "Tom", "مرحبا"}; + size_t i=0; + struct swarmkv_reply *reply=NULL; + reply=swarmkv_command(db, "SADD %s %s %s %s %s %s", key, member[0], member[1], member[2], member[3], member[4]); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 5); + swarmkv_reply_free(reply); - //SREM key m0 m1 - cmd_exec_arg_expect_integer(arg, 2); - swarmkv_srem(db, key, strlen(key)+1, member, member_len, 2, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db, "SMEMBERS %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY); + EXPECT_EQ(reply->n_element, 5); + for(i=0; i<reply->n_element; i++) + { + EXPECT_EQ(reply->elements[i]->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->elements[i]->str, member[i]); + } + swarmkv_reply_free(reply); - //SISMEMBER key m0 - cmd_exec_arg_expect_integer(arg, 0); - swarmkv_sismember(db, key, strlen(key)+1, member[0], member_len[0], generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db, "SREM %s %s %s", key, member[0], member[1]); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 2); + swarmkv_reply_free(reply); - //SISMEMBER key m2 - cmd_exec_arg_expect_integer(arg, 1); - swarmkv_sismember(db, key, strlen(key)+1, member[2], member_len[2], generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db, "SISMEMBER %s %s", key, member[0]); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 0); + swarmkv_reply_free(reply); + reply=swarmkv_command(db, "SISMEMBER %s %s", key, member[2]); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); - //SCARD key - cmd_exec_arg_expect_integer(arg, 3); - swarmkv_scard(db, key, strlen(key)+1, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db, "SCARD %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 3); + swarmkv_reply_free(reply); - //DEL key - cmd_exec_arg_expect_integer(arg, 1); - swarmkv_del(db, key, strlen(key)+1, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db, "DEL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); - //SISMEMBER key m2 - cmd_exec_arg_expect_integer(arg, 0); - swarmkv_sismember(db, key, strlen(key)+1, member[2], member_len[2], generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db, "SISMEMBER %s %s", key, member[2]); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 0); + swarmkv_reply_free(reply); - //SCARD key - cmd_exec_arg_expect_integer(arg, 0); - swarmkv_scard(db, key, strlen(key)+1, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db, "SCARD %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 0); + swarmkv_reply_free(reply); - - cmd_exec_arg_free(arg); - arg=NULL; } TEST_F(SwarmkvBasicTest, TypeTokenBucket) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db=SwarmkvBasicTest::db; const char *key="tb-192.168.0.1"; - int exec_successful=0; - arg=cmd_exec_arg_new(); + struct swarmkv_reply *reply=NULL; + long long capacity=1024*4, rate=1024*2, request_tokens=0, allocated_tokens=0; - cmd_exec_arg_expect_OK(arg); - swarmkv_async_command(db, generic_callback, arg, "TCFG %s %lld %lld", key, rate, capacity); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - cmd_exec_arg_clear(arg); - EXPECT_TRUE(exec_successful); + reply=swarmkv_command(db, "TCFG %s %lld %lld", key, rate, capacity); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + swarmkv_reply_free(reply); + struct timeval start, now; gettimeofday(&start, NULL); gettimeofday(&now, NULL); srand(171); - struct swarmkv_reply *reply=NULL; + int i=0; while(now.tv_sec - start.tv_sec<3) { @@ -334,7 +295,6 @@ TEST_F(SwarmkvBasicTest, TypeTokenBucket) i++; } EXPECT_LE(allocated_tokens, (now.tv_sec -start.tv_sec)*rate+capacity); - cmd_exec_arg_free(arg); //Infinite tokens reply=swarmkv_command(db, "TCFG %s 0 0", key); @@ -355,22 +315,19 @@ TEST_F(SwarmkvBasicTest, TypeTokenBucket) } TEST_F(SwarmkvBasicTest, TypeFairTokenBucket) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db=SwarmkvBasicTest::db; const char *key="3-floor-bandwidth-100Mbps"; - int exec_successful=0; - arg=cmd_exec_arg_new(); long long capacity=200*1024*1024, rate=100*1024*1024, request_tokens=0, allocated_tokens=0; - cmd_exec_arg_expect_OK(arg); - swarmkv_async_command(db, generic_callback, arg, "FTCFG %s %lld %lld 128", key, rate, capacity); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - cmd_exec_arg_clear(arg); - EXPECT_TRUE(exec_successful); + + struct swarmkv_reply *reply=NULL; + reply=swarmkv_command(db, "FTCFG %s %lld %lld 128", key, rate, capacity); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + swarmkv_reply_free(reply); + struct timeval start, now; gettimeofday(&start, NULL); gettimeofday(&now, NULL); srand(171); - struct swarmkv_reply *reply=NULL; int i=0; while(now.tv_sec - start.tv_sec<3) { @@ -385,7 +342,7 @@ TEST_F(SwarmkvBasicTest, TypeFairTokenBucket) i++; } EXPECT_LE(allocated_tokens, (now.tv_sec -start.tv_sec)*rate+capacity); - cmd_exec_arg_free(arg); + //Infinite tokens reply=swarmkv_command(db, "FTCFG %s 0 0 256", key); @@ -406,22 +363,20 @@ TEST_F(SwarmkvBasicTest, TypeFairTokenBucket) } TEST_F(SwarmkvBasicTest, TypeBulkTokenBucket) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db=SwarmkvBasicTest::db; const char *key="everyone-has-1Mbps"; - int exec_successful=0; - arg=cmd_exec_arg_new(); + long long capacity=2*1024*1024, rate=1*1024*1024, request_tokens=0, allocated_tokens=0; - cmd_exec_arg_expect_OK(arg); - swarmkv_async_command(db, generic_callback, arg, "BTCFG %s %lld %lld 128", key, rate, capacity); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - cmd_exec_arg_clear(arg); - EXPECT_TRUE(exec_successful); + + struct swarmkv_reply *reply=NULL; + reply=swarmkv_command(db, "BTCFG %s %lld %lld 128", key, rate, capacity); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + swarmkv_reply_free(reply); + struct timeval start, now; gettimeofday(&start, NULL); gettimeofday(&now, NULL); srand(171); - struct swarmkv_reply *reply=NULL; int i=0, n_member=120; while(now.tv_sec - start.tv_sec<3) { @@ -436,7 +391,6 @@ TEST_F(SwarmkvBasicTest, TypeBulkTokenBucket) i++; } EXPECT_LE(allocated_tokens/n_member, (now.tv_sec -start.tv_sec)*rate+capacity); - cmd_exec_arg_free(arg); //Infinite tokens reply=swarmkv_command(db, "BTCFG %s 0 0 256", key); @@ -531,61 +485,42 @@ TEST_F(SwarmkvBasicTest, TypeHash) } TEST_F(SwarmkvBasicTest, EXPIRE_TTL) { - struct cmd_exec_arg *arg=NULL; struct swarmkv *db=SwarmkvBasicTest::db; - const char* key="quarantine"; - const char* val="wuhan-江夏-如家"; - int exec_successful=0; + const char *key="quarantine"; + const char *val="wuhan-江夏-如家"; int seconds=3; - - //SET key value - arg=cmd_exec_arg_new(); - cmd_exec_arg_expect_OK(arg); - swarmkv_set(db, key, strlen(key), val, strlen(val), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + struct swarmkv_reply *reply=NULL; - //EXPIRE key seconds - cmd_exec_arg_expect_integer(arg, 1); - swarmkv_expire(db, key, strlen(key), seconds, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db, "SET %s %s", key, val); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); - //TTL key - cmd_exec_arg_expect_integer(arg, seconds); - swarmkv_ttl(db, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db, "EXPIRE %s %d", key, seconds); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); - sleep(seconds+2); + reply=swarmkv_command(db, "TTL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, seconds); + swarmkv_reply_free(reply); - //GET key - cmd_exec_arg_expect_NIL(arg); - swarmkv_get(db, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + sleep(seconds+2); + reply=swarmkv_command(db, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL); + swarmkv_reply_free(reply); - //EXPIRE key seconds - cmd_exec_arg_expect_integer(arg, 0); - swarmkv_expire(db, key, strlen(key), seconds, generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); + reply=swarmkv_command(db, "EXPIRE %s %d", key, seconds); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 0); + swarmkv_reply_free(reply); - //TTL key - cmd_exec_arg_expect_integer(arg, -2); - swarmkv_ttl(db, key, strlen(key), generic_callback, arg); - exec_successful=cmd_exec_arg_wait(arg, CMD_EXEC_TIMEOUT_MS); - EXPECT_TRUE(exec_successful); - cmd_exec_arg_clear(arg); - - cmd_exec_arg_free(arg); - arg=NULL; + reply=swarmkv_command(db, "TTL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, -2); + swarmkv_reply_free(reply); } TEST_F(SwarmkvBasicTest, HashTags) { diff --git a/test/test_utils.c b/test/test_utils.c index c4c6cac..a03f4b5 100644 --- a/test/test_utils.c +++ b/test/test_utils.c @@ -165,7 +165,7 @@ void error_pthread_cond_timedwait(const int timed_wait_rv) } } -int cmd_exec_arg_wait(struct cmd_exec_arg* arg, long timeout_ms) +int cmd_exec_arg_wait(struct cmd_exec_arg *arg, long timeout_ms) { struct timespec max_wait = {0, 0}; clock_gettime(CLOCK_REALTIME, &max_wait); |
