summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-08-22 16:12:46 +0800
committer郑超 <[email protected]>2023-08-22 08:13:45 +0000
commit8e83747162a6d97e87bfbc60a723b071b166b030 (patch)
treebea0289092a4fdc61aba0f74a502930b8109e59f /src
parente32938f7469cf6b0797807873a6faff081382d35 (diff)
Add Async Exec test case.
Diffstat (limited to 'src')
-rw-r--r--src/swarmkv.c14
-rw-r--r--src/swarmkv_mesh.c12
-rw-r--r--src/swarmkv_net.c2
3 files changed, 22 insertions, 6 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 7e77e8d..4407067 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -1302,12 +1302,24 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
{
}
db->ref_evbases=ALLOC(struct event_base*, opts->total_threads);
+ struct event_config *ev_cfg=event_config_new();
+ int ret=event_config_set_max_dispatch_interval(ev_cfg, NULL, 1000, 0);
+ assert(ret==0);
for(int i=0; i<opts->total_threads; i++)
{
- db->threads[i].evbase=event_base_new();
+ if(i>opts->nr_worker_threads)
+ {
+ //db->threads[i].evbase=event_base_new_with_config(ev_cfg);
+ db->threads[i].evbase=event_base_new();
+ }
+ else
+ {
+ db->threads[i].evbase=event_base_new();
+ }
db->threads[i].db=db;
db->ref_evbases[i]=db->threads[i].evbase;
}
+ event_config_free(ev_cfg);
db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads);
db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger);
swarmkv_mesh_set_on_msg_cb(db->mesh, __on_msg_callback, db);
diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c
index 86a95c8..0b6cfc3 100644
--- a/src/swarmkv_mesh.c
+++ b/src/swarmkv_mesh.c
@@ -38,6 +38,7 @@ struct swarmkv_mesh
void *on_msg_cb_arg;
long long n_send, n_recv;
long long n_drop;
+ long long n_void_consume;
};
//The swarmkv_mesh_send takes the ownership of msg.
int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest_thread_id, struct swarmkv_msg *msg)
@@ -83,8 +84,7 @@ void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_
mesh->on_msg_cb_arg=cb_arg;
return;
}
-
-static void eventfd_on_read_cb(evutil_socket_t fd, short what, void * arg)
+static void mesh_on_eventfd_read_cb(evutil_socket_t fd, short what, void * arg)
{
struct swarmkv_mesh_thread *thr=(struct swarmkv_mesh_thread*)arg;
struct swarmkv_mesh *mesh=thr->ref_mesh;
@@ -107,7 +107,11 @@ static void eventfd_on_read_cb(evutil_socket_t fd, short what, void * arg)
while(i<n_msg)
{
len=ringbuf_consume(ring, &offset);
- if(len==0) continue;
+ if(len==0)
+ {
+ mesh->n_void_consume++;
+ continue;
+ }
//The ringbuf adopts a two-phase write, two concurrent producers may generates gap in the buffer.
//So it is possible that consumer wakes up, but sees a zero-length message.
msg=*(struct swarmkv_msg**)(thr->buff+offset);
@@ -142,7 +146,7 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads,
log_fatal(mesh->ref_logger, MODULE_SWAMRKV_MESH, "eventfd() failed: %s", strerror(errno));
assert(0);
}
- mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, eventfd_on_read_cb, mesh->threads+i);
+ mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, mesh_on_eventfd_read_cb, mesh->threads+i);
event_add(mesh->threads[i].ev, NULL);
mesh->threads[i].ref_mesh=mesh;
mesh->threads[i].evbase=evbase[i];
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index 6d076b3..9111a21 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -142,7 +142,7 @@ struct snet_conn *snet_conn_new_by_fd(struct snet_thread *thr, evutil_socket_t f
evutil_make_socket_closeonexec(fd);
node_init_from_sockaddr(&conn->connected_from, addr);
- conn->bev=bufferevent_socket_new(thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS|
+ conn->bev=bufferevent_socket_new(thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS);//BEV_OPT_UNLOCK_CALLBACKS|BEV_OPT_THREADSAFE
//use BEV_OPT_THREADSAFE option for safely access from threads that calling swarmkv_net_send.
//? use BEV_OPT_UNLOCK_CALLBACKS option because swarmkv_net_send write buffer to output_buffer, and peer_conn_read_cb read input_buffer.
conn->fd=fd;