diff options
| author | Zheng Chao <[email protected]> | 2022-12-09 15:57:56 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2022-12-09 15:57:56 +0800 |
| commit | 0e7ba516559ae20ba8691291b14380156a4ff203 (patch) | |
| tree | 5c28b7545bbd429c68052fd89fcccb8e57955a3b | |
| parent | ac33167d47b2b24ce841104946a11200233bd537 (diff) | |
:zap: For multi-thread scalability, each worker thread maintains connections to all active peers.
| -rw-r--r-- | docs/design.md | 7 | ||||
| -rw-r--r-- | src/swarmkv.c | 10 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 390 | ||||
| -rw-r--r-- | test/swarmkv_gtest.cpp | 14 | ||||
| -rw-r--r-- | test/swarmkv_perf_test.cpp | 2 |
5 files changed, 228 insertions, 195 deletions
diff --git a/docs/design.md b/docs/design.md index c0b1d1a..d4ce2f4 100644 --- a/docs/design.md +++ b/docs/design.md @@ -239,7 +239,7 @@ Reply message A swarmkv instance has one key space thread and several (configurable) worker threads. Swarmkv uses [libevent](https://github.com/libevent/libevent) as a peer-to-peer communication infrastructure. -The keyspace thread is communicates with Consul. +The keyspace thread communicates with Consul. - Watch global slot table changes. - Watch leadership changes, run for leader if allowed. @@ -253,11 +253,12 @@ The worker thread 0 is responsible for: All worker threads are responsible for: - Send KV command message, which is from swarmkv API caller to destination peers. -- Receive KV command message, and then process it based on message type: +- Receive KV command message, and then process it based on command type: - Key routing request (from the requester, object reader/writer): lookup the value owner, and then send key routing response. - Object operation request (from the requester): lookup the value, perform the operation and then send value operation response. - Key routing response (from the key owner): send the request to the decoded value owner. - Object operation response (from object owner): update local cache, and invoke the user-specified callback. +- For multi-thread scalability, each worker thread maintains connections to all active peers. It's possible that the connection number between two nodes is more than one. ### Lock design @@ -299,7 +300,7 @@ The source files are organized as follows: [Tarantool](https://www.tarantool.io/en/) is an in-memory computing platform consisting of a database and an application server. -[eXtremeDB](https://www.mcobject.com/) is a hybrid persistent and in-memory database management system for edge and cloud. +[eXtremeDB](https://www.mcobject.com/) is a hybrid persistent and in-memory database management system for edge and cloud. It's a commercial software and close source. [vedis](https://vedis.symisc.net/) is an embeddable datastore C library built with over 70 commands similar in concept to Redis but without the networking layer since Vedis run in the same process of the host application. diff --git a/src/swarmkv.c b/src/swarmkv.c index c9b4611..6e22411 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -122,10 +122,10 @@ struct remote_caller_ctx struct future *my_future; struct swarmkv_net *ref_net; }; -static void remote_caller_on_success(void* result, void* user) +static void remote_caller_on_success(void *result, void *user) { struct remote_caller_ctx *ctx=(struct remote_caller_ctx*)user; - struct swarmkv_reply* reply=(struct swarmkv_reply*) result; + struct swarmkv_reply *reply=(struct swarmkv_reply*) result; char *reply_blob=NULL; size_t reply_blob_sz=0; swarmkv_msg_serialize_reply(reply, &reply_blob, &reply_blob_sz); @@ -227,7 +227,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw "synced: %lld\r\n" "sync_ok: %lld\r\n" "sync_err: %lld\r\n" - "sync_interval_in_usec:%u\r\n" + "sync_interval_in_msec: %.3f\r\n" , sto_info.shards, sto_info.keys, @@ -235,7 +235,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw sto_info.synced, sto_info.sync_ok, sto_info.sync_err, - db->opts->sync_interval_us + (double)db->opts->sync_interval_us/1000 ); struct keyspace_info ks_info; swarmkv_keyspace_info(db->mod_keyspace, &ks_info); @@ -252,7 +252,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw struct snet_info net_info; swarmkv_net_info(db->net, &net_info); snprintf(net_info_buff, sizeof(net_info_buff), "# Network\r\n" - "timeout_in_msec: %.2f\r\n" + "timeout_in_msec: %.3f\r\n" "connections: %lld\r\n" "pending_rpcs: %lld\r\n" "timed_out_rpcs: %lld\r\n" diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index f7b4e68..77beb88 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -72,11 +72,17 @@ struct timeout_ctx int is_persist; struct event *ev; }; - +struct snet_reply_ctx +{ + unsigned long long sequence; + node_t peer; + struct snet_conn *conn; + UT_hash_handle hh; +}; struct snet_rpc { - unsigned long long sequence; + unsigned long long sequence; node_t peer; struct event *timeout_ev; on_reply_callback_t *cb; @@ -85,6 +91,7 @@ struct snet_rpc struct timespec start; UT_hash_handle hh; }; + struct snet_stat { long long input_msgs, output_msgs; @@ -102,9 +109,10 @@ struct snet_thread struct event_base *evbase; struct snet_conn *conn_table; struct snet_conn *duplicated_conn_table; - + struct snet_conn *designated_conn;//for sending reply long long peer_timeout_us; struct snet_rpc *rpc_table; + struct snet_reply_ctx *reply_ctx_table; void *ref_logger; struct timeout_ctx *to_ctx; @@ -135,12 +143,28 @@ struct swarmkv_net double instantaneous_input_kbps, instantaneous_output_kbps; double instantaneous_input_cps, instantaneous_output_cps; }; -static int get_net_thread_id(const node_t *node, size_t n_thread) + +enum TASK_TYPE { - size_t ip_len=strchr(node->addr, ':')-node->addr; - int thread_id=(int)(XXH32(node, ip_len, 171))%n_thread; - return thread_id; -} + TASK_TYPE_SEND, + TASK_TYPE_RECEIVE +}; + +struct snet_task +{ + enum TASK_TYPE task_type; + enum MSG_TYPE msg_type; + unsigned long long sequence; + node_t peer; + struct event *deferred_ev; + struct snet_thread *thr; + struct snet_conn *conn; + void *data; + size_t size; + + on_reply_callback_t *cb; + void *cb_arg; +}; void snet_rpc_free(struct snet_rpc *rpc) { @@ -184,27 +208,87 @@ struct snet_rpc * snet_rpc_new(struct snet_thread *thr, node_t *peer, unsigned l HASH_ADD(hh, thr->rpc_table, sequence, sizeof(rpc->sequence), rpc); return rpc; } +static void peer_conn_read_cb(struct bufferevent *bev, void *arg); +void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg); +static void peer_conn_write_cb(struct bufferevent *bev, void *arg); +static void peer_conn_read_cb(struct bufferevent *bev, void *arg); +static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg); +void snet_conn_send(struct snet_conn *conn, const void *hdr, size_t hdr_sz, const void *payload, size_t payload_sz); -enum TASK_TYPE +struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const node_t *dest) { - TASK_TYPE_SEND, - TASK_TYPE_RECEIVE -}; + struct event_base *base=thr->evbase; + + struct snet_conn* conn=ALLOC(struct snet_conn, 1); + conn->buff_for_sending=evbuffer_new(); -struct snet_task + //http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html + conn->bev=bufferevent_socket_new(base, -1, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS| + bufferevent_setcb(conn->bev, NULL, NULL, connect_peer_eventcb, conn); + node_copy(&conn->peer_listen_addr, dest); + conn->thread_id=thr->thread_id; + conn->ref_thr=thr; + node_copy(&conn->connected_from, dest); + struct sockaddr sa; + node_to_sockaddr(dest, &sa); + if (bufferevent_socket_connect(conn->bev, &sa, sizeof(sa)) < 0) + { + bufferevent_free(conn->bev); + goto error_out; + } + return conn; + +error_out: + evbuffer_free(conn->buff_for_sending); + conn->buff_for_sending=NULL; + free(conn); + return NULL; + +} +struct snet_conn *snet_conn_new_by_fd(struct snet_thread *thr, evutil_socket_t fd, struct sockaddr * addr) { - node_t peer; - enum TASK_TYPE task_type; - enum MSG_TYPE msg_type; - unsigned long long sequence; - struct event *deferred_ev; - struct snet_thread *thr; - void *data; - size_t size; + struct snet_conn* conn=NULL; + conn=ALLOC(struct snet_conn, 1); + conn->buff_for_sending=evbuffer_new(); + int yes=1; + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&yes, sizeof(yes)); +// evbuffer_enable_locking(conn->buff_for_sending, NULL); + evutil_make_socket_nonblocking(fd); + node_init_from_sockaddr(&conn->connected_from, addr); - on_reply_callback_t *cb; - void *cb_arg; -}; + conn->bev=bufferevent_socket_new(thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS| + //use BEV_OPT_THREADSAFE option for safely access from threads that calling swarmkv_net_send. + //? use BEV_OPT_UNLOCK_CALLBACKS option because swarmkv_net_send write buffer to output_buffer, and peer_conn_read_cb read input_buffer. + conn->fd=fd; + conn->thread_id=thr->thread_id; + conn->ref_thr=thr; + bufferevent_setcb(conn->bev, peer_conn_read_cb, peer_conn_write_cb, peer_conn_event_cb, conn); + bufferevent_enable(conn->bev, EV_READ | EV_WRITE); + return conn; +} + +void snet_task_deferred_run(evutil_socket_t fd, short events, void *arg); + + +struct snet_task *snet_task_new(struct snet_thread *thr, struct snet_conn *conn, const node_t* peer, enum TASK_TYPE task_type, enum MSG_TYPE msg_type, unsigned long long sequence, const void *data, size_t size, on_reply_callback_t *cb, void *cb_arg) +{ + struct snet_task *task=ALLOC(struct snet_task, 1); + node_copy(&task->peer, peer); + task->deferred_ev=event_new(thr->evbase, -1, 0, snet_task_deferred_run, task); + task->msg_type=msg_type; + task->task_type=task_type; + task->cb=cb; + task->cb_arg=cb_arg; + task->data=malloc(size); + memcpy(task->data, data, size); + task->size=size; + task->sequence=sequence; + task->thr=thr; + task->conn=conn; + event_add(task->deferred_ev, NULL); + event_active(task->deferred_ev, 0, 0); + return task; +} void snet_task_free(struct snet_task *task) { free(task->data); @@ -215,8 +299,6 @@ void snet_task_free(struct snet_task *task) return; } -void snet_send_in_evloop(struct snet_thread *thr, const node_t *peer, unsigned long long sequence, enum MSG_TYPE msg_type, void *data, size_t size); - void snet_task_deferred_run(evutil_socket_t fd, short events, void *arg) { struct snet_task *task=(struct snet_task *)arg; @@ -224,27 +306,62 @@ void snet_task_deferred_run(evutil_socket_t fd, short events, void *arg) if(task->task_type==TASK_TYPE_SEND) { + struct snet_conn *conn=NULL; if(task->msg_type==MSG_TYPE_CMD)//expecting reply { struct snet_rpc *rpc=snet_rpc_new(thr, &task->peer, task->sequence, task->cb, task->cb_arg); - snet_send_in_evloop(task->thr, &task->peer, rpc->sequence, task->msg_type, task->data, task->size); - +// snet_send_in_evloop(task->thr, &task->peer, rpc->sequence, task->msg_type, task->data, task->size); + task->sequence=rpc->sequence; + HASH_FIND(hh, thr->conn_table, &task->peer, sizeof(node_t), conn); + if(!conn) + { + conn=snet_conn_new_by_connecting(thr, &task->peer); + if(conn) + { + HASH_ADD(hh, thr->conn_table, peer_listen_addr, sizeof(node_t), conn); + conn->is_in_conn_table=1; + } + else + { + int err = EVUTIL_SOCKET_ERROR(); + log_fatal(task->thr->ref_logger, MODULE_SWAMRKV_NET, "connect to %s failed, reason %d (%s).", + task->peer.addr, + err, evutil_socket_error_to_string(err)); + return; + } + } thr->stat.output_cmds++; } else { - snet_send_in_evloop(task->thr, &task->peer, task->sequence, task->msg_type, task->data, task->size); + struct snet_reply_ctx *reply_ctx=NULL; + struct snet_reply_ctx key; + key.sequence=task->sequence; + node_copy(&key.peer, &task->peer); + HASH_FIND(hh, thr->reply_ctx_table, &key.sequence, sizeof(key.sequence)+sizeof(key.peer), reply_ctx); + assert(reply_ctx); + conn=reply_ctx->conn; + HASH_DEL(thr->reply_ctx_table, reply_ctx); + free(reply_ctx); thr->stat.output_replies++; } - + struct swarmkv_msg_hdr msghdr; + memset(&msghdr, 0, sizeof(msghdr)); + msghdr.magic=MSG_HDR_MAGIC; + msghdr.msg_type=task->msg_type; + msghdr.sequence=task->sequence; + msghdr.payload_len=task->size; + node_copy(&msghdr.source, &thr->ref_net->self); + snet_conn_send(conn, &msghdr, sizeof(msghdr), task->data, task->size); thr->stat.output_msgs++; } else//receive { - struct snet_rpc *rpc=NULL; + assert(task->sequence); if(task->msg_type==MSG_TYPE_REPLY) { + struct snet_rpc *rpc=NULL; rpc=snet_rpc_lookup(thr, task->sequence); if(rpc) { @@ -262,6 +379,11 @@ void snet_task_deferred_run(evutil_socket_t fd, short events, void *arg) } else { + struct snet_reply_ctx *reply_ctx=ALLOC(struct snet_reply_ctx, 1); + reply_ctx->sequence=task->sequence; + reply_ctx->conn=task->conn; + node_copy(&reply_ctx->peer, &task->peer); + HASH_ADD_KEYPTR(hh, thr->reply_ctx_table, &(reply_ctx->sequence), sizeof(reply_ctx->sequence)+sizeof(reply_ctx->peer), reply_ctx); thr->ref_net->on_cmd_cb(&task->peer, task->sequence, task->data, task->size, thr->ref_net->on_cmd_cb_arg); thr->stat.input_cmds++; } @@ -272,57 +394,6 @@ void snet_task_deferred_run(evutil_socket_t fd, short events, void *arg) return; } -struct snet_task *snet_task_new(struct snet_thread *thr, const node_t* peer, enum TASK_TYPE task_type, enum MSG_TYPE msg_type, unsigned long long sequence, const void *data, size_t size, on_reply_callback_t *cb, void *cb_arg) -{ - struct snet_task *task=ALLOC(struct snet_task, 1); - node_copy(&task->peer, peer); - task->deferred_ev=event_new(thr->evbase, -1, 0, snet_task_deferred_run, task); - task->msg_type=msg_type; - task->task_type=task_type; - task->cb=cb; - task->cb_arg=cb_arg; - task->data=malloc(size); - memcpy(task->data, data, size); - task->size=size; - task->sequence=sequence; - task->thr=thr; - event_add(task->deferred_ev, NULL); - event_active(task->deferred_ev, 0, 0); - return task; -} - - -static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) -{ - struct swarmkv_net *net= (struct swarmkv_net *)arg; - struct timespec now; - long long input_bytes=0, output_bytes=0; - long long input_cmds=0, output_cmds=0; - if(0 != pthread_mutex_trylock(&net->stat_lock)) - { - return; - } - for(size_t i=0; i<net->n_thread; i++) - { - input_bytes += net->threads[i].stat.input_bytes; - output_bytes += net->threads[i].stat.output_bytes; - input_cmds += net->threads[i].stat.input_cmds; - output_cmds += net->threads[i].stat.output_cmds; - } - clock_gettime(CLOCK_MONOTONIC, &now); - net->instantaneous_input_kbps=(input_bytes - net->last_input_bytes)*8/1000/MAX(now.tv_sec - net->last_stats.tv_sec, 1); - net->instantaneous_output_kbps=(output_bytes - net->last_output_bytes)*8/1000/MAX(now.tv_sec - net->last_stats.tv_sec, 1); - net->instantaneous_input_cps=(input_cmds - net->last_input_cmds)/MAX(now.tv_sec - net->last_stats.tv_sec, 1); - net->instantaneous_output_cps=(output_cmds - net->last_output_cmds)/MAX(now.tv_sec - net->last_stats.tv_sec, 1); - - clock_gettime(CLOCK_MONOTONIC, &net->last_stats); - net->last_input_bytes=input_bytes; - net->last_output_bytes=output_bytes; - net->last_input_cmds=input_cmds; - net->last_output_cmds=output_cmds; - pthread_mutex_unlock(&net->stat_lock); - return; -} static void snet_conn_table_remove(struct snet_conn* conn) { struct snet_thread *thr=conn->ref_thr; @@ -499,7 +570,7 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg) conn->recv_state=RECEIVING_PAYLOAD; break; } - snet_task_new(thr, &(conn->peer_listen_addr), TASK_TYPE_RECEIVE, conn->receiving_hdr.msg_type, conn->receiving_hdr.sequence, recv_buff, conn->receiving_hdr.payload_len, NULL, NULL); + snet_task_new(thr, conn, &(conn->peer_listen_addr), TASK_TYPE_RECEIVE, conn->receiving_hdr.msg_type, conn->receiving_hdr.sequence, recv_buff, conn->receiving_hdr.payload_len, NULL, NULL); evbuffer_drain(input, conn->receiving_hdr.payload_len); thr->stat.input_bytes+=conn->receiving_hdr.payload_len; conn->recv_state=RECEIVING_HDR; @@ -508,7 +579,6 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg) } return; } - static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg) { struct snet_conn *conn=(struct snet_conn*)arg; @@ -527,8 +597,6 @@ static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg) snet_conn_free(conn); return; } - - void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg) { struct snet_conn* conn= (struct snet_conn*) arg; @@ -557,58 +625,6 @@ void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg) } } -struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const node_t *dest) -{ - struct event_base *base=thr->evbase; - - struct snet_conn* conn=ALLOC(struct snet_conn, 1); - conn->buff_for_sending=evbuffer_new(); - - //http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html - conn->bev=bufferevent_socket_new(base, -1, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS| - bufferevent_setcb(conn->bev, NULL, NULL, connect_peer_eventcb, conn); - node_copy(&conn->peer_listen_addr, dest); - conn->thread_id=thr->thread_id; - conn->ref_thr=thr; - node_copy(&conn->connected_from, dest); - struct sockaddr sa; - node_to_sockaddr(dest, &sa); - if (bufferevent_socket_connect(conn->bev, &sa, sizeof(sa)) < 0) - { - bufferevent_free(conn->bev); - goto error_out; - } - return conn; - -error_out: - evbuffer_free(conn->buff_for_sending); - conn->buff_for_sending=NULL; - free(conn); - return NULL; - -} -struct snet_conn *snet_conn_new_by_fd(struct snet_thread *thr, evutil_socket_t fd, struct sockaddr * addr) -{ - struct snet_conn* conn=NULL; - conn=ALLOC(struct snet_conn, 1); - conn->buff_for_sending=evbuffer_new(); - int yes=1; - setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&yes, sizeof(yes)); -// evbuffer_enable_locking(conn->buff_for_sending, NULL); - evutil_make_socket_nonblocking(fd); - node_init_from_sockaddr(&conn->connected_from, addr); - - conn->bev=bufferevent_socket_new(thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS| - //use BEV_OPT_THREADSAFE option for safely access from threads that calling swarmkv_net_send. - //? use BEV_OPT_UNLOCK_CALLBACKS option because swarmkv_net_send write buffer to output_buffer, and peer_conn_read_cb read input_buffer. - conn->fd=fd; - conn->thread_id=thr->thread_id; - conn->ref_thr=thr; - bufferevent_setcb(conn->bev, peer_conn_read_cb, peer_conn_write_cb, peer_conn_event_cb, conn); - bufferevent_enable(conn->bev, EV_READ | EV_WRITE); - return conn; -} - void snet_conn_send(struct snet_conn *conn, const void *hdr, size_t hdr_sz, const void *payload, size_t payload_sz) { struct evbuffer* output_buffer=NULL; @@ -627,13 +643,59 @@ void snet_conn_send(struct snet_conn *conn, const void *hdr, size_t hdr_sz, cons conn->ref_thr->stat.output_bytes += (hdr_sz+payload_sz); return; } +static int get_net_thread_id(const node_t *node, size_t n_thread) +{ + int thread_id=(int)(XXH32(node, sizeof(node_t), 171))%n_thread; + return thread_id; +} + +static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) +{ + struct swarmkv_net *net= (struct swarmkv_net *)arg; + struct timespec now; + long long input_bytes=0, output_bytes=0; + long long input_cmds=0, output_cmds=0; + if(0 != pthread_mutex_trylock(&net->stat_lock)) + { + return; + } + for(size_t i=0; i<net->n_thread; i++) + { + input_bytes += net->threads[i].stat.input_bytes; + output_bytes += net->threads[i].stat.output_bytes; + input_cmds += net->threads[i].stat.input_cmds; + output_cmds += net->threads[i].stat.output_cmds; + } + clock_gettime(CLOCK_MONOTONIC, &now); + net->instantaneous_input_kbps=(input_bytes - net->last_input_bytes)*8/1000/MAX(now.tv_sec - net->last_stats.tv_sec, 1); + net->instantaneous_output_kbps=(output_bytes - net->last_output_bytes)*8/1000/MAX(now.tv_sec - net->last_stats.tv_sec, 1); + net->instantaneous_input_cps=(input_cmds - net->last_input_cmds)/MAX(now.tv_sec - net->last_stats.tv_sec, 1); + net->instantaneous_output_cps=(output_cmds - net->last_output_cmds)/MAX(now.tv_sec - net->last_stats.tv_sec, 1); + + clock_gettime(CLOCK_MONOTONIC, &net->last_stats); + net->last_input_bytes=input_bytes; + net->last_output_bytes=output_bytes; + net->last_input_cmds=input_cmds; + net->last_output_cmds=output_cmds; + pthread_mutex_unlock(&net->stat_lock); + return; +} + +static __thread int snet_thread_id=-1; void snet_send(struct swarmkv_net *net, const node_t *peer, enum MSG_TYPE msg_type, const void *data, size_t size, unsigned long long sequence, on_reply_callback_t *cb, void *cb_arg) { - int thread_id=0; - thread_id=get_net_thread_id(peer, net->n_thread); - struct snet_thread *thr=net->threads+thread_id; - snet_task_new(thr, peer, TASK_TYPE_SEND, msg_type, sequence, data, size, cb, cb_arg); + struct snet_thread *thr=NULL; + if(snet_thread_id>=0)//Invoked from swarmkv worker threads. + { + thr=net->threads+snet_thread_id; + } + else + { + thr=net->threads+random()%net->n_thread; + assert(msg_type!=MSG_TYPE_REPLY); + } + snet_task_new(thr, thr->designated_conn, peer, TASK_TYPE_SEND, msg_type, sequence, data, size, cb, cb_arg); return; } void swarmkv_net_send_cmd(struct swarmkv_net *net, const node_t *peer, const void *data, size_t size, on_reply_callback_t *cb, void *cb_arg) @@ -645,42 +707,6 @@ void swarmkv_net_send_reply(struct swarmkv_net *net, const node_t *peer, const v snet_send(net, peer, MSG_TYPE_REPLY, data, size, sequence, NULL, NULL); } -void snet_send_in_evloop(struct snet_thread *thr, const node_t *peer, unsigned long long sequence, enum MSG_TYPE msg_type, void *data, size_t size) -{ - struct snet_conn *conn=NULL; - void *logger=thr->ref_net->logger; - int err=0; - - HASH_FIND(hh, thr->conn_table, peer, sizeof(node_t), conn); - if(!conn) - { - conn=snet_conn_new_by_connecting(thr, peer); - if(conn) - { - HASH_ADD(hh, thr->conn_table, peer_listen_addr, sizeof(node_t), conn); - conn->is_in_conn_table=1; - } - else - { - err = EVUTIL_SOCKET_ERROR(); - log_fatal(logger, MODULE_SWAMRKV_NET, "connect to %s failed, reason %d (%s).", - peer->addr, - err, evutil_socket_error_to_string(err)); - return; - } - } - struct swarmkv_msg_hdr msghdr; - memset(&msghdr, 0, sizeof(msghdr)); - msghdr.magic=MSG_HDR_MAGIC; - msghdr.msg_type=msg_type; - msghdr.sequence=sequence; - msghdr.payload_len=size; - node_copy(&msghdr.source, &thr->ref_net->self); - snet_conn_send(conn, &msghdr, sizeof(msghdr), data, size); - return; -} - - static void accept_peer_connection_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr * addr, int socklen, void *arg) { struct swarmkv_net *net = (struct swarmkv_net*) arg; @@ -844,6 +870,7 @@ void swarmkv_net_free(struct swarmkv_net* net) struct snet_thread *thr=NULL; struct snet_conn *conn=NULL, *tmp=NULL; struct snet_rpc *rpc=NULL, *tmp_rpc=NULL; + struct snet_reply_ctx *rctx=NULL, *tmp_rctx=NULL; evconnlistener_free(net->listener); for(i=0; i<net->n_thread; i++) { @@ -867,6 +894,11 @@ void swarmkv_net_free(struct swarmkv_net* net) { snet_rpc_free(rpc); } + HASH_ITER(hh, thr->reply_ctx_table, rctx, tmp_rctx) + { + HASH_DEL(thr->reply_ctx_table, rctx); + free(rctx); + } event_base_free(thr->evbase); } pthread_mutex_destroy(&net->stat_lock); @@ -890,6 +922,8 @@ void swarmkv_net_set_monitor_handle(struct swarmkv_net* net, struct swarmkv_modu } void swarmkv_net_dispath(struct swarmkv_net *net, int thread_id) { + assert(snet_thread_id<0); + snet_thread_id=thread_id; struct event * ev = event_new(net->threads[thread_id].evbase, -1, EV_PERSIST, __dummy_event_handler, net); struct timeval timer_delay = {2, 0}; net->threads[thread_id].is_dispatching=1; diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp index f8d2a1d..34575c7 100644 --- a/test/swarmkv_gtest.cpp +++ b/test/swarmkv_gtest.cpp @@ -12,9 +12,9 @@ #define CMD_EXEC_TIMEOUT_MS 1000*2 -void generic_callback(const struct swarmkv_reply* reply, void * cb_arg) +void generic_callback(const struct swarmkv_reply *reply, void * cb_arg) { - struct cmd_exec_arg* arg=(struct cmd_exec_arg*)cb_arg; + struct cmd_exec_arg *arg=(struct cmd_exec_arg*)cb_arg; if(0==reply_compare(reply, &(arg->expected_reply))) { cmd_exec_arg_success(arg); @@ -32,7 +32,7 @@ class SwarmkvBasicTest : public testing::Test protected: static void SetUpTestCase() { - const char* log_path="./basic-test.log"; + const char *log_path="./basic-test.log"; char *err=NULL; const char *cluster_name="swarmkv-basic-test"; swarmkv_cli_create_cluster(cluster_name, "127.0.0.1:5210"); @@ -64,8 +64,8 @@ TEST_F(SwarmkvBasicTest, TypeString) { struct cmd_exec_arg *arg=NULL; struct swarmkv *db=SwarmkvBasicTest::db; - const char* key="name"; - const char* val="zhangsan"; + const char *key="name"; + const char *val="zhangsan"; int exec_successful=0; arg=cmd_exec_arg_new(); @@ -1255,7 +1255,7 @@ protected: const char *cluster_name="swarmkv-n-nodes"; swarmkv_cli_create_cluster(cluster_name, node_list_str); logger=log_handle_create(log_path, 0); - struct swarmkv_options* opts[TEST_NODE_NUMBER]; + struct swarmkv_options *opts[TEST_NODE_NUMBER]; for(i=0; i<TEST_NODE_NUMBER; i++) { opts[i]=swarmkv_options_new(); @@ -1363,11 +1363,9 @@ TEST_F(SwarmkvNNodes, SET_GET) { success_cnt++; } - } EXPECT_EQ(success_cnt, TEST_NODE_NUMBER); cmd_exec_arg_free(arg); - } int main(int argc, char ** argv) { diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp index 6a4e926..6709e9a 100644 --- a/test/swarmkv_perf_test.cpp +++ b/test/swarmkv_perf_test.cpp @@ -402,7 +402,7 @@ TEST(Performance, Sync) successful_backgroud_running_thread=NULL; } } - sleep(1000); + sleep(20); for(size_t i=0; i<NODE_NUMBER; i++) { swarmkv_close(db[i]); |
