diff options
| author | Zheng Chao <[email protected]> | 2023-08-22 16:12:46 +0800 |
|---|---|---|
| committer | 郑超 <[email protected]> | 2023-08-22 08:13:45 +0000 |
| commit | 8e83747162a6d97e87bfbc60a723b071b166b030 (patch) | |
| tree | bea0289092a4fdc61aba0f74a502930b8109e59f /src | |
| parent | e32938f7469cf6b0797807873a6faff081382d35 (diff) | |
Add Async Exec test case.
Diffstat (limited to 'src')
| -rw-r--r-- | src/swarmkv.c | 14 | ||||
| -rw-r--r-- | src/swarmkv_mesh.c | 12 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 2 |
3 files changed, 22 insertions, 6 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c index 7e77e8d..4407067 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -1302,12 +1302,24 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, { } db->ref_evbases=ALLOC(struct event_base*, opts->total_threads); + struct event_config *ev_cfg=event_config_new(); + int ret=event_config_set_max_dispatch_interval(ev_cfg, NULL, 1000, 0); + assert(ret==0); for(int i=0; i<opts->total_threads; i++) { - db->threads[i].evbase=event_base_new(); + if(i>opts->nr_worker_threads) + { + //db->threads[i].evbase=event_base_new_with_config(ev_cfg); + db->threads[i].evbase=event_base_new(); + } + else + { + db->threads[i].evbase=event_base_new(); + } db->threads[i].db=db; db->ref_evbases[i]=db->threads[i].evbase; } + event_config_free(ev_cfg); db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads); db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger); swarmkv_mesh_set_on_msg_cb(db->mesh, __on_msg_callback, db); diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c index 86a95c8..0b6cfc3 100644 --- a/src/swarmkv_mesh.c +++ b/src/swarmkv_mesh.c @@ -38,6 +38,7 @@ struct swarmkv_mesh void *on_msg_cb_arg; long long n_send, n_recv; long long n_drop; + long long n_void_consume; }; //The swarmkv_mesh_send takes the ownership of msg. int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest_thread_id, struct swarmkv_msg *msg) @@ -83,8 +84,7 @@ void swarmkv_mesh_set_on_msg_cb(struct swarmkv_mesh *mesh, on_msg_callback_t cb_ mesh->on_msg_cb_arg=cb_arg; return; } - -static void eventfd_on_read_cb(evutil_socket_t fd, short what, void * arg) +static void mesh_on_eventfd_read_cb(evutil_socket_t fd, short what, void * arg) { struct swarmkv_mesh_thread *thr=(struct swarmkv_mesh_thread*)arg; struct swarmkv_mesh *mesh=thr->ref_mesh; @@ -107,7 +107,11 @@ static void eventfd_on_read_cb(evutil_socket_t fd, short what, void * arg) while(i<n_msg) { len=ringbuf_consume(ring, &offset); - if(len==0) continue; + if(len==0) + { + mesh->n_void_consume++; + continue; + } //The ringbuf adopts a two-phase write, two concurrent producers may generates gap in the buffer. //So it is possible that consumer wakes up, but sees a zero-length message. msg=*(struct swarmkv_msg**)(thr->buff+offset); @@ -142,7 +146,7 @@ struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, log_fatal(mesh->ref_logger, MODULE_SWAMRKV_MESH, "eventfd() failed: %s", strerror(errno)); assert(0); } - mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, eventfd_on_read_cb, mesh->threads+i); + mesh->threads[i].ev=event_new(evbase[i], mesh->threads[i].efd, EV_READ|EV_PERSIST, mesh_on_eventfd_read_cb, mesh->threads+i); event_add(mesh->threads[i].ev, NULL); mesh->threads[i].ref_mesh=mesh; mesh->threads[i].evbase=evbase[i]; diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index 6d076b3..9111a21 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -142,7 +142,7 @@ struct snet_conn *snet_conn_new_by_fd(struct snet_thread *thr, evutil_socket_t f evutil_make_socket_closeonexec(fd); node_init_from_sockaddr(&conn->connected_from, addr); - conn->bev=bufferevent_socket_new(thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS| + conn->bev=bufferevent_socket_new(thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS);//BEV_OPT_UNLOCK_CALLBACKS|BEV_OPT_THREADSAFE //use BEV_OPT_THREADSAFE option for safely access from threads that calling swarmkv_net_send. //? use BEV_OPT_UNLOCK_CALLBACKS option because swarmkv_net_send write buffer to output_buffer, and peer_conn_read_cb read input_buffer. conn->fd=fd; |
