summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-08-19 22:24:07 +0800
committerZheng Chao <[email protected]>2023-08-19 22:24:07 +0800
commit32f4c412c9b458332ff3f87257cb05ab4a0e81f1 (patch)
tree7d1fe2cae3448fce538f1f93c1fe192143a6323b
parentc590601c8df9a9ab03a3b6b7681cce3a18d2256f (diff)
Increase the ringbuf size of swarmkv_mesh to avoid message drops.
-rw-r--r--include/swarmkv/swarmkv.h23
-rw-r--r--src/inc_internal/swarmkv_net.h8
-rw-r--r--src/swarmkv.c42
-rw-r--r--src/swarmkv_mesh.c38
-rw-r--r--src/swarmkv_net.c28
-rw-r--r--src/swarmkv_rpc.c2
-rw-r--r--test/swarmkv_perf_test.cpp126
7 files changed, 188 insertions, 79 deletions
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h
index 52f412e..61c2dfb 100644
--- a/include/swarmkv/swarmkv.h
+++ b/include/swarmkv/swarmkv.h
@@ -87,14 +87,21 @@ struct swarmkv_options *swarmkv_get0_options(struct swarmkv *db);
void swarmkv_register_thread(struct swarmkv *db);
-//Do not exit the loop because we have no pending events.
-#define SWARMKV_LOOP_NO_EXIT_ON_EMPTY 0x04
-//Do not block: see which events are ready now, run the callbacks of the highest-priority ones, then exit.
-#define SWARMKV_LOOP_NONBLOCK 0x02
-// Block until we have an active event, then exit once all active events have had their callbacks run.
-#define SWARMKV_LOOP_ONCE 0x01
-//flags any combination of SWARMKV_LOOP_ONCE | SWARMKV_LOOP_NONBLOCK | SWARMKV_LOOP_NO_EXIT_ON_EMPTY
-//The flags are identicial to event_base_loop() of libevent. See https://libevent.org/doc/event_8h.html#a10b9c102337d0ece8607c9c01bc4f7c8
+/* Do not exit the loop because we have no pending events. */
+#define SWARMKV_LOOP_NO_EXIT_ON_EMPTY 0x04
+/* Do not block, see which events are ready now, run the callbacks of the highest-priority ones, then exit.*/
+#define SWARMKV_LOOP_NONBLOCK 0x02
+/* Block until we have an active event, then exit once all active events have had their callbacks run. */
+#define SWARMKV_LOOP_ONCE 0x01
+/**
+ * Run the event loop for the specified amount of time. swarmkv_on_reply_callback_t will be called when the reply is ready.
+ * By default, this loop will run the event base until either there are no more pending or active events, or until something calls swarmkv_caller_loop_break().
+ * You can override this behavior with the 'flags' argument.
+ * @param db swarmkv instance to loop, the caller thread must be registered by swarmkv_register_thread() before calling this function.
+ * @param flags any combination of SWARMKV_LOOP_ONCE | SWARMKV_LOOP_NONBLOCK | SWARMKV_LOOP_NO_EXIT_ON_EMPTY.
+ * The flags are identicial to event_base_loop() of libevent. See https://libevent.org/doc/event_8h.html#a10b9c102337d0ece8607c9c01bc4f7c8
+ * @param tv Timeout for select/poll in milliseconds, or NULL for no timeout.
+ */
void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv);
void swarmkv_caller_loop_break(struct swarmkv *db);
diff --git a/src/inc_internal/swarmkv_net.h b/src/inc_internal/swarmkv_net.h
index af2d28a..4ceb025 100644
--- a/src/inc_internal/swarmkv_net.h
+++ b/src/inc_internal/swarmkv_net.h
@@ -19,10 +19,10 @@ struct snet_info
long long connections;
long long input_bytes;
- long long input_msgs; //input_cmds + input_replies
+ long long input_msgs;
long long output_bytes;
- long long output_msgs; //output_cmds + output_replies
+ long long output_msgs;
long long input_buffer_sz;
long long output_buffer_sz;
@@ -30,7 +30,7 @@ struct snet_info
double instantaneous_input_kbps;
double instantaneous_output_kbps;
- double instantaneous_input_cps;
- double instantaneous_output_cps;
+ double instantaneous_input_msgs;
+ double instantaneous_output_msgs;
};
void swarmkv_net_info(struct swarmkv_net *net, struct snet_info *info); \ No newline at end of file
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 9dccb42..cd15797 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -44,7 +44,7 @@ const char* SWARMKV_VERSION="3.0.0";
-struct swarmkv_thread_ctx
+struct swarmkv_thread
{
pthread_t thr;
int sys_tid;
@@ -66,7 +66,7 @@ struct swarmkv
struct swarmkv_options *opts;
int thread_counter;
- struct swarmkv_thread_ctx *threads;
+ struct swarmkv_thread *threads;
pthread_barrier_t barrier;
struct event_base **ref_evbases;
@@ -312,8 +312,8 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
net_info.output_buffer_sz,
net_info.instantaneous_input_kbps,
net_info.instantaneous_output_kbps,
- net_info.instantaneous_input_cps,
- net_info.instantaneous_output_cps
+ net_info.instantaneous_input_msgs,
+ net_info.instantaneous_output_msgs
);
if(cmd->argc>1)
@@ -476,39 +476,39 @@ struct swarmkv_cmd_spec *get_spec_by_argv(struct swarmkv *db, size_t argc, char*
}
void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg)
{
- struct swarmkv_thread_ctx *thread=(struct swarmkv_thread_ctx *)arg;
- swarmkv_store_periodic(thread->db->mod_store, thread->thread_id);
- swarmkv_keyspace_periodic(thread->db->mod_keyspace, thread->thread_id);
+ struct swarmkv_thread *thr=(struct swarmkv_thread *)arg;
+ swarmkv_store_periodic(thr->db->mod_store, thr->thread_id);
+ swarmkv_keyspace_periodic(thr->db->mod_keyspace, thr->thread_id);
}
void *swarmkv_worker_thread(void *arg)
{
struct swarmkv *db = (struct swarmkv *)arg;
swarmkv_register_thread(db);
int tid=__gettid(db);
- struct swarmkv_thread_ctx *ctx = db->threads+tid;
+ struct swarmkv_thread *thr = db->threads+tid;
char thread_name[16];
- snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", ctx->thread_id);
+ snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", thr->thread_id);
prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
struct timeval sync_interval = {db->opts->sync_interval_us/(1000*1000), db->opts->sync_interval_us%(1000*1000)};
- struct event * periodic_ev=event_new(ctx->evbase, -1, EV_PERSIST, __swarmkv_periodic, ctx);
+ struct event * periodic_ev=event_new(thr->evbase, -1, EV_PERSIST, __swarmkv_periodic, thr);
evtimer_add(periodic_ev, &sync_interval);
- ctx->is_dispatching=1;
+ thr->is_dispatching=1;
pthread_barrier_wait(&db->barrier);
- int ret=event_base_dispatch(ctx->evbase);
+ int ret=event_base_dispatch(thr->evbase);
event_del(periodic_ev);
event_free(periodic_ev);
- if(ctx->is_dispatching)
+ if(thr->is_dispatching)
{
log_fatal(db->logger, MODULE_SWAMRKV_CORE, "worker thread event_base_dispatch() exit abnormally, ret=%d", ret);
}
else
{
- log_info(ctx->db->logger, MODULE_SWAMRKV_CORE, "%s worker thread %d exited", ctx->db->db_name, ctx->sys_tid);
+ log_info(thr->db->logger, MODULE_SWAMRKV_CORE, "%s worker thread %d exited", thr->db->db_name, thr->sys_tid);
}
return NULL;
}
@@ -1223,27 +1223,27 @@ char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name)
}
static void evloop_timeout_cb(evutil_socket_t fd, short event, void *arg)
{
- struct swarmkv_thread_ctx *ctx = (struct swarmkv_thread_ctx *)arg;
- event_base_loopbreak(ctx->evbase);
+ struct swarmkv_thread *thr = (struct swarmkv_thread *)arg;
+ event_base_loopbreak(thr->evbase);
}
void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv)
{
int tid=__gettid(db);
//must initiate from caller threads, and caller thread ID is larger than worker thread ID
assert(tid >= db->opts->nr_worker_threads);
- struct swarmkv_thread_ctx *ctx=db->threads+tid;
+ struct swarmkv_thread *thr=db->threads+tid;
struct event *timeout_event = NULL;
if(tv)
{
- timeout_event = event_new(ctx->evbase, -1, 0, evloop_timeout_cb, ctx);
+ timeout_event = event_new(thr->evbase, -1, 0, evloop_timeout_cb, thr);
evtimer_add(timeout_event, tv);
- event_base_loop(ctx->evbase, flags);
+ event_base_loop(thr->evbase, flags);
event_del(timeout_event);
event_free(timeout_event);
}
else
{
- event_base_loop(ctx->evbase, flags);
+ event_base_loop(thr->evbase, flags);
}
return;
@@ -1262,7 +1262,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
db=ALLOC(struct swarmkv, 1);
strncpy(db->db_name, db_name, sizeof(db->db_name));
opts->total_threads=opts->nr_caller_threads+opts->nr_worker_threads;
- db->threads=ALLOC(struct swarmkv_thread_ctx,opts->total_threads);
+ db->threads=ALLOC(struct swarmkv_thread, opts->total_threads);
/* adds locking, only required if accessed from separate threads */
evthread_use_pthreads();
diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c
index 49203d4..548b6b6 100644
--- a/src/swarmkv_mesh.c
+++ b/src/swarmkv_mesh.c
@@ -14,7 +14,7 @@
#include <pthread.h> //sanity check
#define MODULE_SWAMRKV_MESH module_name_str("swarmkv.mesh")
-#define RINGBUF_SIZE 1024
+#define RINGBUF_SIZE 1024*1024
struct swarmkv_mesh_thread
{
@@ -25,15 +25,18 @@ struct swarmkv_mesh_thread
char *buff;
ringbuf_worker_t **workers;
struct swarmkv_mesh *ref_mesh;
- int n_write, n_read;
+ long long n_send, n_recv;
+ struct event_base *evbase;
};
struct swarmkv_mesh
{
size_t nr_thread;
struct swarmkv_mesh_thread *threads;
struct log_handle *ref_logger;
- on_msg_callback_t *on_msg_recv;
- void *msg_recv_arg;
+ on_msg_callback_t *on_msg_cb_func;
+ void *on_msg_cb_arg;
+ long long n_send, n_recv;
+ long long n_drop;
};
//The swarmkv_mesh_send takes the ownership of msg.
int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest_thread_id, struct swarmkv_msg *msg)
@@ -42,12 +45,15 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest
assert(current_thread_id != dest_thread_id);
struct swarmkv_mesh_thread *curr_thr=mesh->threads+current_thread_id;
struct swarmkv_mesh_thread *dest_thr=mesh->threads+dest_thread_id;
+ int tid=__gettid((struct swarmkv *)(mesh->on_msg_cb_arg));
+ assert(tid==current_thread_id);
ringbuf_t *dest_ring=dest_thr->ring;
assert(msg->magic == SWARMKV_MSG_MAGIC);
ssize_t offset=0;
offset=ringbuf_acquire(dest_ring, curr_thr->workers[dest_thread_id], sizeof(struct swarmkv_msg*));
if(offset == -1)
{
+ mesh->n_drop++;
log_warn(mesh->ref_logger, MODULE_SWAMRKV_MESH, "ringbuf is full, drop the message");
goto error_out;
}
@@ -60,7 +66,8 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest
{
assert(0);
}
- dest_thr->n_write++;
+ curr_thr->n_send++;
+ mesh->n_send++;
return 0;
error_out:
swarmkv_msg_free(msg);
@@ -68,18 +75,20 @@ error_out:
}
void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_func, void *cb_arg)
{
- mesh->on_msg_recv=cb_func;
- mesh->msg_recv_arg=cb_arg;
+ mesh->on_msg_cb_func=cb_func;
+ mesh->on_msg_cb_arg=cb_arg;
return;
}
-static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg)
+static void swarmkv_mesh_on_read(evutil_socket_t fd, short what, void * arg)
{
struct swarmkv_mesh_thread *thr=(struct swarmkv_mesh_thread*)arg;
struct swarmkv_mesh *mesh=thr->ref_mesh;
ringbuf_t *ring=thr->ring;
uint64_t n_msg=0;
ssize_t s = read(thr->efd, &n_msg, sizeof(uint64_t));
+ int tid=__gettid((struct swarmkv *)(mesh->on_msg_cb_arg));
+ assert(tid==thr->thread_id);
if(s!=sizeof(uint64_t))
{
assert(0);
@@ -93,10 +102,13 @@ static void swarmkv_mesh_on_event(evutil_socket_t fd, short what, void * arg)
msg=*(struct swarmkv_msg**)(thr->buff+offset);
assert(msg->magic==SWARMKV_MSG_MAGIC);
ringbuf_release(ring, sizeof(struct swarmkv_msg*));
+ thr->n_recv++;
+ mesh->n_recv++;
//ownership of msg is transferred to the callback function.
- mesh->on_msg_recv(msg, mesh->msg_recv_arg);
+ mesh->on_msg_cb_func(msg, mesh->on_msg_cb_arg);
+
offset=0;
- thr->n_read++;
+
}
return;
}
@@ -111,7 +123,7 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads,
for(int i=0; i<mesh->nr_thread; i++)
{
mesh->threads[i].thread_id=i;
- mesh->threads[i].efd=eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
+ mesh->threads[i].efd=eventfd(0, EFD_SEMAPHORE);//EFD_NONBLOCK|EFD_CLOEXEC
mesh->threads[i].workers=ALLOC(ringbuf_worker_t*, nthreads);
mesh->threads[i].buff=ALLOC(char, RINGBUF_SIZE);
mesh->threads[i].ring=malloc(ringbuf_obj_size);
@@ -121,14 +133,16 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads,
log_fatal(mesh->ref_logger, MODULE_SWAMRKV_MESH, "eventfd() failed: %s", strerror(errno));
assert(0);
}
- mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, swarmkv_mesh_on_event, mesh->threads+i);
+ mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, swarmkv_mesh_on_read, mesh->threads+i);
event_add(mesh->threads[i].ev, NULL);
mesh->threads[i].ref_mesh=mesh;
+ mesh->threads[i].evbase=evbase[i];
}
for(int i=0; i<mesh->nr_thread; i++)
{
for(int j=0; j<mesh->nr_thread; j++)
{
+ if(i==j) continue;
mesh->threads[i].workers[j]=ringbuf_register(mesh->threads[j].ring, i);
}
}
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index 56505e7..6d076b3 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -54,11 +54,7 @@ struct snet_conn
struct snet_stat
{
long long input_msgs, output_msgs;
- long long input_replies, output_replies;
- long long input_cmds, output_cmds;
long long input_bytes, output_bytes;
- long long timed_out_rpcs;
- long long unknown_sequence;
long long input_buffer_sz, output_buffer_sz;
};
struct snet_thread
@@ -91,10 +87,10 @@ struct swarmkv_net
void *on_msg_cb_arg;
struct event * stat_ev;
long long last_input_bytes, last_output_bytes;
- long long last_input_cmds, last_output_cmds;
+ long long last_input_msgs, last_output_msgs;
struct timespec last_stats;
double instantaneous_input_kbps, instantaneous_output_kbps;
- double instantaneous_input_cps, instantaneous_output_cps;
+ double instantaneous_input_msgs, instantaneous_output_msgs;
};
static void peer_conn_read_cb(struct bufferevent *bev, void *arg);
@@ -339,6 +335,7 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg)
//swarmkv_msg_free(msg);
msg=NULL;
evbuffer_drain(input, msg_sz);
+ thr->stat.input_msgs++;
thr->stat.input_bytes += msg_sz;
conn->recv_state=RECEIVING_HDR;
}
@@ -410,7 +407,8 @@ static void snet_stat_periodic(evutil_socket_t fd, short what, void * arg)
struct swarmkv_net *net= (struct swarmkv_net *)arg;
struct timespec now;
long long input_bytes=0, output_bytes=0;
- long long input_cmds=0, output_cmds=0;
+ long long input_msgs=0, output_msgs=0;
+
clock_gettime(CLOCK_MONOTONIC, &now);
if(now.tv_sec == net->last_stats.tv_sec)
{
@@ -422,21 +420,21 @@ static void snet_stat_periodic(evutil_socket_t fd, short what, void * arg)
{
input_bytes += net->threads[i].stat.input_bytes;
output_bytes += net->threads[i].stat.output_bytes;
- input_cmds += net->threads[i].stat.input_cmds;
- output_cmds += net->threads[i].stat.output_cmds;
+ input_msgs += net->threads[i].stat.input_msgs;
+ output_msgs += net->threads[i].stat.output_msgs;
}
long long diff_sec=MAX(now.tv_sec - net->last_stats.tv_sec, 1);
net->instantaneous_input_kbps=(input_bytes - net->last_input_bytes)*8/1000/diff_sec;
net->instantaneous_output_kbps=(output_bytes - net->last_output_bytes)*8/1000/diff_sec;
- net->instantaneous_input_cps=(input_cmds - net->last_input_cmds)/diff_sec;
- net->instantaneous_output_cps=(output_cmds - net->last_output_cmds)/diff_sec;
+ net->instantaneous_input_msgs=(input_msgs - net->last_input_msgs)/diff_sec;
+ net->instantaneous_output_msgs=(output_msgs - net->last_output_msgs)/diff_sec;
clock_gettime(CLOCK_MONOTONIC, &net->last_stats);
net->last_input_bytes=input_bytes;
net->last_output_bytes=output_bytes;
- net->last_input_cmds=input_cmds;
- net->last_output_cmds=output_cmds;
+ net->last_input_msgs=input_msgs;
+ net->last_output_msgs=output_msgs;
return;
}
@@ -538,8 +536,8 @@ void swarmkv_net_info(struct swarmkv_net *net, struct snet_info *info)
}
info->instantaneous_input_kbps=net->instantaneous_input_kbps;
info->instantaneous_output_kbps=net->instantaneous_output_kbps;
- info->instantaneous_input_cps=net->instantaneous_input_cps;
- info->instantaneous_output_cps=net->instantaneous_output_cps;
+ info->instantaneous_input_msgs=net->instantaneous_input_msgs;
+ info->instantaneous_output_msgs=net->instantaneous_output_msgs;
return;
}
diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c
index 1065b99..8dd2261 100644
--- a/src/swarmkv_rpc.c
+++ b/src/swarmkv_rpc.c
@@ -76,7 +76,7 @@ static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg)
long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f)
{
struct swarmkv_rpc *rpc=ALLOC(struct swarmkv_rpc, 1);
- rpc->sequence=++mgr->seq_generator;
+ rpc->sequence=__atomic_add_fetch(&mgr->seq_generator, 1, __ATOMIC_SEQ_CST);
struct timeval timeout={mgr->timeout_us/(1000*1000), mgr->timeout_us%(1000*1000)};
rpc->timeout_ev=event_new(mgr->evbases[thread_id], -1, 0, rpc_timeout_callback, rpc);
event_add(rpc->timeout_ev, &timeout);
diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp
index 4aba2a6..777dcc9 100644
--- a/test/swarmkv_perf_test.cpp
+++ b/test/swarmkv_perf_test.cpp
@@ -66,6 +66,16 @@ void *blocking_call_thread(void *thread_arg)
return success;
}
+#define PERF_NTHREAD_CLUSTER_PORT_START 5310
+#define PERF_NTHREAD_HEALTH_PORT_START 6310
+#define PERF_DATA_SYNC_CLUSTER_PORT_START 7310
+#define PERF_DATA_SYNC_HEALTH_PORT_START 8310
+#define PERF_ASYNC_EXEC_CLUSTER_PORT_START 9310
+#define PERF_ASYNC_EXEC_HEALTH_PORT_START 10310
+#define PERF_RESILIENCE_ADD_SLOT_OWNER_CLUSTER_PORT_START 11310
+#define PERF_RESILIENCE_ADD_SLOT_OWNER_HEALTH_PORT_START 12310
+#define PERF_RESILIENCE_FAILOVER_CLUSTER_PORT_START 13310
+#define PERF_RESILIENCE_FAILOVER_HEALTH_PORT_START 14310
TEST(Performance, Nthreads)
{
@@ -80,7 +90,7 @@ TEST(Performance, Nthreads)
char node_list_str[1024]={0};
for(i=0; i<NODE_NUMBER; i++)
{
- snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", 5310+i);
+ snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", PERF_NTHREAD_CLUSTER_PORT_START+i);
}
const char *cluster_name="swarmkv-n-threads";
swarmkv_cli_create_cluster(cluster_name, node_list_str);
@@ -89,8 +99,8 @@ TEST(Performance, Nthreads)
for(i=0; i<NODE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_cluster_port(opts[i], 5310+i);
- swarmkv_options_set_health_check_port(opts[i], 6310+i);
+ swarmkv_options_set_cluster_port(opts[i], PERF_NTHREAD_CLUSTER_PORT_START+i);
+ swarmkv_options_set_health_check_port(opts[i], PERF_NTHREAD_HEALTH_PORT_START+i);
swarmkv_options_set_logger(opts[i], logger);
swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER);
@@ -163,7 +173,7 @@ void *background_tconsume_thread(void *thread_arg)
*success=0;
return success;
}
-TEST(Performance, Sync)
+TEST(Performance, DataSync)
{
size_t NODE_NUMBER=2;
size_t CALLER_THREAD_NUMBER=2;
@@ -172,11 +182,10 @@ TEST(Performance, Sync)
struct swarmkv *db[NODE_NUMBER];
char *err=NULL;
const char *log_path="./swarmkv-sync.log";
- unsigned int p2p_port_start=9310, health_port_start=10310;
char node_list_str[1024]={0};
for(size_t i=0; i<NODE_NUMBER; i++)
{
- snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", p2p_port_start+i);
+ snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", PERF_DATA_SYNC_CLUSTER_PORT_START+i);
}
const char *cluster_name="swarmkv-sync";
swarmkv_cli_create_cluster(cluster_name, node_list_str);
@@ -185,8 +194,8 @@ TEST(Performance, Sync)
for(size_t i=0; i<NODE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i);
- swarmkv_options_set_health_check_port(opts[i], health_port_start+i);
+ swarmkv_options_set_cluster_port(opts[i], PERF_DATA_SYNC_CLUSTER_PORT_START+i);
+ swarmkv_options_set_health_check_port(opts[i], PERF_DATA_SYNC_HEALTH_PORT_START+i);
swarmkv_options_set_logger(opts[i], logger);
swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER+1);
@@ -248,7 +257,89 @@ TEST(Performance, Sync)
}
log_handle_destroy(logger);
+}
+struct async_exec_ctx
+{
+ int expected_reply_cnt;
+ int reply_cnt;
+ struct swarmkv *db;
+};
+void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg)
+{
+ struct async_exec_ctx *ctx=(struct async_exec_ctx *)arg;
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ if(reply->type != SWARMKV_REPLY_STATUS)
+ {
+ swarmkv_reply_print(reply, stdout);
+ }
+ ctx->reply_cnt++;
+ if(ctx->reply_cnt==ctx->expected_reply_cnt)
+ {
+ swarmkv_caller_loop_break(ctx->db);
+ }
+}
+TEST(Performance, AsyncExec)
+{
+ int NODE_NUMBER=1;
+ int CALLER_THREAD_NUMBER=1;
+ int WORKER_THREAD_NUMBER=1;
+ int i=0;
+ struct swarmkv *db[NODE_NUMBER];
+ char *err=NULL;
+ const char *log_path="./swarmkv-async-exec.log";
+
+ char node_list_str[1024]={0};
+ for(i=0; i<NODE_NUMBER; i++)
+ {
+ snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", PERF_ASYNC_EXEC_CLUSTER_PORT_START+i);
+ }
+ const char *cluster_name="swarmkv-async-exec";
+ swarmkv_cli_create_cluster(cluster_name, node_list_str);
+ struct log_handle * logger=log_handle_create(log_path, 0);
+ struct swarmkv_options* opts[NODE_NUMBER];
+ for(i=0; i<NODE_NUMBER; i++)
+ {
+ opts[i]=swarmkv_options_new();
+ swarmkv_options_set_cluster_port(opts[i], PERF_ASYNC_EXEC_CLUSTER_PORT_START+i);
+ swarmkv_options_set_health_check_port(opts[i], PERF_ASYNC_EXEC_HEALTH_PORT_START+i);
+ swarmkv_options_set_cluster_timeout_us(opts[i], 2000*1000);
+ swarmkv_options_set_logger(opts[i], logger);
+ swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
+ swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER);
+ db[i]=swarmkv_open(opts[i], cluster_name, &err);
+ if(err)
+ {
+ printf("swarmkv_open %d instance failed: %s\n", i, err);
+ free(err);
+ err=NULL;
+ }
+ swarmkv_register_thread(db[i]);
+ }
+ int key_number=65536;
+ struct async_exec_ctx ctx;
+ ctx.expected_reply_cnt=key_number;
+ ctx.reply_cnt=0;
+ ctx.db=db[0];
+ struct timespec start, end;
+ clock_gettime(CLOCK_REALTIME, &start);
+ for(i=0; i<key_number; i++)
+ {
+ swarmkv_async_command(db[0], async_on_reply_cb, &ctx, "SET async-key-%d by-node-%d", i, 0);
+ //swarmkv_caller_loop(db[0], SWARMKV_LOOP_NONBLOCK|SWARMKV_LOOP_ONCE, NULL);
+ }
+ clock_gettime(CLOCK_REALTIME, &end);
+ double elapsed_ms=(end.tv_sec-start.tv_sec)*1000.0+(end.tv_nsec-start.tv_nsec)/1000000.0;
+ printf("Async SET %d keys, elapsed %lf ms, %lf keys/s\n", key_number, elapsed_ms, key_number*1000.0/elapsed_ms);
+ swarmkv_caller_loop(db[0], SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL);
+
+ for(i=0; i<NODE_NUMBER; i++)
+ {
+ //close slowly to cover more code branches.
+ sleep(2);
+ swarmkv_close(db[i]);
+ }
+ log_handle_destroy(logger);
}
int g_running_flag=0;
void *migration_background_thread(void *thread_arg)
@@ -305,11 +396,11 @@ TEST(Resilience, AddSlotOwner)
struct swarmkv *db[NODE_NUMBER+CANDINATE_NUMBER];
char *err=NULL;
const char *log_path="./swarmkv-migration-11.log";
- unsigned int p2p_port_start=7310, health_port_start=8310;
char node_list_str[1024]={0};
for(i=0; i<NODE_NUMBER; i++)
{
- snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ", p2p_port_start+i);
+ snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%d ",
+ PERF_RESILIENCE_ADD_SLOT_OWNER_CLUSTER_PORT_START+i);
}
const char *cluster_name="swarmkv-migration";
swarmkv_cli_create_cluster(cluster_name, node_list_str);
@@ -318,8 +409,8 @@ TEST(Resilience, AddSlotOwner)
for(i=0; i<NODE_NUMBER+CANDINATE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i);
- swarmkv_options_set_health_check_port(opts[i], health_port_start+i);
+ swarmkv_options_set_cluster_port(opts[i], PERF_RESILIENCE_ADD_SLOT_OWNER_CLUSTER_PORT_START+i);
+ swarmkv_options_set_health_check_port(opts[i], PERF_RESILIENCE_ADD_SLOT_OWNER_HEALTH_PORT_START+i);
swarmkv_options_set_logger(opts[i], logger);
swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER);
@@ -347,7 +438,7 @@ TEST(Resilience, AddSlotOwner)
int offset=0;
for(i=0; i<CANDINATE_NUMBER; i++)
{
- offset+=snprintf(candinate_string+offset, sizeof(candinate_string)-offset, "127.0.0.1:%u ", p2p_port_start+NODE_NUMBER+i);
+ offset+=snprintf(candinate_string+offset, sizeof(candinate_string)-offset, "127.0.0.1:%u ", PERF_RESILIENCE_ADD_SLOT_OWNER_CLUSTER_PORT_START+NODE_NUMBER+i);
}
swarmkv_cli_add_slot_owner(cluster_name, candinate_string);
sleep(10);
@@ -380,11 +471,10 @@ TEST(Resilience, Failover)
struct swarmkv *db[NODE_NUMBER];
char *err=NULL;
- unsigned int p2p_port_start=11310, health_port_start=12310;
char node_list_str[1024]={0};
for(size_t i=1; i<NODE_NUMBER; i++)
{
- snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", p2p_port_start+i);
+ snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", PERF_RESILIENCE_FAILOVER_CLUSTER_PORT_START+i);
}
const char *cluster_name="swarmkv-failover";
swarmkv_cli_create_cluster(cluster_name, node_list_str);
@@ -392,8 +482,8 @@ TEST(Resilience, Failover)
for(size_t i=0; i<NODE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
- swarmkv_options_set_cluster_port(opts[i], p2p_port_start+i);
- swarmkv_options_set_health_check_port(opts[i], health_port_start+i);
+ swarmkv_options_set_cluster_port(opts[i], PERF_RESILIENCE_FAILOVER_CLUSTER_PORT_START+i);
+ swarmkv_options_set_health_check_port(opts[i], PERF_RESILIENCE_FAILOVER_HEALTH_PORT_START+i);
swarmkv_options_set_worker_thread_number(opts[i], 1);
swarmkv_options_set_caller_thread_number(opts[i], 1);
db[i]=swarmkv_open(opts[i], cluster_name, &err);
@@ -436,7 +526,7 @@ TEST(Resilience, Failover)
swarmkv_reply_free(reply);
for(size_t j=1; j<NODE_NUMBER; j++)
{
- snprintf(target, sizeof(target), "127.0.0.1:%zu", p2p_port_start+j);
+ snprintf(target, sizeof(target), "127.0.0.1:%zu", PERF_RESILIENCE_FAILOVER_CLUSTER_PORT_START+j);
reply=swarmkv_command_on(db[1], target, "CRDT RLIST fo-%zu", i);
EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
EXPECT_EQ(reply->n_element, NODE_NUMBER-2);