summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-09-18 21:43:53 +0800
committerZheng Chao <[email protected]>2023-09-18 21:43:53 +0800
commitc3c69f12af1d6413e66f1553549f524746f23de0 (patch)
treec1f17d2ce35abd22b94b5454a35f889af3323cbd /src
parent0e520dfebcf49d8ebffd98070a7079c78acf22c0 (diff)
Bugfix: Reset struct xx_info to avoid inaccurate statistics.
Diffstat (limited to 'src')
-rw-r--r--src/swarmkv.c14
-rw-r--r--src/swarmkv_common.c11
-rw-r--r--src/swarmkv_net.c17
-rw-r--r--src/swarmkv_rpc.c38
-rw-r--r--src/swarmkv_rpc.h8
5 files changed, 70 insertions, 18 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c
index bca8a82..61761a6 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -295,7 +295,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
rpc_info.timed_out_rpcs,
rpc_info.unknown_sequence,
mesh_info.queued_msgs,
- mesh_info.drop_msgs,
+ mesh_info.enqueue_drops,
server_time_us,
now_monotonic.tv_sec-db->boot_time.tv_sec,
(now_monotonic.tv_sec-db->boot_time.tv_sec)/(3600*24)
@@ -868,6 +868,7 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
}
}
}
+#define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000
void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller)
{
struct swarmkv_cmd_spec *spec=NULL;
@@ -899,7 +900,8 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar
if(!node_is_empty(target_node) && node_compare(&db->self, target_node))
{
//cmd is executed in target node's on_msg_callback
- long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller);
+ struct swarmkv_rpc *rpc=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller);
+ long long sequence=swarmkv_rpc_get_sequence(rpc);
msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, target_node, sequence);
int ret=0;
if(cur_tid >= db->opts->nr_worker_threads)
@@ -910,6 +912,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar
{
reply=swarmkv_reply_new_error(error_thread_rpc_buffer_full);
}
+ swarmkv_rpc_set_timeout(rpc, db->opts->cluster_timeout_us+INTER_THREAD_RPC_TIMEOUT_AHEAD);
}
else
{
@@ -919,6 +922,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar
{
reply=swarmkv_reply_new_error(error_network_error, node_addr2sds(target_node), err_str);
}
+ swarmkv_rpc_set_peer(rpc, target_node);
}
if(reply)
{
@@ -930,7 +934,9 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar
if(cur_tid != target_tid)
{
//cmd will be executed in target thread's on_msg_callback
- long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller);
+ struct swarmkv_rpc *rpc=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller);
+ swarmkv_rpc_set_timeout(rpc, db->opts->cluster_timeout_us+INTER_THREAD_RPC_TIMEOUT_AHEAD);
+ long long sequence=swarmkv_rpc_get_sequence(rpc);
msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, target_node, sequence);
int ret=0;
ret=swarmkv_mesh_send(db->mesh, cur_tid, target_tid, msg);
@@ -1409,7 +1415,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
db->mod_monitor=swarmkv_monitor_new(db->opts);
- db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads);
+ db->rpc_mgr=swarmkv_rpc_mgr_new(db->ref_evbases, opts->total_threads, opts->cluster_timeout_us);
db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger);
swarmkv_mesh_set_on_msg_cb(db->mesh, __on_msg_callback, db);
diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c
index b277785..649a5dd 100644
--- a/src/swarmkv_common.c
+++ b/src/swarmkv_common.c
@@ -488,10 +488,17 @@ void swarmkv_reply_merge_array(struct swarmkv_reply **dst, struct swarmkv_reply
{
*dst=swarmkv_reply_new_array(0);
}
- if((*dst)->type!=SWARMKV_REPLY_ARRAY || src->type!=SWARMKV_REPLY_ARRAY)
+ if((*dst)->type!=SWARMKV_REPLY_ARRAY)
{
return;
}
+ if(src->type!=SWARMKV_REPLY_ARRAY)
+ {
+ (*dst)->elements=(struct swarmkv_reply **)realloc((*dst)->elements, ((*dst)->n_element+1)*sizeof(struct swarmkv_reply*));
+ (*dst)->elements[(*dst)->n_element]=src;
+ (*dst)->n_element+=1;
+ return;
+ }
(*dst)->elements=(struct swarmkv_reply **)realloc((*dst)->elements, ((*dst)->n_element+src->n_element)*sizeof(struct swarmkv_reply*));
size_t i=0;
for(i=0; i<src->n_element; i++)
@@ -500,7 +507,7 @@ void swarmkv_reply_merge_array(struct swarmkv_reply **dst, struct swarmkv_reply
src->elements[i]=NULL;
}
(*dst)->n_element+=src->n_element;
- src->n_element=0;
+ src->n_element=0;//Only free the parent strutcture.
swarmkv_reply_free(src);
return;
}
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index 37bb445..031414c 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -40,6 +40,7 @@ struct snet_conn
node_t connected_from;
int thread_id;
int need_to_be_kill;
+ long long ts_ecr;
enum receive_state recv_state;
const struct swarmkv_msg *receiving_hdr;//pointer of receiving buffer
int is_in_conn_table;
@@ -96,9 +97,8 @@ struct swarmkv_net
};
static void peer_conn_read_cb(struct bufferevent *bev, void *arg);
-void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg);
+void peer_connected_event_cb(struct bufferevent *bev, short events, void *arg);
static void peer_conn_write_cb(struct bufferevent *bev, void *arg);
-static void peer_conn_read_cb(struct bufferevent *bev, void *arg);
static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg);
struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const node_t *dest)
@@ -111,7 +111,7 @@ struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const nod
//http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html
conn->bev=bufferevent_socket_new(base, -1, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS|
- bufferevent_setcb(conn->bev, NULL, NULL, connect_peer_eventcb, conn);
+ bufferevent_setcb(conn->bev, NULL, NULL, peer_connected_event_cb, conn);
node_copy(&conn->peer_listen_addr, dest);
conn->thread_id=thr->thread_id;
conn->ref_thr=thr;
@@ -318,8 +318,15 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg)
snet_conn_set_peer(conn, &hdr->caller);
snet_conn_table_add(thr, conn);
}
+ conn->ts_ecr=hdr->ts_val;
conn->recv_state=RECEIVING_PAYLOAD;
conn->receiving_hdr=(struct swarmkv_msg *)recv_buff;
+ long long now_us=ustime();
+ if(hdr->ts_ecr && now_us > hdr->ts_ecr)
+ {
+ //If now_us < hdr->ts_ecr, may be manually time adjustment
+ swarmkv_monitor_record_peer(thr->mod_monitor, &conn->peer_listen_addr, now_us-hdr->ts_ecr, thr->thread_id);
+ }
}
if(conn->recv_state==RECEIVING_PAYLOAD)
{
@@ -362,7 +369,7 @@ static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg)
snet_conn_free(conn);
return;
}
-void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg)
+void peer_connected_event_cb(struct bufferevent *bev, short events, void *arg)
{
struct snet_conn* conn= (struct snet_conn*) arg;
struct snet_thread* thr=conn->ref_thr;
@@ -616,6 +623,8 @@ int swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv
}
char *data=NULL;
size_t size=0;
+ msg->ts_val=ustime();
+ msg->ts_ecr=conn->ts_ecr;
swarmkv_msg_serialize(msg, &data, &size);
swarmkv_msg_free(msg);
struct evbuffer* output_buffer=NULL;
diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c
index ebae1f4..55d3622 100644
--- a/src/swarmkv_rpc.c
+++ b/src/swarmkv_rpc.c
@@ -14,6 +14,7 @@ struct swarmkv_rpc
struct event *timeout_ev;
struct future * f;
struct timespec start;
+ node_t target_peer; //For inter-node rpc
int thread_id;
struct swarmkv_rpc_mgr *ref_mgr;
UT_hash_handle hh;
@@ -28,7 +29,7 @@ struct swarmkv_rpc_mgr
{
long long seq_generator;
int nr_worker_threads;
- unsigned int timeout_us;
+ unsigned int default_timeout_us;
struct swarmkv_rpc_thr *threads;
//stats
@@ -43,15 +44,15 @@ void swarmkv_rpc_free(struct swarmkv_rpc *rpc)
HASH_DELETE(hh, thr->rpc_table, rpc);
free(rpc);
}
-struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_threads)
+struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(struct event_base *evbases[], int nr_threads, long long default_timeout_us)
{
struct swarmkv_rpc_mgr *mgr=ALLOC(struct swarmkv_rpc_mgr, 1);
mgr->seq_generator=0;
mgr->nr_worker_threads=nr_threads;
- mgr->timeout_us=opts->cluster_timeout_us;
+ mgr->default_timeout_us=default_timeout_us;
mgr->threads=ALLOC(struct swarmkv_rpc_thr, nr_threads);
- struct timeval duration={mgr->timeout_us/(1000*1000), mgr->timeout_us%(1000*1000)};
+ struct timeval duration={mgr->default_timeout_us/(1000*1000), mgr->default_timeout_us%(1000*1000)};
struct swarmkv_rpc_thr *thr=NULL;
for(int i=0; i<nr_threads; i++)
{
@@ -86,11 +87,20 @@ static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg)
struct swarmkv_rpc *rpc=(struct swarmkv_rpc *)arg;
rpc->ref_mgr->timed_out_rpcs++;
struct promise *p=future_to_promise(rpc->f);
- promise_failed(p, FUTURE_ERROR_TIMEOUT, "rpc timed out");
+ char error_str[128];
+ if(!node_is_empty(&rpc->target_peer))
+ {
+ snprintf(error_str, sizeof(error_str), "peer %s timed out", rpc->target_peer.addr);
+ }
+ else
+ {
+ snprintf(error_str, sizeof(error_str), "inter-thread rpc timed out");
+ }
+ promise_failed(p, FUTURE_ERROR_TIMEOUT, error_str);
swarmkv_rpc_free(rpc);
}
-long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f)
+struct swarmkv_rpc *swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f)
{
struct swarmkv_rpc_thr *thr=mgr->threads+thread_id;
struct swarmkv_rpc *rpc=ALLOC(struct swarmkv_rpc, 1);
@@ -104,8 +114,24 @@ long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct
rpc->ref_mgr=mgr;
rpc->f=f;
HASH_ADD(hh, thr->rpc_table, sequence, sizeof(rpc->sequence), rpc);
+ return rpc;
+}
+long long swarmkv_rpc_get_sequence(const struct swarmkv_rpc *rpc)
+{
return rpc->sequence;
}
+void swarmkv_rpc_set_peer(struct swarmkv_rpc *rpc, const node_t *peer)
+{
+ node_copy(&rpc->target_peer, peer);
+ return;
+}
+void swarmkv_rpc_set_timeout(struct swarmkv_rpc *rpc, long long timeout_us)
+{
+ struct timeval timeout={timeout_us/(1000*1000), timeout_us%(1000*1000)};
+ event_del(rpc->timeout_ev);
+ event_add(rpc->timeout_ev, &timeout);
+ return;
+}
long long swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *reply)
{
struct swarmkv_rpc *rpc=NULL;
diff --git a/src/swarmkv_rpc.h b/src/swarmkv_rpc.h
index e2e5d4a..ebe9cbc 100644
--- a/src/swarmkv_rpc.h
+++ b/src/swarmkv_rpc.h
@@ -5,10 +5,14 @@
#include "future_promise.h"
struct swarmkv_rpc_mgr;
-struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_worker_threads);
+struct swarmkv_rpc;
+struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(struct event_base *evbases[], int nr_threads, long long default_timeout_us);
void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr);
//Return a sequence number, which can be used to complete the request
-long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f);
+struct swarmkv_rpc *swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f);
+long long swarmkv_rpc_get_sequence(const struct swarmkv_rpc *rpc);
+void swarmkv_rpc_set_peer(struct swarmkv_rpc *rpc, const node_t *peer);
+void swarmkv_rpc_set_timeout(struct swarmkv_rpc *rpc, long long timeout_us);
//Return latency in microseconds, or -1 if failed to complete the rpc.
long long swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *response);
long long swarmkv_rpc_mgr_count(struct swarmkv_rpc_mgr *mgr, int thread_id);