summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2022-12-09 15:57:56 +0800
committerZheng Chao <[email protected]>2022-12-09 15:57:56 +0800
commit0e7ba516559ae20ba8691291b14380156a4ff203 (patch)
tree5c28b7545bbd429c68052fd89fcccb8e57955a3b
parentac33167d47b2b24ce841104946a11200233bd537 (diff)
:zap: For multi-thread scalability, each worker thread maintains connections to all active peers.
-rw-r--r--docs/design.md7
-rw-r--r--src/swarmkv.c10
-rw-r--r--src/swarmkv_net.c390
-rw-r--r--test/swarmkv_gtest.cpp14
-rw-r--r--test/swarmkv_perf_test.cpp2
5 files changed, 228 insertions, 195 deletions
diff --git a/docs/design.md b/docs/design.md
index c0b1d1a..d4ce2f4 100644
--- a/docs/design.md
+++ b/docs/design.md
@@ -239,7 +239,7 @@ Reply message
A swarmkv instance has one key space thread and several (configurable) worker threads. Swarmkv uses [libevent](https://github.com/libevent/libevent) as a peer-to-peer communication infrastructure.
-The keyspace thread is communicates with Consul.
+The keyspace thread communicates with Consul.
- Watch global slot table changes.
- Watch leadership changes, run for leader if allowed.
@@ -253,11 +253,12 @@ The worker thread 0 is responsible for:
All worker threads are responsible for:
- Send KV command message, which is from swarmkv API caller to destination peers.
-- Receive KV command message, and then process it based on message type:
+- Receive KV command message, and then process it based on command type:
- Key routing request (from the requester, object reader/writer): lookup the value owner, and then send key routing response.
- Object operation request (from the requester): lookup the value, perform the operation and then send value operation response.
- Key routing response (from the key owner): send the request to the decoded value owner.
- Object operation response (from object owner): update local cache, and invoke the user-specified callback.
+- For multi-thread scalability, each worker thread maintains its own connections to all active peers. As a result, two nodes may be linked by more than one connection.
### Lock design
@@ -299,7 +300,7 @@ The source files are organized as follows:
[Tarantool](https://www.tarantool.io/en/) is an in-memory computing platform consisting of a database and an application server.
-[eXtremeDB](https://www.mcobject.com/) is a hybrid persistent and in-memory database management system for edge and cloud.
+[eXtremeDB](https://www.mcobject.com/) is a hybrid persistent and in-memory database management system for edge and cloud. It is commercial, closed-source software.
[vedis](https://vedis.symisc.net/) is an embeddable datastore C library built with over 70 commands similar in concept to Redis but without the networking layer since Vedis run in the same process of the host application.
diff --git a/src/swarmkv.c b/src/swarmkv.c
index c9b4611..6e22411 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -122,10 +122,10 @@ struct remote_caller_ctx
struct future *my_future;
struct swarmkv_net *ref_net;
};
-static void remote_caller_on_success(void* result, void* user)
+static void remote_caller_on_success(void *result, void *user)
{
struct remote_caller_ctx *ctx=(struct remote_caller_ctx*)user;
- struct swarmkv_reply* reply=(struct swarmkv_reply*) result;
+ struct swarmkv_reply *reply=(struct swarmkv_reply*) result;
char *reply_blob=NULL;
size_t reply_blob_sz=0;
swarmkv_msg_serialize_reply(reply, &reply_blob, &reply_blob_sz);
@@ -227,7 +227,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
"synced: %lld\r\n"
"sync_ok: %lld\r\n"
"sync_err: %lld\r\n"
- "sync_interval_in_usec:%u\r\n"
+ "sync_interval_in_msec: %.3f\r\n"
,
sto_info.shards,
sto_info.keys,
@@ -235,7 +235,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
sto_info.synced,
sto_info.sync_ok,
sto_info.sync_err,
- db->opts->sync_interval_us
+ (double)db->opts->sync_interval_us/1000
);
struct keyspace_info ks_info;
swarmkv_keyspace_info(db->mod_keyspace, &ks_info);
@@ -252,7 +252,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
struct snet_info net_info;
swarmkv_net_info(db->net, &net_info);
snprintf(net_info_buff, sizeof(net_info_buff), "# Network\r\n"
- "timeout_in_msec: %.2f\r\n"
+ "timeout_in_msec: %.3f\r\n"
"connections: %lld\r\n"
"pending_rpcs: %lld\r\n"
"timed_out_rpcs: %lld\r\n"
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index f7b4e68..77beb88 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -72,11 +72,17 @@ struct timeout_ctx
int is_persist;
struct event *ev;
};
-
+struct snet_reply_ctx
+{
+ unsigned long long sequence;
+ node_t peer;
+ struct snet_conn *conn;
+ UT_hash_handle hh;
+};
struct snet_rpc
{
- unsigned long long sequence;
+ unsigned long long sequence;
node_t peer;
struct event *timeout_ev;
on_reply_callback_t *cb;
@@ -85,6 +91,7 @@ struct snet_rpc
struct timespec start;
UT_hash_handle hh;
};
+
struct snet_stat
{
long long input_msgs, output_msgs;
@@ -102,9 +109,10 @@ struct snet_thread
struct event_base *evbase;
struct snet_conn *conn_table;
struct snet_conn *duplicated_conn_table;
-
+ struct snet_conn *designated_conn;//for sending reply
long long peer_timeout_us;
struct snet_rpc *rpc_table;
+ struct snet_reply_ctx *reply_ctx_table;
void *ref_logger;
struct timeout_ctx *to_ctx;
@@ -135,12 +143,28 @@ struct swarmkv_net
double instantaneous_input_kbps, instantaneous_output_kbps;
double instantaneous_input_cps, instantaneous_output_cps;
};
-static int get_net_thread_id(const node_t *node, size_t n_thread)
+
+enum TASK_TYPE
{
- size_t ip_len=strchr(node->addr, ':')-node->addr;
- int thread_id=(int)(XXH32(node, ip_len, 171))%n_thread;
- return thread_id;
-}
+ TASK_TYPE_SEND,
+ TASK_TYPE_RECEIVE
+};
+
+struct snet_task
+{
+ enum TASK_TYPE task_type;
+ enum MSG_TYPE msg_type;
+ unsigned long long sequence;
+ node_t peer;
+ struct event *deferred_ev;
+ struct snet_thread *thr;
+ struct snet_conn *conn;
+ void *data;
+ size_t size;
+
+ on_reply_callback_t *cb;
+ void *cb_arg;
+};
void snet_rpc_free(struct snet_rpc *rpc)
{
@@ -184,27 +208,87 @@ struct snet_rpc * snet_rpc_new(struct snet_thread *thr, node_t *peer, unsigned l
HASH_ADD(hh, thr->rpc_table, sequence, sizeof(rpc->sequence), rpc);
return rpc;
}
+static void peer_conn_read_cb(struct bufferevent *bev, void *arg);
+void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg);
+static void peer_conn_write_cb(struct bufferevent *bev, void *arg);
+static void peer_conn_read_cb(struct bufferevent *bev, void *arg);
+static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg);
+void snet_conn_send(struct snet_conn *conn, const void *hdr, size_t hdr_sz, const void *payload, size_t payload_sz);
-enum TASK_TYPE
+struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const node_t *dest)
{
- TASK_TYPE_SEND,
- TASK_TYPE_RECEIVE
-};
+ struct event_base *base=thr->evbase;
+
+ struct snet_conn* conn=ALLOC(struct snet_conn, 1);
+ conn->buff_for_sending=evbuffer_new();
-struct snet_task
+ //http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html
+ conn->bev=bufferevent_socket_new(base, -1, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS|
+ bufferevent_setcb(conn->bev, NULL, NULL, connect_peer_eventcb, conn);
+ node_copy(&conn->peer_listen_addr, dest);
+ conn->thread_id=thr->thread_id;
+ conn->ref_thr=thr;
+ node_copy(&conn->connected_from, dest);
+ struct sockaddr sa;
+ node_to_sockaddr(dest, &sa);
+ if (bufferevent_socket_connect(conn->bev, &sa, sizeof(sa)) < 0)
+ {
+ bufferevent_free(conn->bev);
+ goto error_out;
+ }
+ return conn;
+
+error_out:
+ evbuffer_free(conn->buff_for_sending);
+ conn->buff_for_sending=NULL;
+ free(conn);
+ return NULL;
+
+}
+struct snet_conn *snet_conn_new_by_fd(struct snet_thread *thr, evutil_socket_t fd, struct sockaddr * addr)
{
- node_t peer;
- enum TASK_TYPE task_type;
- enum MSG_TYPE msg_type;
- unsigned long long sequence;
- struct event *deferred_ev;
- struct snet_thread *thr;
- void *data;
- size_t size;
+ struct snet_conn* conn=NULL;
+ conn=ALLOC(struct snet_conn, 1);
+ conn->buff_for_sending=evbuffer_new();
+ int yes=1;
+ setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&yes, sizeof(yes));
+// evbuffer_enable_locking(conn->buff_for_sending, NULL);
+ evutil_make_socket_nonblocking(fd);
+ node_init_from_sockaddr(&conn->connected_from, addr);
- on_reply_callback_t *cb;
- void *cb_arg;
-};
+ conn->bev=bufferevent_socket_new(thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS|
+ //use BEV_OPT_THREADSAFE option for safely access from threads that calling swarmkv_net_send.
+ //? use BEV_OPT_UNLOCK_CALLBACKS option because swarmkv_net_send write buffer to output_buffer, and peer_conn_read_cb read input_buffer.
+ conn->fd=fd;
+ conn->thread_id=thr->thread_id;
+ conn->ref_thr=thr;
+ bufferevent_setcb(conn->bev, peer_conn_read_cb, peer_conn_write_cb, peer_conn_event_cb, conn);
+ bufferevent_enable(conn->bev, EV_READ | EV_WRITE);
+ return conn;
+}
+
+void snet_task_deferred_run(evutil_socket_t fd, short events, void *arg);
+
+
+struct snet_task *snet_task_new(struct snet_thread *thr, struct snet_conn *conn, const node_t* peer, enum TASK_TYPE task_type, enum MSG_TYPE msg_type, unsigned long long sequence, const void *data, size_t size, on_reply_callback_t *cb, void *cb_arg)
+{
+ struct snet_task *task=ALLOC(struct snet_task, 1);
+ node_copy(&task->peer, peer);
+ task->deferred_ev=event_new(thr->evbase, -1, 0, snet_task_deferred_run, task);
+ task->msg_type=msg_type;
+ task->task_type=task_type;
+ task->cb=cb;
+ task->cb_arg=cb_arg;
+ task->data=malloc(size);
+ memcpy(task->data, data, size);
+ task->size=size;
+ task->sequence=sequence;
+ task->thr=thr;
+ task->conn=conn;
+ event_add(task->deferred_ev, NULL);
+ event_active(task->deferred_ev, 0, 0);
+ return task;
+}
void snet_task_free(struct snet_task *task)
{
free(task->data);
@@ -215,8 +299,6 @@ void snet_task_free(struct snet_task *task)
return;
}
-void snet_send_in_evloop(struct snet_thread *thr, const node_t *peer, unsigned long long sequence, enum MSG_TYPE msg_type, void *data, size_t size);
-
void snet_task_deferred_run(evutil_socket_t fd, short events, void *arg)
{
struct snet_task *task=(struct snet_task *)arg;
@@ -224,27 +306,62 @@ void snet_task_deferred_run(evutil_socket_t fd, short events, void *arg)
if(task->task_type==TASK_TYPE_SEND)
{
+ struct snet_conn *conn=NULL;
if(task->msg_type==MSG_TYPE_CMD)//expecting reply
{
struct snet_rpc *rpc=snet_rpc_new(thr, &task->peer, task->sequence, task->cb, task->cb_arg);
- snet_send_in_evloop(task->thr, &task->peer, rpc->sequence, task->msg_type, task->data, task->size);
-
+// snet_send_in_evloop(task->thr, &task->peer, rpc->sequence, task->msg_type, task->data, task->size);
+ task->sequence=rpc->sequence;
+ HASH_FIND(hh, thr->conn_table, &task->peer, sizeof(node_t), conn);
+ if(!conn)
+ {
+ conn=snet_conn_new_by_connecting(thr, &task->peer);
+ if(conn)
+ {
+ HASH_ADD(hh, thr->conn_table, peer_listen_addr, sizeof(node_t), conn);
+ conn->is_in_conn_table=1;
+ }
+ else
+ {
+ int err = EVUTIL_SOCKET_ERROR();
+ log_fatal(task->thr->ref_logger, MODULE_SWAMRKV_NET, "connect to %s failed, reason %d (%s).",
+ task->peer.addr,
+ err, evutil_socket_error_to_string(err));
+ return;
+ }
+ }
thr->stat.output_cmds++;
}
else
{
- snet_send_in_evloop(task->thr, &task->peer, task->sequence, task->msg_type, task->data, task->size);
+ struct snet_reply_ctx *reply_ctx=NULL;
+ struct snet_reply_ctx key;
+ key.sequence=task->sequence;
+ node_copy(&key.peer, &task->peer);
+ HASH_FIND(hh, thr->reply_ctx_table, &key.sequence, sizeof(key.sequence)+sizeof(key.peer), reply_ctx);
+ assert(reply_ctx);
+ conn=reply_ctx->conn;
+ HASH_DEL(thr->reply_ctx_table, reply_ctx);
+ free(reply_ctx);
thr->stat.output_replies++;
}
-
+ struct swarmkv_msg_hdr msghdr;
+ memset(&msghdr, 0, sizeof(msghdr));
+ msghdr.magic=MSG_HDR_MAGIC;
+ msghdr.msg_type=task->msg_type;
+ msghdr.sequence=task->sequence;
+ msghdr.payload_len=task->size;
+ node_copy(&msghdr.source, &thr->ref_net->self);
+ snet_conn_send(conn, &msghdr, sizeof(msghdr), task->data, task->size);
thr->stat.output_msgs++;
}
else//receive
{
- struct snet_rpc *rpc=NULL;
+
assert(task->sequence);
if(task->msg_type==MSG_TYPE_REPLY)
{
+ struct snet_rpc *rpc=NULL;
rpc=snet_rpc_lookup(thr, task->sequence);
if(rpc)
{
@@ -262,6 +379,11 @@ void snet_task_deferred_run(evutil_socket_t fd, short events, void *arg)
}
else
{
+ struct snet_reply_ctx *reply_ctx=ALLOC(struct snet_reply_ctx, 1);
+ reply_ctx->sequence=task->sequence;
+ reply_ctx->conn=task->conn;
+ node_copy(&reply_ctx->peer, &task->peer);
+ HASH_ADD_KEYPTR(hh, thr->reply_ctx_table, &(reply_ctx->sequence), sizeof(reply_ctx->sequence)+sizeof(reply_ctx->peer), reply_ctx);
thr->ref_net->on_cmd_cb(&task->peer, task->sequence, task->data, task->size, thr->ref_net->on_cmd_cb_arg);
thr->stat.input_cmds++;
}
@@ -272,57 +394,6 @@ void snet_task_deferred_run(evutil_socket_t fd, short events, void *arg)
return;
}
-struct snet_task *snet_task_new(struct snet_thread *thr, const node_t* peer, enum TASK_TYPE task_type, enum MSG_TYPE msg_type, unsigned long long sequence, const void *data, size_t size, on_reply_callback_t *cb, void *cb_arg)
-{
- struct snet_task *task=ALLOC(struct snet_task, 1);
- node_copy(&task->peer, peer);
- task->deferred_ev=event_new(thr->evbase, -1, 0, snet_task_deferred_run, task);
- task->msg_type=msg_type;
- task->task_type=task_type;
- task->cb=cb;
- task->cb_arg=cb_arg;
- task->data=malloc(size);
- memcpy(task->data, data, size);
- task->size=size;
- task->sequence=sequence;
- task->thr=thr;
- event_add(task->deferred_ev, NULL);
- event_active(task->deferred_ev, 0, 0);
- return task;
-}
-
-
-static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
-{
- struct swarmkv_net *net= (struct swarmkv_net *)arg;
- struct timespec now;
- long long input_bytes=0, output_bytes=0;
- long long input_cmds=0, output_cmds=0;
- if(0 != pthread_mutex_trylock(&net->stat_lock))
- {
- return;
- }
- for(size_t i=0; i<net->n_thread; i++)
- {
- input_bytes += net->threads[i].stat.input_bytes;
- output_bytes += net->threads[i].stat.output_bytes;
- input_cmds += net->threads[i].stat.input_cmds;
- output_cmds += net->threads[i].stat.output_cmds;
- }
- clock_gettime(CLOCK_MONOTONIC, &now);
- net->instantaneous_input_kbps=(input_bytes - net->last_input_bytes)*8/1000/MAX(now.tv_sec - net->last_stats.tv_sec, 1);
- net->instantaneous_output_kbps=(output_bytes - net->last_output_bytes)*8/1000/MAX(now.tv_sec - net->last_stats.tv_sec, 1);
- net->instantaneous_input_cps=(input_cmds - net->last_input_cmds)/MAX(now.tv_sec - net->last_stats.tv_sec, 1);
- net->instantaneous_output_cps=(output_cmds - net->last_output_cmds)/MAX(now.tv_sec - net->last_stats.tv_sec, 1);
-
- clock_gettime(CLOCK_MONOTONIC, &net->last_stats);
- net->last_input_bytes=input_bytes;
- net->last_output_bytes=output_bytes;
- net->last_input_cmds=input_cmds;
- net->last_output_cmds=output_cmds;
- pthread_mutex_unlock(&net->stat_lock);
- return;
-}
static void snet_conn_table_remove(struct snet_conn* conn)
{
struct snet_thread *thr=conn->ref_thr;
@@ -499,7 +570,7 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg)
conn->recv_state=RECEIVING_PAYLOAD;
break;
}
- snet_task_new(thr, &(conn->peer_listen_addr), TASK_TYPE_RECEIVE, conn->receiving_hdr.msg_type, conn->receiving_hdr.sequence, recv_buff, conn->receiving_hdr.payload_len, NULL, NULL);
+ snet_task_new(thr, conn, &(conn->peer_listen_addr), TASK_TYPE_RECEIVE, conn->receiving_hdr.msg_type, conn->receiving_hdr.sequence, recv_buff, conn->receiving_hdr.payload_len, NULL, NULL);
evbuffer_drain(input, conn->receiving_hdr.payload_len);
thr->stat.input_bytes+=conn->receiving_hdr.payload_len;
conn->recv_state=RECEIVING_HDR;
@@ -508,7 +579,6 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg)
}
return;
}
-
static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg)
{
struct snet_conn *conn=(struct snet_conn*)arg;
@@ -527,8 +597,6 @@ static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg)
snet_conn_free(conn);
return;
}
-
-
void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg)
{
struct snet_conn* conn= (struct snet_conn*) arg;
@@ -557,58 +625,6 @@ void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg)
}
}
-struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const node_t *dest)
-{
- struct event_base *base=thr->evbase;
-
- struct snet_conn* conn=ALLOC(struct snet_conn, 1);
- conn->buff_for_sending=evbuffer_new();
-
- //http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html
- conn->bev=bufferevent_socket_new(base, -1, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS|
- bufferevent_setcb(conn->bev, NULL, NULL, connect_peer_eventcb, conn);
- node_copy(&conn->peer_listen_addr, dest);
- conn->thread_id=thr->thread_id;
- conn->ref_thr=thr;
- node_copy(&conn->connected_from, dest);
- struct sockaddr sa;
- node_to_sockaddr(dest, &sa);
- if (bufferevent_socket_connect(conn->bev, &sa, sizeof(sa)) < 0)
- {
- bufferevent_free(conn->bev);
- goto error_out;
- }
- return conn;
-
-error_out:
- evbuffer_free(conn->buff_for_sending);
- conn->buff_for_sending=NULL;
- free(conn);
- return NULL;
-
-}
-struct snet_conn *snet_conn_new_by_fd(struct snet_thread *thr, evutil_socket_t fd, struct sockaddr * addr)
-{
- struct snet_conn* conn=NULL;
- conn=ALLOC(struct snet_conn, 1);
- conn->buff_for_sending=evbuffer_new();
- int yes=1;
- setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&yes, sizeof(yes));
-// evbuffer_enable_locking(conn->buff_for_sending, NULL);
- evutil_make_socket_nonblocking(fd);
- node_init_from_sockaddr(&conn->connected_from, addr);
-
- conn->bev=bufferevent_socket_new(thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS|
- //use BEV_OPT_THREADSAFE option for safely access from threads that calling swarmkv_net_send.
- //? use BEV_OPT_UNLOCK_CALLBACKS option because swarmkv_net_send write buffer to output_buffer, and peer_conn_read_cb read input_buffer.
- conn->fd=fd;
- conn->thread_id=thr->thread_id;
- conn->ref_thr=thr;
- bufferevent_setcb(conn->bev, peer_conn_read_cb, peer_conn_write_cb, peer_conn_event_cb, conn);
- bufferevent_enable(conn->bev, EV_READ | EV_WRITE);
- return conn;
-}
-
void snet_conn_send(struct snet_conn *conn, const void *hdr, size_t hdr_sz, const void *payload, size_t payload_sz)
{
struct evbuffer* output_buffer=NULL;
@@ -627,13 +643,59 @@ void snet_conn_send(struct snet_conn *conn, const void *hdr, size_t hdr_sz, cons
conn->ref_thr->stat.output_bytes += (hdr_sz+payload_sz);
return;
}
+static int get_net_thread_id(const node_t *node, size_t n_thread)
+{
+ int thread_id=(int)(XXH32(node, sizeof(node_t), 171))%n_thread;
+ return thread_id;
+}
+
+static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
+{
+ struct swarmkv_net *net= (struct swarmkv_net *)arg;
+ struct timespec now;
+ long long input_bytes=0, output_bytes=0;
+ long long input_cmds=0, output_cmds=0;
+ if(0 != pthread_mutex_trylock(&net->stat_lock))
+ {
+ return;
+ }
+ for(size_t i=0; i<net->n_thread; i++)
+ {
+ input_bytes += net->threads[i].stat.input_bytes;
+ output_bytes += net->threads[i].stat.output_bytes;
+ input_cmds += net->threads[i].stat.input_cmds;
+ output_cmds += net->threads[i].stat.output_cmds;
+ }
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ net->instantaneous_input_kbps=(input_bytes - net->last_input_bytes)*8/1000/MAX(now.tv_sec - net->last_stats.tv_sec, 1);
+ net->instantaneous_output_kbps=(output_bytes - net->last_output_bytes)*8/1000/MAX(now.tv_sec - net->last_stats.tv_sec, 1);
+ net->instantaneous_input_cps=(input_cmds - net->last_input_cmds)/MAX(now.tv_sec - net->last_stats.tv_sec, 1);
+ net->instantaneous_output_cps=(output_cmds - net->last_output_cmds)/MAX(now.tv_sec - net->last_stats.tv_sec, 1);
+
+ clock_gettime(CLOCK_MONOTONIC, &net->last_stats);
+ net->last_input_bytes=input_bytes;
+ net->last_output_bytes=output_bytes;
+ net->last_input_cmds=input_cmds;
+ net->last_output_cmds=output_cmds;
+ pthread_mutex_unlock(&net->stat_lock);
+ return;
+}
+
+static __thread int snet_thread_id=-1;
void snet_send(struct swarmkv_net *net, const node_t *peer, enum MSG_TYPE msg_type, const void *data, size_t size, unsigned long long sequence, on_reply_callback_t *cb, void *cb_arg)
{
- int thread_id=0;
- thread_id=get_net_thread_id(peer, net->n_thread);
- struct snet_thread *thr=net->threads+thread_id;
- snet_task_new(thr, peer, TASK_TYPE_SEND, msg_type, sequence, data, size, cb, cb_arg);
+ struct snet_thread *thr=NULL;
+ if(snet_thread_id>=0)//Invoked from swarmkv worker threads.
+ {
+ thr=net->threads+snet_thread_id;
+ }
+ else
+ {
+ thr=net->threads+random()%net->n_thread;
+ assert(msg_type!=MSG_TYPE_REPLY);
+ }
+ snet_task_new(thr, thr->designated_conn, peer, TASK_TYPE_SEND, msg_type, sequence, data, size, cb, cb_arg);
return;
}
void swarmkv_net_send_cmd(struct swarmkv_net *net, const node_t *peer, const void *data, size_t size, on_reply_callback_t *cb, void *cb_arg)
@@ -645,42 +707,6 @@ void swarmkv_net_send_reply(struct swarmkv_net *net, const node_t *peer, const v
snet_send(net, peer, MSG_TYPE_REPLY, data, size, sequence, NULL, NULL);
}
-void snet_send_in_evloop(struct snet_thread *thr, const node_t *peer, unsigned long long sequence, enum MSG_TYPE msg_type, void *data, size_t size)
-{
- struct snet_conn *conn=NULL;
- void *logger=thr->ref_net->logger;
- int err=0;
-
- HASH_FIND(hh, thr->conn_table, peer, sizeof(node_t), conn);
- if(!conn)
- {
- conn=snet_conn_new_by_connecting(thr, peer);
- if(conn)
- {
- HASH_ADD(hh, thr->conn_table, peer_listen_addr, sizeof(node_t), conn);
- conn->is_in_conn_table=1;
- }
- else
- {
- err = EVUTIL_SOCKET_ERROR();
- log_fatal(logger, MODULE_SWAMRKV_NET, "connect to %s failed, reason %d (%s).",
- peer->addr,
- err, evutil_socket_error_to_string(err));
- return;
- }
- }
- struct swarmkv_msg_hdr msghdr;
- memset(&msghdr, 0, sizeof(msghdr));
- msghdr.magic=MSG_HDR_MAGIC;
- msghdr.msg_type=msg_type;
- msghdr.sequence=sequence;
- msghdr.payload_len=size;
- node_copy(&msghdr.source, &thr->ref_net->self);
- snet_conn_send(conn, &msghdr, sizeof(msghdr), data, size);
- return;
-}
-
-
static void accept_peer_connection_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr * addr, int socklen, void *arg)
{
struct swarmkv_net *net = (struct swarmkv_net*) arg;
@@ -844,6 +870,7 @@ void swarmkv_net_free(struct swarmkv_net* net)
struct snet_thread *thr=NULL;
struct snet_conn *conn=NULL, *tmp=NULL;
struct snet_rpc *rpc=NULL, *tmp_rpc=NULL;
+ struct snet_reply_ctx *rctx=NULL, *tmp_rctx=NULL;
evconnlistener_free(net->listener);
for(i=0; i<net->n_thread; i++)
{
@@ -867,6 +894,11 @@ void swarmkv_net_free(struct swarmkv_net* net)
{
snet_rpc_free(rpc);
}
+ HASH_ITER(hh, thr->reply_ctx_table, rctx, tmp_rctx)
+ {
+ HASH_DEL(thr->reply_ctx_table, rctx);
+ free(rctx);
+ }
event_base_free(thr->evbase);
}
pthread_mutex_destroy(&net->stat_lock);
@@ -890,6 +922,8 @@ void swarmkv_net_set_monitor_handle(struct swarmkv_net* net, struct swarmkv_modu
}
void swarmkv_net_dispath(struct swarmkv_net *net, int thread_id)
{
+ assert(snet_thread_id<0);
+ snet_thread_id=thread_id;
struct event * ev = event_new(net->threads[thread_id].evbase, -1, EV_PERSIST, __dummy_event_handler, net);
struct timeval timer_delay = {2, 0};
net->threads[thread_id].is_dispatching=1;
diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp
index f8d2a1d..34575c7 100644
--- a/test/swarmkv_gtest.cpp
+++ b/test/swarmkv_gtest.cpp
@@ -12,9 +12,9 @@
#define CMD_EXEC_TIMEOUT_MS 1000*2
-void generic_callback(const struct swarmkv_reply* reply, void * cb_arg)
+void generic_callback(const struct swarmkv_reply *reply, void * cb_arg)
{
- struct cmd_exec_arg* arg=(struct cmd_exec_arg*)cb_arg;
+ struct cmd_exec_arg *arg=(struct cmd_exec_arg*)cb_arg;
if(0==reply_compare(reply, &(arg->expected_reply)))
{
cmd_exec_arg_success(arg);
@@ -32,7 +32,7 @@ class SwarmkvBasicTest : public testing::Test
protected:
static void SetUpTestCase()
{
- const char* log_path="./basic-test.log";
+ const char *log_path="./basic-test.log";
char *err=NULL;
const char *cluster_name="swarmkv-basic-test";
swarmkv_cli_create_cluster(cluster_name, "127.0.0.1:5210");
@@ -64,8 +64,8 @@ TEST_F(SwarmkvBasicTest, TypeString)
{
struct cmd_exec_arg *arg=NULL;
struct swarmkv *db=SwarmkvBasicTest::db;
- const char* key="name";
- const char* val="zhangsan";
+ const char *key="name";
+ const char *val="zhangsan";
int exec_successful=0;
arg=cmd_exec_arg_new();
@@ -1255,7 +1255,7 @@ protected:
const char *cluster_name="swarmkv-n-nodes";
swarmkv_cli_create_cluster(cluster_name, node_list_str);
logger=log_handle_create(log_path, 0);
- struct swarmkv_options* opts[TEST_NODE_NUMBER];
+ struct swarmkv_options *opts[TEST_NODE_NUMBER];
for(i=0; i<TEST_NODE_NUMBER; i++)
{
opts[i]=swarmkv_options_new();
@@ -1363,11 +1363,9 @@ TEST_F(SwarmkvNNodes, SET_GET)
{
success_cnt++;
}
-
}
EXPECT_EQ(success_cnt, TEST_NODE_NUMBER);
cmd_exec_arg_free(arg);
-
}
int main(int argc, char ** argv)
{
diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp
index 6a4e926..6709e9a 100644
--- a/test/swarmkv_perf_test.cpp
+++ b/test/swarmkv_perf_test.cpp
@@ -402,7 +402,7 @@ TEST(Performance, Sync)
successful_backgroud_running_thread=NULL;
}
}
- sleep(1000);
+ sleep(20);
for(size_t i=0; i<NODE_NUMBER; i++)
{
swarmkv_close(db[i]);