diff options
| author | Zheng Chao <[email protected]> | 2023-09-18 21:43:53 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-09-18 21:43:53 +0800 |
| commit | c3c69f12af1d6413e66f1553549f524746f23de0 (patch) | |
| tree | c1f17d2ce35abd22b94b5454a35f889af3323cbd /src | |
| parent | 0e520dfebcf49d8ebffd98070a7079c78acf22c0 (diff) | |
Bugfix: Reset struct xx_info to avoid inaccurate statistics.
Diffstat (limited to 'src')
| -rw-r--r-- | src/swarmkv.c | 14 | ||||
| -rw-r--r-- | src/swarmkv_common.c | 11 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 17 | ||||
| -rw-r--r-- | src/swarmkv_rpc.c | 38 | ||||
| -rw-r--r-- | src/swarmkv_rpc.h | 8 |
5 files changed, 70 insertions, 18 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c index bca8a82..61761a6 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -295,7 +295,7 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw rpc_info.timed_out_rpcs, rpc_info.unknown_sequence, mesh_info.queued_msgs, - mesh_info.drop_msgs, + mesh_info.enqueue_drops, server_time_us, now_monotonic.tv_sec-db->boot_time.tv_sec, (now_monotonic.tv_sec-db->boot_time.tv_sec)/(3600*24) @@ -868,6 +868,7 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg) } } } +#define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000 void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller) { struct swarmkv_cmd_spec *spec=NULL; @@ -899,7 +900,8 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar if(!node_is_empty(target_node) && node_compare(&db->self, target_node)) { //cmd is executed in target node's on_msg_callback - long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller); + struct swarmkv_rpc *rpc=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller); + long long sequence=swarmkv_rpc_get_sequence(rpc); msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, target_node, sequence); int ret=0; if(cur_tid >= db->opts->nr_worker_threads) @@ -910,6 +912,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar { reply=swarmkv_reply_new_error(error_thread_rpc_buffer_full); } + swarmkv_rpc_set_timeout(rpc, db->opts->cluster_timeout_us+INTER_THREAD_RPC_TIMEOUT_AHEAD); } else { @@ -919,6 +922,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar { reply=swarmkv_reply_new_error(error_network_error, node_addr2sds(target_node), err_str); } + swarmkv_rpc_set_peer(rpc, target_node); } if(reply) { @@ -930,7 +934,9 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar if(cur_tid != target_tid) { //cmd will be executed in 
target thread's on_msg_callback - long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller); + struct swarmkv_rpc *rpc=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller); + swarmkv_rpc_set_timeout(rpc, db->opts->cluster_timeout_us+INTER_THREAD_RPC_TIMEOUT_AHEAD); + long long sequence=swarmkv_rpc_get_sequence(rpc); msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, target_node, sequence); int ret=0; ret=swarmkv_mesh_send(db->mesh, cur_tid, target_tid, msg); @@ -1409,7 +1415,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, db->mod_monitor=swarmkv_monitor_new(db->opts); - db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->ref_evbases, opts->total_threads); + db->rpc_mgr=swarmkv_rpc_mgr_new(db->ref_evbases, opts->total_threads, opts->cluster_timeout_us); db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger); swarmkv_mesh_set_on_msg_cb(db->mesh, __on_msg_callback, db); diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c index b277785..649a5dd 100644 --- a/src/swarmkv_common.c +++ b/src/swarmkv_common.c @@ -488,10 +488,17 @@ void swarmkv_reply_merge_array(struct swarmkv_reply **dst, struct swarmkv_reply { *dst=swarmkv_reply_new_array(0); } - if((*dst)->type!=SWARMKV_REPLY_ARRAY || src->type!=SWARMKV_REPLY_ARRAY) + if((*dst)->type!=SWARMKV_REPLY_ARRAY) { return; } + if(src->type!=SWARMKV_REPLY_ARRAY) + { + (*dst)->elements=(struct swarmkv_reply **)realloc((*dst)->elements, ((*dst)->n_element+1)*sizeof(struct swarmkv_reply*)); + (*dst)->elements[(*dst)->n_element]=src; + (*dst)->n_element+=1; + return; + } (*dst)->elements=(struct swarmkv_reply **)realloc((*dst)->elements, ((*dst)->n_element+src->n_element)*sizeof(struct swarmkv_reply*)); size_t i=0; for(i=0; i<src->n_element; i++) @@ -500,7 +507,7 @@ void swarmkv_reply_merge_array(struct swarmkv_reply **dst, struct swarmkv_reply src->elements[i]=NULL; } (*dst)->n_element+=src->n_element; - src->n_element=0; + src->n_element=0;//Only 
free the parent structure. swarmkv_reply_free(src); return; } diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index 37bb445..031414c 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -40,6 +40,7 @@ struct snet_conn node_t connected_from; int thread_id; int need_to_be_kill; + long long ts_ecr; enum receive_state recv_state; const struct swarmkv_msg *receiving_hdr;//pointer of receiving buffer int is_in_conn_table; @@ -96,9 +97,8 @@ struct swarmkv_net }; static void peer_conn_read_cb(struct bufferevent *bev, void *arg); -void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg); +void peer_connected_event_cb(struct bufferevent *bev, short events, void *arg); static void peer_conn_write_cb(struct bufferevent *bev, void *arg); -static void peer_conn_read_cb(struct bufferevent *bev, void *arg); static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg); struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const node_t *dest) @@ -111,7 +111,7 @@ struct snet_conn *snet_conn_new_by_connecting(struct snet_thread *thr, const nod //http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html conn->bev=bufferevent_socket_new(base, -1, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);//BEV_OPT_UNLOCK_CALLBACKS| - bufferevent_setcb(conn->bev, NULL, NULL, connect_peer_eventcb, conn); + bufferevent_setcb(conn->bev, NULL, NULL, peer_connected_event_cb, conn); node_copy(&conn->peer_listen_addr, dest); conn->thread_id=thr->thread_id; conn->ref_thr=thr; @@ -318,8 +318,15 @@ static void peer_conn_read_cb(struct bufferevent *bev, void *arg) snet_conn_set_peer(conn, &hdr->caller); snet_conn_table_add(thr, conn); } + conn->ts_ecr=hdr->ts_val; conn->recv_state=RECEIVING_PAYLOAD; conn->receiving_hdr=(struct swarmkv_msg *)recv_buff; + long long now_us=ustime(); + if(hdr->ts_ecr && now_us > hdr->ts_ecr) + { + //If now_us < hdr->ts_ecr, the clock may have been adjusted manually + swarmkv_monitor_record_peer(thr->mod_monitor, 
&conn->peer_listen_addr, now_us-hdr->ts_ecr, thr->thread_id); + } } if(conn->recv_state==RECEIVING_PAYLOAD) { @@ -362,7 +369,7 @@ static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg) snet_conn_free(conn); return; } -void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg) +void peer_connected_event_cb(struct bufferevent *bev, short events, void *arg) { struct snet_conn* conn= (struct snet_conn*) arg; struct snet_thread* thr=conn->ref_thr; @@ -616,6 +623,8 @@ int swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv } char *data=NULL; size_t size=0; + msg->ts_val=ustime(); + msg->ts_ecr=conn->ts_ecr; swarmkv_msg_serialize(msg, &data, &size); swarmkv_msg_free(msg); struct evbuffer* output_buffer=NULL; diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c index ebae1f4..55d3622 100644 --- a/src/swarmkv_rpc.c +++ b/src/swarmkv_rpc.c @@ -14,6 +14,7 @@ struct swarmkv_rpc struct event *timeout_ev; struct future * f; struct timespec start; + node_t target_peer; //For inter-node rpc int thread_id; struct swarmkv_rpc_mgr *ref_mgr; UT_hash_handle hh; @@ -28,7 +29,7 @@ struct swarmkv_rpc_mgr { long long seq_generator; int nr_worker_threads; - unsigned int timeout_us; + unsigned int default_timeout_us; struct swarmkv_rpc_thr *threads; //stats @@ -43,15 +44,15 @@ void swarmkv_rpc_free(struct swarmkv_rpc *rpc) HASH_DELETE(hh, thr->rpc_table, rpc); free(rpc); } -struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_threads) +struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(struct event_base *evbases[], int nr_threads, long long default_timeout_us) { struct swarmkv_rpc_mgr *mgr=ALLOC(struct swarmkv_rpc_mgr, 1); mgr->seq_generator=0; mgr->nr_worker_threads=nr_threads; - mgr->timeout_us=opts->cluster_timeout_us; + mgr->default_timeout_us=default_timeout_us; mgr->threads=ALLOC(struct swarmkv_rpc_thr, nr_threads); - struct timeval 
duration={mgr->timeout_us/(1000*1000), mgr->timeout_us%(1000*1000)}; + struct timeval duration={mgr->default_timeout_us/(1000*1000), mgr->default_timeout_us%(1000*1000)}; struct swarmkv_rpc_thr *thr=NULL; for(int i=0; i<nr_threads; i++) { @@ -86,11 +87,20 @@ static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg) struct swarmkv_rpc *rpc=(struct swarmkv_rpc *)arg; rpc->ref_mgr->timed_out_rpcs++; struct promise *p=future_to_promise(rpc->f); - promise_failed(p, FUTURE_ERROR_TIMEOUT, "rpc timed out"); + char error_str[128]; + if(!node_is_empty(&rpc->target_peer)) + { + snprintf(error_str, sizeof(error_str), "peer %s timed out", rpc->target_peer.addr); + } + else + { + snprintf(error_str, sizeof(error_str), "inter-thread rpc timed out"); + } + promise_failed(p, FUTURE_ERROR_TIMEOUT, error_str); swarmkv_rpc_free(rpc); } -long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f) +struct swarmkv_rpc *swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f) { struct swarmkv_rpc_thr *thr=mgr->threads+thread_id; struct swarmkv_rpc *rpc=ALLOC(struct swarmkv_rpc, 1); @@ -104,8 +114,24 @@ long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct rpc->ref_mgr=mgr; rpc->f=f; HASH_ADD(hh, thr->rpc_table, sequence, sizeof(rpc->sequence), rpc); + return rpc; +} +long long swarmkv_rpc_get_sequence(const struct swarmkv_rpc *rpc) +{ return rpc->sequence; } +void swarmkv_rpc_set_peer(struct swarmkv_rpc *rpc, const node_t *peer) +{ + node_copy(&rpc->target_peer, peer); + return; +} +void swarmkv_rpc_set_timeout(struct swarmkv_rpc *rpc, long long timeout_us) +{ + struct timeval timeout={timeout_us/(1000*1000), timeout_us%(1000*1000)}; + event_del(rpc->timeout_ev); + event_add(rpc->timeout_ev, &timeout); + return; +} long long swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *reply) { struct swarmkv_rpc *rpc=NULL; diff --git a/src/swarmkv_rpc.h 
b/src/swarmkv_rpc.h index e2e5d4a..ebe9cbc 100644 --- a/src/swarmkv_rpc.h +++ b/src/swarmkv_rpc.h @@ -5,10 +5,14 @@ #include "future_promise.h" struct swarmkv_rpc_mgr; -struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_worker_threads); +struct swarmkv_rpc; +struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(struct event_base *evbases[], int nr_threads, long long default_timeout_us); void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr); //Return a sequence number, which can be used to complete the request -long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f); +struct swarmkv_rpc *swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f); +long long swarmkv_rpc_get_sequence(const struct swarmkv_rpc *rpc); +void swarmkv_rpc_set_peer(struct swarmkv_rpc *rpc, const node_t *peer); +void swarmkv_rpc_set_timeout(struct swarmkv_rpc *rpc, long long timeout_us); //Return latency in microseconds, or -1 if failed to complete the rpc. long long swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *response); long long swarmkv_rpc_mgr_count(struct swarmkv_rpc_mgr *mgr, int thread_id); |
