diff options
| author | Zheng Chao <[email protected]> | 2023-08-19 22:24:07 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-08-19 22:24:07 +0800 |
| commit | 32f4c412c9b458332ff3f87257cb05ab4a0e81f1 (patch) | |
| tree | 7d1fe2cae3448fce538f1f93c1fe192143a6323b | |
| parent | c590601c8df9a9ab03a3b6b7681cce3a18d2256f (diff) | |
Increase the ringbuf size of swarmkv_mesh to avoid message drops.
| -rw-r--r-- | include/swarmkv/swarmkv.h | 23 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_net.h | 8 | ||||
| -rw-r--r-- | src/swarmkv.c | 42 | ||||
| -rw-r--r-- | src/swarmkv_mesh.c | 38 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 28 | ||||
| -rw-r--r-- | src/swarmkv_rpc.c | 2 | ||||
| -rw-r--r-- | test/swarmkv_perf_test.cpp | 126 |
7 files changed, 188 insertions, 79 deletions
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h index 52f412e..61c2dfb 100644 --- a/include/swarmkv/swarmkv.h +++ b/include/swarmkv/swarmkv.h @@ -87,14 +87,21 @@ struct swarmkv_options *swarmkv_get0_options(struct swarmkv *db); void swarmkv_register_thread(struct swarmkv *db); -//Do not exit the loop because we have no pending events. -#define SWARMKV_LOOP_NO_EXIT_ON_EMPTY 0x04 -//Do not block: see which events are ready now, run the callbacks of the highest-priority ones, then exit. -#define SWARMKV_LOOP_NONBLOCK 0x02 -// Block until we have an active event, then exit once all active events have had their callbacks run. -#define SWARMKV_LOOP_ONCE 0x01 -//flags any combination of SWARMKV_LOOP_ONCE | SWARMKV_LOOP_NONBLOCK | SWARMKV_LOOP_NO_EXIT_ON_EMPTY -//The flags are identicial to event_base_loop() of libevent. See https://libevent.org/doc/event_8h.html#a10b9c102337d0ece8607c9c01bc4f7c8 +/* Do not exit the loop because we have no pending events. */ +#define SWARMKV_LOOP_NO_EXIT_ON_EMPTY 0x04 +/* Do not block, see which events are ready now, run the callbacks of the highest-priority ones, then exit.*/ +#define SWARMKV_LOOP_NONBLOCK 0x02 +/* Block until we have an active event, then exit once all active events have had their callbacks run. */ +#define SWARMKV_LOOP_ONCE 0x01 +/** + * Run the event loop for the specified amount of time. swarmkv_on_reply_callback_t will be called when the reply is ready. + * By default, this loop will run the event base until either there are no more pending or active events, or until something calls swarmkv_caller_loop_break(). + * You can override this behavior with the 'flags' argument. + * @param db swarmkv instance to loop, the caller thread must be registered by swarmkv_register_thread() before calling this function. + * @param flags any combination of SWARMKV_LOOP_ONCE | SWARMKV_LOOP_NONBLOCK | SWARMKV_LOOP_NO_EXIT_ON_EMPTY. + * The flags are identicial to event_base_loop() of libevent. See https://libevent.org/doc/event_8h.html#a10b9c102337d0ece8607c9c01bc4f7c8 + * @param tv Timeout for select/poll in milliseconds, or NULL for no timeout. + */ void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv); void swarmkv_caller_loop_break(struct swarmkv *db); diff --git a/src/inc_internal/swarmkv_net.h b/src/inc_internal/swarmkv_net.h index af2d28a..4ceb025 100644 --- a/src/inc_internal/swarmkv_net.h +++ b/src/inc_internal/swarmkv_net.h @@ -19,10 +19,10 @@ struct snet_info long long connections; long long input_bytes; - long long input_msgs; //input_cmds + input_replies + long long input_msgs; long long output_bytes; - long long output_msgs; //output_cmds + output_replies + long long output_msgs; long long input_buffer_sz; long long output_buffer_sz; @@ -30,7 +30,7 @@ struct snet_info double instantaneous_input_kbps; double instantaneous_output_kbps; - double instantaneous_input_cps; - double instantaneous_output_cps; + double instantaneous_input_msgs; + double instantaneous_output_msgs; }; void swarmkv_net_info(struct swarmkv_net *net, struct snet_info *info);
\ No newline at end of file diff --git a/src/swarmkv.c b/src/swarmkv.c index 9dccb42..cd15797 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -44,7 +44,7 @@ const char* SWARMKV_VERSION="3.0.0"; -struct swarmkv_thread_ctx +struct swarmkv_thread { pthread_t thr; int sys_tid; @@ -66,7 +66,7 @@ struct swarmkv struct swarmkv_options *opts; int thread_counter; - struct swarmkv_thread_ctx *threads; + struct swarmkv_thread *threads; pthread_barrier_t barrier; struct event_base **ref_evbases; @@ -312,8 +312,8 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw net_info.output_buffer_sz, net_info.instantaneous_input_kbps, net_info.instantaneous_output_kbps, - net_info.instantaneous_input_cps, - net_info.instantaneous_output_cps + net_info.instantaneous_input_msgs, + net_info.instantaneous_output_msgs ); if(cmd->argc>1) @@ -476,39 +476,39 @@ struct swarmkv_cmd_spec *get_spec_by_argv(struct swarmkv *db, size_t argc, char* } void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg) { - struct swarmkv_thread_ctx *thread=(struct swarmkv_thread_ctx *)arg; - swarmkv_store_periodic(thread->db->mod_store, thread->thread_id); - swarmkv_keyspace_periodic(thread->db->mod_keyspace, thread->thread_id); + struct swarmkv_thread *thr=(struct swarmkv_thread *)arg; + swarmkv_store_periodic(thr->db->mod_store, thr->thread_id); + swarmkv_keyspace_periodic(thr->db->mod_keyspace, thr->thread_id); } void *swarmkv_worker_thread(void *arg) { struct swarmkv *db = (struct swarmkv *)arg; swarmkv_register_thread(db); int tid=__gettid(db); - struct swarmkv_thread_ctx *ctx = db->threads+tid; + struct swarmkv_thread *thr = db->threads+tid; char thread_name[16]; - snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", ctx->thread_id); + snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", thr->thread_id); prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); struct timeval sync_interval = {db->opts->sync_interval_us/(1000*1000), db->opts->sync_interval_us%(1000*1000)}; - struct event * periodic_ev=event_new(ctx->evbase, -1, EV_PERSIST, __swarmkv_periodic, ctx); + struct event * periodic_ev=event_new(thr->evbase, -1, EV_PERSIST, __swarmkv_periodic, thr); evtimer_add(periodic_ev, &sync_interval); - ctx->is_dispatching=1; + thr->is_dispatching=1; pthread_barrier_wait(&db->barrier); - int ret=event_base_dispatch(ctx->evbase); + int ret=event_base_dispatch(thr->evbase); event_del(periodic_ev); event_free(periodic_ev); - if(ctx->is_dispatching) + if(thr->is_dispatching) { log_fatal(db->logger, MODULE_SWAMRKV_CORE, "worker thread event_base_dispatch() exit abnormally, ret=%d", ret); } else { - log_info(ctx->db->logger, MODULE_SWAMRKV_CORE, "%s worker thread %d exited", ctx->db->db_name, ctx->sys_tid); + log_info(thr->db->logger, MODULE_SWAMRKV_CORE, "%s worker thread %d exited", thr->db->db_name, thr->sys_tid); } return NULL; } @@ -1223,27 +1223,27 @@ char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name) } static void evloop_timeout_cb(evutil_socket_t fd, short event, void *arg) { - struct swarmkv_thread_ctx *ctx = (struct swarmkv_thread_ctx *)arg; - event_base_loopbreak(ctx->evbase); + struct swarmkv_thread *thr = (struct swarmkv_thread *)arg; + event_base_loopbreak(thr->evbase); } void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv) { int tid=__gettid(db); //must initiate from caller threads, and caller thread ID is larger than worker thread ID assert(tid >= db->opts->nr_worker_threads); - struct swarmkv_thread_ctx *ctx=db->threads+tid; + struct swarmkv_thread *thr=db->threads+tid; struct event *timeout_event = NULL; if(tv) { - timeout_event = event_new(ctx->evbase, -1, 0, evloop_timeout_cb, ctx); + timeout_event = event_new(thr->evbase, -1, 0, evloop_timeout_cb, thr); evtimer_add(timeout_event, tv); - event_base_loop(ctx->evbase, flags); + event_base_loop(thr->evbase, flags); event_del(timeout_event); event_free(timeout_event); } else { - event_base_loop(ctx->evbase, flags); + event_base_loop(thr->evbase, flags); } return; @@ -1262,7 +1262,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, db=ALLOC(struct swarmkv, 1); strncpy(db->db_name, db_name, sizeof(db->db_name)); opts->total_threads=opts->nr_caller_threads+opts->nr_worker_threads; - db->threads=ALLOC(struct swarmkv_thread_ctx,opts->total_threads); + db->threads=ALLOC(struct swarmkv_thread, opts->total_threads); /* adds locking, only required if accessed from separate threads */ evthread_use_pthreads(); diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c index 49203d4..548b6b6 100644 --- a/src/swarmkv_mesh.c +++ b/src/swarmkv_mesh.c @@ -14,7 +14,7 @@ #include <pthread.h> //sanity check #define MODULE_SWAMRKV_MESH module_name_str("swarmkv.mesh") -#define RINGBUF_SIZE 1024 +#define RINGBUF_SIZE 1024*1024 struct swarmkv_mesh_thread { @@ -25,15 +25,18 @@ struct swarmkv_mesh_thread char *buff; ringbuf_worker_t **workers; struct swarmkv_mesh *ref_mesh; - int n_write, n_read; + long long n_send, n_recv; + struct event_base *evbase; }; struct swarmkv_mesh { size_t nr_thread; struct swarmkv_mesh_thread *threads; struct log_handle *ref_logger; - on_msg_callback_t *on_msg_recv; - void *msg_recv_arg; + on_msg_callback_t *on_msg_cb_func; + void *on_msg_cb_arg; + long long n_send, n_recv; + long long n_drop; }; //The swarmkv_mesh_send takes the ownership of msg. int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest_thread_id, struct swarmkv_msg *msg) @@ -42,12 +45,15 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest assert(current_thread_id != dest_thread_id); struct swarmkv_mesh_thread *curr_thr=mesh->threads+current_thread_id; struct swarmkv_mesh_thread *dest_thr=mesh->threads+dest_thread_id; + int tid=__gettid((struct swarmkv *)(mesh->on_msg_cb_arg)); + assert(tid==current_thread_id); ringbuf_t *dest_ring=dest_thr->ring; assert(msg->magic == SWARMKV_MSG_MAGIC); ssize_t offset=0; offset=ringbuf_acquire(dest_ring, curr_thr->workers[dest_thread_id], sizeof(struct swarmkv_msg*)); if(offset == -1) { + mesh->n_drop++; log_warn(mesh->ref_logger, MODULE_SWAMRKV_MESH, "ringbuf is full, drop the message"); goto error_out; } @@ -60,7 +66,8 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest { assert(0); } - dest_thr->n_write++; + curr_thr->n_send++; + mesh->n_send++; return 0; error_out: swarmkv_msg_free(msg); @@ -68,18 +75,20 @@ error_out: } void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_func, void *cb_arg) { - mesh->on_msg_recv=cb_func; - mesh->msg_recv_arg=cb_arg; + mesh->on_msg_cb_func=cb_func; + mesh->on_msg_cb_arg=cb_arg; return; } -static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg) +static void swarmkv_mesh_on_read(evutil_socket_t fd, short what, void * arg) { struct swarmkv_mesh_thread *thr=(struct swarmkv_mesh_thread*)arg; struct swarmkv_mesh *mesh=thr->ref_mesh; ringbuf_t *ring=thr->ring; uint64_t n_msg=0; ssize_t s = read(thr->efd, &n_msg, sizeof(uint64_t)); + int tid=__gettid((struct swarmkv *)(mesh->on_msg_cb_arg)); + assert(tid==thr->thread_id); if(s!=sizeof(uint64_t)) { assert(0); @@ -93,10 +102,13 @@ static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg) msg=*(struct swarmkv_msg**)(thr->buff+offset); assert(msg->magic==SWARMKV_MSG_MAGIC); ringbuf_release(ring, sizeof(struct swarmkv_msg*)); + thr->n_recv++; + mesh->n_recv++; //ownership of msg is transferred to the callback function. - mesh->on_msg_recv(msg, mesh->msg_recv_arg); + mesh->on_msg_cb_func(msg, mesh->on_msg_cb_arg); + offset=0; - thr->n_read++; + } return; } @@ -111,7 +123,7 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, for(int i=0; i<mesh->nr_thread; i++) { mesh->threads[i].thread_id=i; - mesh->threads[i].efd=eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC); + mesh->threads[i].efd=eventfd(0, EFD_SEMAPHORE);//EFD_NONBLOCK|EFD_CLOEXEC mesh->threads[i].workers=ALLOC(ringbuf_worker_t*, nthreads); mesh->threads[i].buff=ALLOC(char, RINGBUF_SIZE); mesh->threads[i].ring=malloc(ringbuf_obj_size); @@ -121,14 +133,16 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, log_fatal(mesh->ref_logger, MODULE_SWAMRKV_MESH, "eventfd() failed: %s", strerror(errno)); assert(0); } - mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, swarmkv_mesh_on_event, mesh->threads+i); + mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, swarmkv_mesh_on_read, mesh->threads+i); event_add(mesh->threads[i].ev, NULL); mesh->threads[i].ref_mesh=mesh; + mesh->threads[i].evbase=evbase[i]; } for(int i=0; i<mesh->nr_thread; i++) { for(int j=0; j<mesh->nr_thread; j++) { + if(i==j) continue; mesh->threads[i].workers[j]=ringbuf_register(mesh->threads[j].ring, i); } } diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index 56505e7..6d076b3 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -54,11 +54,7 @@ struct snet_conn struct snet_stat { long long input_msgs, output_msgs; - long long input_replies, output_replies; - long long input_cmds, output_cmds; long long input_bytes, output_bytes; - long long timed_out_rpcs; - long long unknown_sequence; long long input_buffer_sz, output_buffer_sz; }; struct snet_thread @@ -91,10 +87,10 @@ struct swarmkv_net void *on_msg_cb_arg; struct event * stat_ev; long long last_input_bytes, last_output_bytes; - long long last_input_cmds, last_output_cmds; + long long last_input_msgs, last_output_msgs; struct timespec last_stats; double instantaneous_input_kbps, instantaneous_output_kbps; - double instantaneous_input_cps, instantaneous_output_cps; + double instantaneous_input_msgs, instantaneous_output_msgs; }; static void peer_conn_read_cb(struct bufferevent *bev, void *arg); @@ -339,6 +335,7 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg) //swarmkv_msg_free(msg); msg=NULL; evbuffer_drain(input, msg_sz); + thr->stat.input_msgs++; thr->stat.input_bytes += msg_sz; conn->recv_state=RECEIVING_HDR; } @@ -410,7 +407,8 @@ static void snet_stat_periodic(evutil_socket_t fd, short what, void * arg) struct swarmkv_net *net= (struct swarmkv_net *)arg; struct timespec now; long long input_bytes=0, output_bytes=0; - long long input_cmds=0, output_cmds=0; + long long input_msgs=0, output_msgs=0; + clock_gettime(CLOCK_MONOTONIC, &now); if(now.tv_sec == net->last_stats.tv_sec) { @@ -422,21 +420,21 @@ static void snet_stat_periodic(evutil_socket_t fd, short what, void * arg) { input_bytes += net->threads[i].stat.input_bytes; output_bytes += net->threads[i].stat.output_bytes; - input_cmds += net->threads[i].stat.input_cmds; - output_cmds += net->threads[i].stat.output_cmds; + input_msgs += net->threads[i].stat.input_msgs; + output_msgs += net->threads[i].stat.output_msgs; } long long diff_sec=MAX(now.tv_sec - net->last_stats.tv_sec, 1); net->instantaneous_input_kbps=(input_bytes - net->last_input_bytes)*8/1000/diff_sec; net->instantaneous_output_kbps=(output_bytes - net->last_output_bytes)*8/1000/diff_sec; - net->instantaneous_input_cps=(input_cmds - net->last_input_cmds)/diff_sec; - net->instantaneous_output_cps=(output_cmds - net->last_output_cmds)/diff_sec; + net->instantaneous_input_msgs=(input_msgs - net->last_input_msgs)/diff_sec; + net->instantaneous_output_msgs=(output_msgs - net->last_output_msgs)/diff_sec; clock_gettime(CLOCK_MONOTONIC, &net->last_stats); net->last_input_bytes=input_bytes; net->last_output_bytes=output_bytes; - net->last_input_cmds=input_cmds; - net->last_output_cmds=output_cmds; + net->last_input_msgs=input_msgs; + net->last_output_msgs=output_msgs; return; } @@ -538,8 +536,8 @@ void swarmkv_net_info(struct swarmkv_net *net, struct snet_info *info) } info->instantaneous_input_kbps=net->instantaneous_input_kbps; info->instantaneous_output_kbps=net->instantaneous_output_kbps; - info->instantaneous_input_cps=net->instantaneous_input_cps; - info->instantaneous_output_cps=net->instantaneous_output_cps; + info->instantaneous_input_msgs=net->instantaneous_input_msgs; + info->instantaneous_output_msgs=net->instantaneous_output_msgs; return; } diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c index 1065b99..8dd2261 100644 --- a/src/swarmkv_rpc.c +++ b/src/swarmkv_rpc.c @@ -76,7 +76,7 @@ static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg) long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f) { struct swarmkv_rpc *rpc=ALLOC(struct swarmkv_rpc, 1); - rpc->sequence=++mgr->seq_generator; + rpc->sequence=__atomic_add_fetch(&mgr->seq_generator, 1, __ATOMIC_SEQ_CST); struct timeval timeout={mgr->timeout_us/(1000*1000), mgr->timeout_us%(1000*1000)}; rpc->timeout_ev=event_new(mgr->evbases[thread_id], -1, 0, rpc_timeout_callback, rpc); event_add(rpc->timeout_ev, &timeout); diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp index 4aba2a6..777dcc9 100644 --- a/test/swarmkv_perf_test.cpp +++ b/test/swarmkv_perf_test.cpp @@ -66,6 +66,16 @@ void *blocking_call_thread(void *thread_arg) return success; } +#define PERF_NTHREAD_CLUSTER_PORT_START 5310 +#define PERF_NTHREAD_HEALTH_PORT_START 6310 +#define PERF_DATA_SYNC_CLUSTER_PORT_START 7310 +#define PERF_DATA_SYNC_HEALTH_PORT_START 8310 +#define PERF_ASYNC_EXEC_CLUSTER_PORT_START 9310 +#define PERF_ASYNC_EXEC_HEALTH_PORT_START 10310 +#define PERF_RESILIENCE_ADD_SLOT_OWNER_CLUSTER_PORT_START 11310 +#define PERF_RESILIENCE_ADD_SLOT_OWNER_HEALTH_PORT_START 12310 +#define PERF_RESILIENCE_FAILOVER_CLUSTER_PORT_START 13310 +#define PERF_RESILIENCE_FAILOVER_HEALTH_PORT_START 14310 TEST(Performance, Nthreads) { @@ -80,7 +90,7 @@ TEST(Performance, Nthreads) char node_list_str[1024]={0}; for(i=0; i<NODE_NUMBER; i++) { - snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", 5310+i); + snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", PERF_NTHREAD_CLUSTER_PORT_START+i); } const char *cluster_name="swarmkv-n-threads"; swarmkv_cli_create_cluster(cluster_name, node_list_str); @@ -89,8 +99,8 @@ TEST(Performance, Nthreads) for(i=0; i<NODE_NUMBER; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_cluster_port(opts[i], 5310+i); - swarmkv_options_set_health_check_port(opts[i], 6310+i); + swarmkv_options_set_cluster_port(opts[i], PERF_NTHREAD_CLUSTER_PORT_START+i); + swarmkv_options_set_health_check_port(opts[i], PERF_NTHREAD_HEALTH_PORT_START+i); swarmkv_options_set_logger(opts[i], logger); swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER); @@ -163,7 +173,7 @@ void *background_tconsume_thread(void *thread_arg) *success=0; return success; } -TEST(Performance, Sync) +TEST(Performance, DataSync) { size_t NODE_NUMBER=2; size_t CALLER_THREAD_NUMBER=2; @@ -172,11 +182,10 @@ TEST(Performance, Sync) struct swarmkv *db[NODE_NUMBER]; char *err=NULL; const char *log_path="./swarmkv-sync.log"; - unsigned int p2p_port_start=9310, health_port_start=10310; char node_list_str[1024]={0}; for(size_t i=0; i<NODE_NUMBER; i++) { - snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", p2p_port_start+i); + snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", PERF_DATA_SYNC_CLUSTER_PORT_START+i); } const char *cluster_name="swarmkv-sync"; swarmkv_cli_create_cluster(cluster_name, node_list_str); @@ -185,8 +194,8 @@ TEST(Performance, Sync) for(size_t i=0; i<NODE_NUMBER; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i); - swarmkv_options_set_health_check_port(opts[i], health_port_start+i); + swarmkv_options_set_cluster_port(opts[i], PERF_DATA_SYNC_CLUSTER_PORT_START+i); + swarmkv_options_set_health_check_port(opts[i], PERF_DATA_SYNC_HEALTH_PORT_START+i); swarmkv_options_set_logger(opts[i], logger); swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER+1); @@ -248,7 +257,89 @@ TEST(Performance, Sync) } log_handle_destroy(logger); +} +struct async_exec_ctx +{ + int expected_reply_cnt; + int reply_cnt; + struct swarmkv *db; +}; +void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg) +{ + struct async_exec_ctx *ctx=(struct async_exec_ctx *)arg; + EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); + if(reply->type != SWARMKV_REPLY_STATUS) + { + swarmkv_reply_print(reply, stdout); + } + ctx->reply_cnt++; + if(ctx->reply_cnt==ctx->expected_reply_cnt) + { + swarmkv_caller_loop_break(ctx->db); + } +} +TEST(Performance, AsyncExec) +{ + int NODE_NUMBER=1; + int CALLER_THREAD_NUMBER=1; + int WORKER_THREAD_NUMBER=1; + int i=0; + struct swarmkv *db[NODE_NUMBER]; + char *err=NULL; + const char *log_path="./swarmkv-async-exec.log"; + + char node_list_str[1024]={0}; + for(i=0; i<NODE_NUMBER; i++) + { + snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", PERF_ASYNC_EXEC_CLUSTER_PORT_START+i); + } + const char *cluster_name="swarmkv-async-exec"; + swarmkv_cli_create_cluster(cluster_name, node_list_str); + struct log_handle * logger=log_handle_create(log_path, 0); + struct swarmkv_options* opts[NODE_NUMBER]; + for(i=0; i<NODE_NUMBER; i++) + { + opts[i]=swarmkv_options_new(); + swarmkv_options_set_cluster_port(opts[i], PERF_ASYNC_EXEC_CLUSTER_PORT_START+i); + swarmkv_options_set_health_check_port(opts[i], PERF_ASYNC_EXEC_HEALTH_PORT_START+i); + swarmkv_options_set_cluster_timeout_us(opts[i], 2000*1000); + swarmkv_options_set_logger(opts[i], logger); + swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); + swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER); + db[i]=swarmkv_open(opts[i], cluster_name, &err); + if(err) + { + printf("swarmkv_open %d instance failed: %s\n", i, err); + free(err); + err=NULL; + } + swarmkv_register_thread(db[i]); + } + int key_number=65536; + struct async_exec_ctx ctx; + ctx.expected_reply_cnt=key_number; + ctx.reply_cnt=0; + ctx.db=db[0]; + struct timespec start, end; + clock_gettime(CLOCK_REALTIME, &start); + for(i=0; i<key_number; i++) + { + swarmkv_async_command(db[0], async_on_reply_cb, &ctx, "SET async-key-%d by-node-%d", i, 0); + //swarmkv_caller_loop(db[0], SWARMKV_LOOP_NONBLOCK|SWARMKV_LOOP_ONCE, NULL); + } + clock_gettime(CLOCK_REALTIME, &end); + double elapsed_ms=(end.tv_sec-start.tv_sec)*1000.0+(end.tv_nsec-start.tv_nsec)/1000000.0; + printf("Async SET %d keys, elapsed %lf ms, %lf keys/s\n", key_number, elapsed_ms, key_number*1000.0/elapsed_ms); + swarmkv_caller_loop(db[0], SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL); + + for(i=0; i<NODE_NUMBER; i++) + { + //close slowly to cover more code branches. + sleep(2); + swarmkv_close(db[i]); + } + log_handle_destroy(logger); } int g_running_flag=0; void *migration_background_thread(void *thread_arg) @@ -305,11 +396,11 @@ TEST(Resilience, AddSlotOwner) struct swarmkv *db[NODE_NUMBER+CANDINATE_NUMBER]; char *err=NULL; const char *log_path="./swarmkv-migration-11.log"; - unsigned int p2p_port_start=7310, health_port_start=8310; char node_list_str[1024]={0}; for(i=0; i<NODE_NUMBER; i++) { - snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", p2p_port_start+i); + snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", + PERF_RESILIENCE_ADD_SLOT_OWNER_CLUSTER_PORT_START+i); } const char *cluster_name="swarmkv-migration"; swarmkv_cli_create_cluster(cluster_name, node_list_str); @@ -318,8 +409,8 @@ TEST(Resilience, AddSlotOwner) for(i=0; i<NODE_NUMBER+CANDINATE_NUMBER; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i); - swarmkv_options_set_health_check_port(opts[i], health_port_start+i); + swarmkv_options_set_cluster_port(opts[i], PERF_RESILIENCE_ADD_SLOT_OWNER_CLUSTER_PORT_START+i); + swarmkv_options_set_health_check_port(opts[i], PERF_RESILIENCE_ADD_SLOT_OWNER_HEALTH_PORT_START+i); swarmkv_options_set_logger(opts[i], logger); swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER); swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER); @@ -347,7 +438,7 @@ TEST(Resilience, AddSlotOwner) int offset=0; for(i=0; i<CANDINATE_NUMBER; i++) { - offset+=snprintf(candinate_string+offset, sizeof(candinate_string)-offset, "127.0.0.1:%u ", p2p_port_start+NODE_NUMBER+i); + offset+=snprintf(candinate_string+offset, sizeof(candinate_string)-offset, "127.0.0.1:%u ", PERF_RESILIENCE_ADD_SLOT_OWNER_CLUSTER_PORT_START+NODE_NUMBER+i); } swarmkv_cli_add_slot_owner(cluster_name, candinate_string); sleep(10); @@ -380,11 +471,10 @@ TEST(Resilience, Failover) struct swarmkv *db[NODE_NUMBER]; char *err=NULL; - unsigned int p2p_port_start=11310, health_port_start=12310; char node_list_str[1024]={0}; for(size_t i=1; i<NODE_NUMBER; i++) { - snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", p2p_port_start+i); + snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", PERF_RESILIENCE_FAILOVER_CLUSTER_PORT_START+i); } const char *cluster_name="swarmkv-failover"; swarmkv_cli_create_cluster(cluster_name, node_list_str); @@ -392,8 +482,8 @@ TEST(Resilience, Failover) for(size_t i=0; i<NODE_NUMBER; i++) { opts[i]=swarmkv_options_new(); - swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i); - swarmkv_options_set_health_check_port(opts[i], health_port_start+i); + swarmkv_options_set_cluster_port(opts[i], PERF_RESILIENCE_FAILOVER_CLUSTER_PORT_START+i); + swarmkv_options_set_health_check_port(opts[i], PERF_RESILIENCE_FAILOVER_HEALTH_PORT_START+i); swarmkv_options_set_worker_thread_number(opts[i], 1); swarmkv_options_set_caller_thread_number(opts[i], 1); db[i]=swarmkv_open(opts[i], cluster_name, &err); @@ -436,7 +526,7 @@ TEST(Resilience, Failover) swarmkv_reply_free(reply); for(size_t j=1; j<NODE_NUMBER; j++) { - snprintf(target, sizeof(target), "127.0.0.1:%zu", p2p_port_start+j); + snprintf(target, sizeof(target), "127.0.0.1:%zu", PERF_RESILIENCE_FAILOVER_CLUSTER_PORT_START+j); reply=swarmkv_command_on(db[1], target, "CRDT RLIST fo-%zu", i); EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY); EXPECT_EQ(reply->n_element, NODE_NUMBER-2); |
