summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-08-22 16:12:46 +0800
committer郑超 <[email protected]>2023-08-22 08:13:45 +0000
commit8e83747162a6d97e87bfbc60a723b071b166b030 (patch)
treebea0289092a4fdc61aba0f74a502930b8109e59f
parente32938f7469cf6b0797807873a6faff081382d35 (diff)
Add Async Exec test case.
-rw-r--r--src/swarmkv.c14
-rw-r--r--src/swarmkv_mesh.c12
-rw-r--r--src/swarmkv_net.c2
-rw-r--r--test/swarmkv_perf_test.cpp14
4 files changed, 32 insertions, 10 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 7e77e8d..4407067 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -1302,12 +1302,24 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
{
}
db->ref_evbases=ALLOC(struct event_base*, opts->total_threads);
+ struct event_config *ev_cfg=event_config_new();
+ int ret=event_config_set_max_dispatch_interval(ev_cfg, NULL, 1000, 0);
+ assert(ret==0);
for(int i=0; i<opts->total_threads; i++)
{
- db->threads[i].evbase=event_base_new();
+ if(i>opts->nr_worker_threads)
+ {
+ //db->threads[i].evbase=event_base_new_with_config(ev_cfg);
+ db->threads[i].evbase=event_base_new();
+ }
+ else
+ {
+ db->threads[i].evbase=event_base_new();
+ }
db->threads[i].db=db;
db->ref_evbases[i]=db->threads[i].evbase;
}
+ event_config_free(ev_cfg);
db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads);
db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger);
swarmkv_mesh_set_on_msg_cb(db->mesh, __on_msg_callback, db);
diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c
index 86a95c8..0b6cfc3 100644
--- a/src/swarmkv_mesh.c
+++ b/src/swarmkv_mesh.c
@@ -38,6 +38,7 @@ struct swarmkv_mesh
void *on_msg_cb_arg;
long long n_send, n_recv;
long long n_drop;
+ long long n_void_consume;
};
//The swarmkv_mesh_send takes the ownership of msg.
int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest_thread_id, struct swarmkv_msg *msg)
@@ -83,8 +84,7 @@ void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_
mesh->on_msg_cb_arg=cb_arg;
return;
}
-
-static void eventfd_on_read_cb(evutil_socket_t fd, short what, void * arg)
+static void mesh_on_eventfd_read_cb(evutil_socket_t fd, short what, void * arg)
{
struct swarmkv_mesh_thread *thr=(struct swarmkv_mesh_thread*)arg;
struct swarmkv_mesh *mesh=thr->ref_mesh;
@@ -107,7 +107,11 @@ static void eventfd_on_read_cb(evutil_socket_t fd, short what, void * arg)
while(i<n_msg)
{
len=ringbuf_consume(ring, &offset);
- if(len==0) continue;
+ if(len==0)
+ {
+ mesh->n_void_consume++;
+ continue;
+ }
//The ringbuf adopts a two-phase write, two concurrent producers may generates gap in the buffer.
//So it is possible that consumer wakes up, but sees a zero-length message.
msg=*(struct swarmkv_msg**)(thr->buff+offset);
@@ -142,7 +146,7 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads,
log_fatal(mesh->ref_logger, MODULE_SWAMRKV_MESH, "eventfd() failed: %s", strerror(errno));
assert(0);
}
- mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, eventfd_on_read_cb, mesh->threads+i);
+ mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, mesh_on_eventfd_read_cb, mesh->threads+i);
event_add(mesh->threads[i].ev, NULL);
mesh->threads[i].ref_mesh=mesh;
mesh->threads[i].evbase=evbase[i];
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index 6d076b3..9111a21 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -142,7 +142,7 @@ struct snet_conn *snet_conn_new_by_fd(struct snet_thread *thr, evutil_socket_t f
evutil_make_socket_closeonexec(fd);
node_init_from_sockaddr(&conn->connected_from, addr);
- conn->bev=bufferevent_socket_new(thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS|
+ conn->bev=bufferevent_socket_new(thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS);//BEV_OPT_UNLOCK_CALLBACKS|BEV_OPT_THREADSAFE
//use BEV_OPT_THREADSAFE option for safely access from threads that calling swarmkv_net_send.
//? use BEV_OPT_UNLOCK_CALLBACKS option because swarmkv_net_send write buffer to output_buffer, and peer_conn_read_cb read input_buffer.
conn->fd=fd;
diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp
index 8113fe3..1b948b7 100644
--- a/test/swarmkv_perf_test.cpp
+++ b/test/swarmkv_perf_test.cpp
@@ -263,6 +263,8 @@ struct async_exec_ctx
int expected_reply_cnt;
int reply_cnt;
struct swarmkv *db;
+ int cmd_inprogress;
+ int on_fly_reply_cnt;
};
void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg)
{
@@ -272,6 +274,7 @@ void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg)
{
swarmkv_reply_print(reply, stdout);
}
+ if(ctx->cmd_inprogress) ctx->on_fly_reply_cnt++;
ctx->reply_cnt++;
if(ctx->reply_cnt==ctx->expected_reply_cnt)
{
@@ -280,7 +283,7 @@ void async_on_reply_cb(const struct swarmkv_reply *reply, void * arg)
}
TEST(Performance, AsyncExec)
{
- int NODE_NUMBER=2;
+ int NODE_NUMBER=3;
int CALLER_THREAD_NUMBER=1;
int WORKER_THREAD_NUMBER=3;
int i=0;
@@ -302,7 +305,7 @@ TEST(Performance, AsyncExec)
opts[i]=swarmkv_options_new();
swarmkv_options_set_cluster_port(opts[i], PERF_ASYNC_EXEC_CLUSTER_PORT_START+i);
swarmkv_options_set_health_check_port(opts[i], PERF_ASYNC_EXEC_HEALTH_PORT_START+i);
- swarmkv_options_set_cluster_timeout_us(opts[i], 20000*1000);
+ swarmkv_options_set_cluster_timeout_us(opts[i], 5000*1000);
swarmkv_options_set_logger(opts[i], logger);
swarmkv_options_set_worker_thread_number(opts[i], WORKER_THREAD_NUMBER);
swarmkv_options_set_caller_thread_number(opts[i], CALLER_THREAD_NUMBER);
@@ -317,21 +320,24 @@ TEST(Performance, AsyncExec)
}
int key_number=128*1024;
struct async_exec_ctx ctx;
+ memset(&ctx, 0, sizeof(ctx));
ctx.expected_reply_cnt=key_number;
ctx.reply_cnt=0;
ctx.db=db[0];
+ ctx.cmd_inprogress=1;
struct timespec start, end;
clock_gettime(CLOCK_REALTIME, &start);
for(i=0; i<key_number; i++)
{
swarmkv_async_command(db[0], async_on_reply_cb, &ctx, "SET async-key-%d by-node-%d", i, 0);
- swarmkv_caller_loop(db[0], SWARMKV_LOOP_NONBLOCK|SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL);
+ swarmkv_caller_loop(db[0], SWARMKV_LOOP_NONBLOCK, NULL);//SWARMKV_LOOP_ONCE|SWARMKV_LOOP_NONBLOCK
}
+ ctx.cmd_inprogress=0;
swarmkv_caller_loop(db[0], SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL);
clock_gettime(CLOCK_REALTIME, &end);
double elapsed_ms=(end.tv_sec-start.tv_sec)*1000.0+(end.tv_nsec-start.tv_nsec)/1000000.0;
- printf("Async SET %d keys, elapsed %lf ms, %lf keys/s\n", key_number, elapsed_ms, key_number*1000.0/elapsed_ms);
+ printf("Async SET %d keys, elapsed %lf ms, %lf cmds/s\n", key_number, elapsed_ms, key_number*1000.0/elapsed_ms);
for(i=0; i<NODE_NUMBER; i++)