summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2022-12-23 18:20:05 +0800
committerZheng Chao <[email protected]>2022-12-23 18:20:05 +0800
commit465c48f7c14d786df0a3d893eac98380ca85a69c (patch)
tree62e95ec5a6025db5f5f4fd1291adb3505cab8c96
parent1ca15f1c666a27f1e63c346656b2a79391806469 (diff)
For avoiding key route entry oscillation, the keyspace ignores node crash.
-rw-r--r--src/inc_internal/swarmkv_store.h2
-rw-r--r--src/swarmkv_keyspace.c1
-rw-r--r--src/swarmkv_net.c20
-rw-r--r--src/swarmkv_store.c17
-rw-r--r--src/t_token_bucket.c12
-rw-r--r--test/swarmkv_perf_test.cpp25
6 files changed, 53 insertions, 24 deletions
diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h
index e91a25f..1881226 100644
--- a/src/inc_internal/swarmkv_store.h
+++ b/src/inc_internal/swarmkv_store.h
@@ -55,7 +55,7 @@ struct store_info
void swarmkv_store_info(struct swarmkv_module *module, struct store_info *info);
void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj);
-
+int sobj_get_random_replica(struct sobj *obj, node_t *out);
void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid);
struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key);
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index 98ded54..0d23fdf 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -1029,6 +1029,7 @@ void remove_failed_nodes_from_key_route_table(struct swarmkv_keyspace *ks, node_
struct slot_runtime *slot_rt=NULL;
struct replica_node *hash_health_node=NULL, *node=NULL, *found=NULL, *tmp_node=NULL;
size_t n_modified_key=0, n_removed_replica=0;
+ return;
for(size_t i=0; i<n_node; i++)
{
node=ALLOC(struct replica_node, 1);
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index 0a913f8..728afe4 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -180,8 +180,9 @@ void snet_rpc_timeout_callback(evutil_socket_t fd, short events, void *arg)
rpc->cb(&rpc->peer, NULL, 0, rpc->cb_arg);
snet_rpc_free(rpc);
}
-void snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer)
+int snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer)
{
+ int n_canceled=0;
struct snet_rpc *rpc=NULL, *tmp_rpc=NULL;
HASH_ITER(hh, thr->rpc_table, rpc, tmp_rpc)
{
@@ -189,8 +190,10 @@ void snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer)
{
rpc->cb(&rpc->peer, NULL, 0, rpc->cb_arg);
snet_rpc_free(rpc);
+ n_canceled++;
}
}
+ return n_canceled;
}
struct snet_rpc *snet_rpc_lookup(struct snet_thread *thr, unsigned long long sequence)
{
@@ -597,15 +600,16 @@ static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg)
struct snet_thread *thr=conn->ref_thr;
int err = EVUTIL_SOCKET_ERROR();
-
+ int canceled_rpc=snet_rpc_process_peer_event(thr, &conn->peer_listen_addr);
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
{
- log_debug(thr->ref_logger, MODULE_SWAMRKV_NET, "%s connection error, %d (%s).",
+ log_debug(thr->ref_logger, MODULE_SWAMRKV_NET, "%s connection error, %d (%s). %d RPCs are canceled.",
conn->connected_from.addr,
- err, evutil_socket_error_to_string(err));
+ err, evutil_socket_error_to_string(err),
+ canceled_rpc);
//swarmkv-cli's connection has listen port number > 10000.
}
- snet_rpc_process_peer_event(thr, &conn->peer_listen_addr);
+
snet_conn_table_remove(conn);
snet_conn_free(conn);
return;
@@ -630,9 +634,11 @@ void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg)
else if (events & BEV_EVENT_ERROR)
{
int err = EVUTIL_SOCKET_ERROR();
- log_fatal(thr->ref_logger, MODULE_SWAMRKV_NET, "connect to %s failed %d (%s).",
+ int canceled_rpc=snet_rpc_process_peer_event(thr, &conn->peer_listen_addr);
+ log_fatal(thr->ref_logger, MODULE_SWAMRKV_NET, "connect to %s failed %d (%s). %d RPCs are canceled.",
conn->peer_listen_addr.addr,
- err, evutil_socket_error_to_string(err));
+ err, evutil_socket_error_to_string(err),
+ canceled_rpc);
snet_conn_table_remove(conn);
snet_conn_free(conn);
}
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index fa6804b..bc0b89d 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -242,12 +242,12 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key)
return NULL;
}
}
-struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key)
+struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key)
{
struct swarmkv_store *store=module2store(mod_store);
struct scontainer *ctr=NULL;
ctr=store_lookup_scontainer(store, key);
- if(ctr && !ctr->is_pending)
+ if(ctr) //&& !ctr->is_pending
{
return &(ctr->obj);
}
@@ -275,6 +275,14 @@ void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj)
}
return;
}
+int sobj_get_random_replica(struct sobj *obj, node_t *out)
+{
+ struct scontainer *ctr=container_of(obj, struct scontainer, obj);
+ if(!ctr->replica_node_list) return 0;
+ node_t *replica=(node_t*)utarray_eltptr(ctr->replica_node_list, 0);
+ node_copy(out, replica);
+ return 1;
+}
void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz)
{
char *value_blob=NULL;
@@ -898,6 +906,11 @@ enum cmd_exec_result crdt_rlist_command(struct swarmkv_module *mod_store, const
*reply=swarmkv_reply_new_nil();
return FINISHED;
}
+ if(!ctr->replica_node_list)
+ {
+ *reply=swarmkv_reply_new_array(0);
+ return FINISHED;
+ }
*reply=swarmkv_reply_new_array(utarray_len(ctr->replica_node_list));
for(size_t i=0; i< (*reply)->n_element; i++)
{
diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c
index a665a2a..8e3a2b5 100644
--- a/src/t_token_bucket.c
+++ b/src/t_token_bucket.c
@@ -107,6 +107,18 @@ enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const st
{
return NEED_KEY_ROUTE;
}
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ node_t replica;
+ int ret=0;
+ ret=sobj_get_random_replica(obj, &replica);
+ if(ret)
+ {
+ *reply=swarmkv_reply_new_node(&replica, 1);
+ return REDIRECT;
+ }
+ return NEED_KEY_ROUTE;
+ }
if(obj->type!=OBJ_TYPE_TOKEN_BUCKET)
{
*reply=swarmkv_reply_new_error(error_wrong_type);
diff --git a/test/swarmkv_perf_test.cpp b/test/swarmkv_perf_test.cpp
index e564cb2..17b872a 100644
--- a/test/swarmkv_perf_test.cpp
+++ b/test/swarmkv_perf_test.cpp
@@ -307,10 +307,12 @@ void *background_tconsume_thread(void *thread_arg)
{
snprintf(key, sizeof(key), "tb-%zu", (round+start)%g_token_bucket_number);
cmd_exec_arg_expect_integer(arg, tokens);
+ arg->print_reply_on_fail=1;
swarmkv_tconsume(db, key, strlen(key), tokens, cmd_exec_generic_callback, arg);
ret=cmd_exec_arg_wait(arg, PERF_TEST_EXEC_TO_MS);
cmd_exec_arg_clear(arg);
if(ret==1) got_token_cnt++;
+ else printf("tconsume %s failed\n", key);
round++;
}
cmd_exec_arg_free(arg);
@@ -383,7 +385,7 @@ TEST(Performance, Sync)
}
}
- sleep(10);
+ sleep(20);
g_tconsume_running_flag=0;
int *successful_backgroud_running_thread=NULL;
for(size_t i=0; i<NODE_NUMBER; i++)
@@ -409,13 +411,13 @@ TEST(Performance, Sync)
}
TEST(Resilience, Failover)
{
- size_t NODE_NUMBER=3;
+ size_t NODE_NUMBER=4;
struct swarmkv *db[NODE_NUMBER];
char *err=NULL;
unsigned int p2p_port_start=11310, health_port_start=12310;
char node_list_str[1024]={0};
- for(size_t i=0; i<NODE_NUMBER; i++)
+ for(size_t i=1; i<NODE_NUMBER; i++)
{
snprintf(node_list_str+strlen(node_list_str), sizeof(node_list_str)-strlen(node_list_str), "127.0.0.1:%zu ", p2p_port_start+i);
}
@@ -436,38 +438,33 @@ TEST(Resilience, Failover)
}
ASSERT_TRUE(err==NULL);
}
- size_t KEY_NUMBER=8;
+ size_t KEY_NUMBER=1024;
struct swarmkv_reply *reply=NULL;
for(size_t i=0; i<KEY_NUMBER; i++)
{
- reply=swarmkv_command(db[i%NODE_NUMBER], "SET fo-%zu %zu", i, i);
- swarmkv_reply_free(reply);
- }
- for(size_t i=0; i<KEY_NUMBER; i++)
- {
for(size_t j=0; j<NODE_NUMBER; j++)
{
- reply=swarmkv_command(db[j], "GET fo-%zu", i);
+ reply=swarmkv_command(db[j], "SET fo-%zu node-%zu", i, j);
swarmkv_reply_free(reply);
}
}
swarmkv_close(db[0]);
db[0]=NULL;
- char target[32];
- sleep(10);
for(size_t i=0; i<KEY_NUMBER; i++)
{
for(size_t j=1; j<NODE_NUMBER; j++)
{
- reply=swarmkv_command(db[j], "SET fo-%zu %zu", i, i+1);
+ reply=swarmkv_command(db[j], "SET fo-%zu node-%zu", i, j);
swarmkv_reply_free(reply);
}
}
+ char target[32];
+ sleep(10);
for(size_t i=0; i<KEY_NUMBER; i++)
{
reply=swarmkv_command(db[1], "KEYSPACE RLIST fo-%zu", i);
EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
- EXPECT_EQ(reply->n_element, NODE_NUMBER-1);
+ EXPECT_EQ(reply->n_element, NODE_NUMBER);
swarmkv_reply_free(reply);
for(size_t j=1; j<NODE_NUMBER; j++)
{