diff options
| author | Zheng Chao <[email protected]> | 2022-12-23 18:20:05 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2022-12-23 18:20:05 +0800 |
| commit | 465c48f7c14d786df0a3d893eac98380ca85a69c (patch) | |
| tree | 62e95ec5a6025db5f5f4fd1291adb3505cab8c96 | |
| parent | 1ca15f1c666a27f1e63c346656b2a79391806469 (diff) | |
For avoiding key route entry oscillation, the keyspace ignores node crash.
| -rw-r--r-- | src/inc_internal/swarmkv_store.h | 2 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 1 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 20 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 17 | ||||
| -rw-r--r-- | src/t_token_bucket.c | 12 | ||||
| -rw-r--r-- | test/swarmkv_perf_test.cpp | 25 |
6 files changed, 53 insertions, 24 deletions
diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h index e91a25f..1881226 100644 --- a/src/inc_internal/swarmkv_store.h +++ b/src/inc_internal/swarmkv_store.h @@ -55,7 +55,7 @@ struct store_info void swarmkv_store_info(struct swarmkv_module *module, struct store_info *info); void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj); - +int sobj_get_random_replica(struct sobj *obj, node_t *out); void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid); struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key); diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index 98ded54..0d23fdf 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -1029,6 +1029,7 @@ void remove_failed_nodes_from_key_route_table(struct swarmkv_keyspace *ks, node_ struct slot_runtime *slot_rt=NULL; struct replica_node *hash_health_node=NULL, *node=NULL, *found=NULL, *tmp_node=NULL; size_t n_modified_key=0, n_removed_replica=0; + return; for(size_t i=0; i<n_node; i++) { node=ALLOC(struct replica_node, 1); diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index 0a913f8..728afe4 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -180,8 +180,9 @@ void snet_rpc_timeout_callback(evutil_socket_t fd, short events, void *arg) rpc->cb(&rpc->peer, NULL, 0, rpc->cb_arg); snet_rpc_free(rpc); } -void snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer) +int snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer) { + int n_canceled=0; struct snet_rpc *rpc=NULL, *tmp_rpc=NULL; HASH_ITER(hh, thr->rpc_table, rpc, tmp_rpc) { @@ -189,8 +190,10 @@ void snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer) { rpc->cb(&rpc->peer, NULL, 0, rpc->cb_arg); snet_rpc_free(rpc); + n_canceled++; } } + return n_canceled; } struct snet_rpc *snet_rpc_lookup(struct snet_thread *thr, unsigned long long sequence) { @@ -597,15 +600,16 @@ static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg) struct snet_thread *thr=conn->ref_thr; int err = EVUTIL_SOCKET_ERROR(); - + int canceled_rpc=snet_rpc_process_peer_event(thr, &conn->peer_listen_addr); if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) { - log_debug(thr->ref_logger, MODULE_SWAMRKV_NET, "%s connection error, %d (%s).", + log_debug(thr->ref_logger, MODULE_SWAMRKV_NET, "%s connection error, %d (%s). %d RPCs are canceled.", conn->connected_from.addr, - err, evutil_socket_error_to_string(err)); + err, evutil_socket_error_to_string(err), + canceled_rpc); //swarmkv-cli's connection has listen port number > 10000. } - snet_rpc_process_peer_event(thr, &conn->peer_listen_addr); + snet_conn_table_remove(conn); snet_conn_free(conn); return; @@ -630,9 +634,11 @@ void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg) else if (events & BEV_EVENT_ERROR) { int err = EVUTIL_SOCKET_ERROR(); - log_fatal(thr->ref_logger, MODULE_SWAMRKV_NET, "connect to %s failed %d (%s).", + int canceled_rpc=snet_rpc_process_peer_event(thr, &conn->peer_listen_addr); + log_fatal(thr->ref_logger, MODULE_SWAMRKV_NET, "connect to %s failed %d (%s). %d RPCs are canceled.", conn->peer_listen_addr.addr, - err, evutil_socket_error_to_string(err)); + err, evutil_socket_error_to_string(err), + canceled_rpc); snet_conn_table_remove(conn); snet_conn_free(conn); } diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index fa6804b..bc0b89d 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -242,12 +242,12 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) return NULL; } } -struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key) +struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key) { struct swarmkv_store *store=module2store(mod_store); struct scontainer *ctr=NULL; ctr=store_lookup_scontainer(store, key); - if(ctr && !ctr->is_pending) + if(ctr) //&& !ctr->is_pending { return &(ctr->obj); } @@ -275,6 +275,14 @@ void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj) } return; } +int sobj_get_random_replica(struct sobj *obj, node_t *out) +{ + struct scontainer *ctr=container_of(obj, struct scontainer, obj); + if(!ctr->replica_node_list) return 0; + node_t *replica=(node_t*)utarray_eltptr(ctr->replica_node_list, 0); + node_copy(out, replica); + return 1; +} void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz) { char *value_blob=NULL; @@ -898,6 +906,11 @@ enum cmd_exec_result crdt_rlist_command(struct swarmkv_module *mod_store, const *reply=swarmkv_reply_new_nil(); return FINISHED; } + if(!ctr->replica_node_list) + { + *reply=swarmkv_reply_new_array(0); + return FINISHED; + } *reply=swarmkv_reply_new_array(utarray_len(ctr->replica_node_list)); for(size_t i=0; i< (*reply)->n_element; i++) { diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c index a665a2a..8e3a2b5 100644 --- a/src/t_token_bucket.c +++ b/src/t_token_bucket.c @@ -107,6 +107,18 @@ enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const st { return NEED_KEY_ROUTE; } + if(obj->type==OBJ_TYPE_UNDEFINED) + { + node_t replica; + int ret=0; + ret=sobj_get_random_replica(obj, &replica); + if(ret) + { + *reply=swarmkv_reply_new_node(&replica, 1); + return REDIRECT; + } + return NEED_KEY_ROUTE; + } if(obj->type!=OBJ_TYPE_TOKEN_BUCKET) { *reply=swarmkv_reply_new_error(error_wrong_type); diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp index e564cb2..17b872a 100644 --- a/test/swarmkv_perf_test.cpp +++ b/test/swarmkv_perf_test.cpp @@ -307,10 +307,12 @@ void *background_tconsume_thread(void *thread_arg) { snprintf(key, sizeof(key), "tb-%zu", (round+start)%g_token_bucket_number); cmd_exec_arg_expect_integer(arg, tokens); + arg->print_reply_on_fail=1; swarmkv_tconsume(db, key, strlen(key), tokens, cmd_exec_generic_callback, arg); ret=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS); cmd_exec_arg_clear(arg); if(ret==1) got_token_cnt++; + else printf("tconsume %s failed\n", key); round++; } cmd_exec_arg_free(arg); @@ -383,7 +385,7 @@ TEST(Performance, Sync) } } - sleep(10); + sleep(20); g_tconsume_running_flag=0; int *successful_backgroud_running_thread=NULL; for(size_t i=0; i<NODE_NUMBER; i++) @@ -409,13 +411,13 @@ TEST(Performance, Sync) } TEST(Resilience, Failover) { - size_t NODE_NUMBER=3; + size_t NODE_NUMBER=4; struct swarmkv *db[NODE_NUMBER]; char *err=NULL; unsigned int p2p_port_start=11310, health_port_start=12310; char node_list_str[1024]={0}; - for(size_t i=0; i<NODE_NUMBER; i++) + for(size_t i=1; i<NODE_NUMBER; i++) { snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", p2p_port_start+i); } @@ -436,38 +438,33 @@ TEST(Resilience, Failover) } ASSERT_TRUE(err==NULL); } - size_t KEY_NUMBER=8; + size_t KEY_NUMBER=1024; struct swarmkv_reply *reply=NULL; for(size_t i=0; i<KEY_NUMBER; i++) { - reply=swarmkv_command(db[i%NODE_NUMBER], "SET fo-%zu %zu", i, i); - swarmkv_reply_free(reply); - } - for(size_t i=0; i<KEY_NUMBER; i++) - { for(size_t j=0; j<NODE_NUMBER; j++) { - reply=swarmkv_command(db[j], "GET fo-%zu", i); + reply=swarmkv_command(db[j], "SET fo-%zu node-%zu", i, j); swarmkv_reply_free(reply); } } swarmkv_close(db[0]); db[0]=NULL; - char target[32]; - sleep(10); for(size_t i=0; i<KEY_NUMBER; i++) { for(size_t j=1; j<NODE_NUMBER; j++) { - reply=swarmkv_command(db[j], "SET fo-%zu %zu", i, i+1); + reply=swarmkv_command(db[j], "SET fo-%zu node-%zu", i, j); swarmkv_reply_free(reply); } } + char target[32]; + sleep(10); for(size_t i=0; i<KEY_NUMBER; i++) { reply=swarmkv_command(db[1], "KEYSPACE RLIST fo-%zu", i); EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY); - EXPECT_EQ(reply->n_element, NODE_NUMBER-1); + EXPECT_EQ(reply->n_element, NODE_NUMBER); swarmkv_reply_free(reply); for(size_t j=1; j<NODE_NUMBER; j++) { |
