diff options
| author | Zheng Chao <[email protected]> | 2022-12-23 18:20:05 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2022-12-23 18:20:05 +0800 |
| commit | 465c48f7c14d786df0a3d893eac98380ca85a69c (patch) | |
| tree | 62e95ec5a6025db5f5f4fd1291adb3505cab8c96 /src | |
| parent | 1ca15f1c666a27f1e63c346656b2a79391806469 (diff) | |
For avoiding key route entry oscillation, the keyspace ignores node crash.
Diffstat (limited to 'src')
| -rw-r--r-- | src/inc_internal/swarmkv_store.h | 2 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 1 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 20 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 17 | ||||
| -rw-r--r-- | src/t_token_bucket.c | 12 |
5 files changed, 42 insertions, 10 deletions
diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h index e91a25f..1881226 100644 --- a/src/inc_internal/swarmkv_store.h +++ b/src/inc_internal/swarmkv_store.h @@ -55,7 +55,7 @@ struct store_info void swarmkv_store_info(struct swarmkv_module *module, struct store_info *info); void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj); - +int sobj_get_random_replica(struct sobj *obj, node_t *out); void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid); struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key); diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index 98ded54..0d23fdf 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -1029,6 +1029,7 @@ void remove_failed_nodes_from_key_route_table(struct swarmkv_keyspace *ks, node_ struct slot_runtime *slot_rt=NULL; struct replica_node *hash_health_node=NULL, *node=NULL, *found=NULL, *tmp_node=NULL; size_t n_modified_key=0, n_removed_replica=0; + return; for(size_t i=0; i<n_node; i++) { node=ALLOC(struct replica_node, 1); diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index 0a913f8..728afe4 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -180,8 +180,9 @@ void snet_rpc_timeout_callback(evutil_socket_t fd, short events, void *arg) rpc->cb(&rpc->peer, NULL, 0, rpc->cb_arg); snet_rpc_free(rpc); } -void snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer) +int snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer) { + int n_canceled=0; struct snet_rpc *rpc=NULL, *tmp_rpc=NULL; HASH_ITER(hh, thr->rpc_table, rpc, tmp_rpc) { @@ -189,8 +190,10 @@ void snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer) { rpc->cb(&rpc->peer, NULL, 0, rpc->cb_arg); snet_rpc_free(rpc); + n_canceled++; } } + return n_canceled; } struct snet_rpc *snet_rpc_lookup(struct snet_thread *thr, unsigned long long sequence) { @@ -597,15 +600,16 @@ static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg) struct snet_thread *thr=conn->ref_thr; int err = EVUTIL_SOCKET_ERROR(); - + int canceled_rpc=snet_rpc_process_peer_event(thr, &conn->peer_listen_addr); if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) { - log_debug(thr->ref_logger, MODULE_SWAMRKV_NET, "%s connection error, %d (%s).", + log_debug(thr->ref_logger, MODULE_SWAMRKV_NET, "%s connection error, %d (%s). %d RPCs are canceled.", conn->connected_from.addr, - err, evutil_socket_error_to_string(err)); + err, evutil_socket_error_to_string(err), + canceled_rpc); //swarmkv-cli's connection has listen port number > 10000. } - snet_rpc_process_peer_event(thr, &conn->peer_listen_addr); + snet_conn_table_remove(conn); snet_conn_free(conn); return; @@ -630,9 +634,11 @@ void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg) else if (events & BEV_EVENT_ERROR) { int err = EVUTIL_SOCKET_ERROR(); - log_fatal(thr->ref_logger, MODULE_SWAMRKV_NET, "connect to %s failed %d (%s).", + int canceled_rpc=snet_rpc_process_peer_event(thr, &conn->peer_listen_addr); + log_fatal(thr->ref_logger, MODULE_SWAMRKV_NET, "connect to %s failed %d (%s). %d RPCs are canceled.", conn->peer_listen_addr.addr, - err, evutil_socket_error_to_string(err)); + err, evutil_socket_error_to_string(err), + canceled_rpc); snet_conn_table_remove(conn); snet_conn_free(conn); } diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index fa6804b..bc0b89d 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -242,12 +242,12 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) return NULL; } } -struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key) +struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key) { struct swarmkv_store *store=module2store(mod_store); struct scontainer *ctr=NULL; ctr=store_lookup_scontainer(store, key); - if(ctr && !ctr->is_pending) + if(ctr) //&& !ctr->is_pending { return &(ctr->obj); } @@ -275,6 +275,14 @@ void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj) } return; } +int sobj_get_random_replica(struct sobj *obj, node_t *out) +{ + struct scontainer *ctr=container_of(obj, struct scontainer, obj); + if(!ctr->replica_node_list) return 0; + node_t *replica=(node_t*)utarray_eltptr(ctr->replica_node_list, 0); + node_copy(out, replica); + return 1; +} void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz) { char *value_blob=NULL; @@ -898,6 +906,11 @@ enum cmd_exec_result crdt_rlist_command(struct swarmkv_module *mod_store, const *reply=swarmkv_reply_new_nil(); return FINISHED; } + if(!ctr->replica_node_list) + { + *reply=swarmkv_reply_new_array(0); + return FINISHED; + } *reply=swarmkv_reply_new_array(utarray_len(ctr->replica_node_list)); for(size_t i=0; i< (*reply)->n_element; i++) { diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c index a665a2a..8e3a2b5 100644 --- a/src/t_token_bucket.c +++ b/src/t_token_bucket.c @@ -107,6 +107,18 @@ enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const st { return NEED_KEY_ROUTE; } + if(obj->type==OBJ_TYPE_UNDEFINED) + { + node_t replica; + int ret=0; + ret=sobj_get_random_replica(obj, &replica); + if(ret) + { + *reply=swarmkv_reply_new_node(&replica, 1); + return REDIRECT; + } + return NEED_KEY_ROUTE; + } if(obj->type!=OBJ_TYPE_TOKEN_BUCKET) { *reply=swarmkv_reply_new_error(error_wrong_type); |
