summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2022-12-23 18:20:05 +0800
committerZheng Chao <[email protected]>2022-12-23 18:20:05 +0800
commit465c48f7c14d786df0a3d893eac98380ca85a69c (patch)
tree62e95ec5a6025db5f5f4fd1291adb3505cab8c96 /src
parent1ca15f1c666a27f1e63c346656b2a79391806469 (diff)
For avoiding key route entry oscillation, the keyspace ignores node crash.
Diffstat (limited to 'src')
-rw-r--r--src/inc_internal/swarmkv_store.h2
-rw-r--r--src/swarmkv_keyspace.c1
-rw-r--r--src/swarmkv_net.c20
-rw-r--r--src/swarmkv_store.c17
-rw-r--r--src/t_token_bucket.c12
5 files changed, 42 insertions, 10 deletions
diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h
index e91a25f..1881226 100644
--- a/src/inc_internal/swarmkv_store.h
+++ b/src/inc_internal/swarmkv_store.h
@@ -55,7 +55,7 @@ struct store_info
void swarmkv_store_info(struct swarmkv_module *module, struct store_info *info);
void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj);
-
+int sobj_get_random_replica(struct sobj *obj, node_t *out);
void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid);
struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key);
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index 98ded54..0d23fdf 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -1029,6 +1029,7 @@ void remove_failed_nodes_from_key_route_table(struct swarmkv_keyspace *ks, node_
struct slot_runtime *slot_rt=NULL;
struct replica_node *hash_health_node=NULL, *node=NULL, *found=NULL, *tmp_node=NULL;
size_t n_modified_key=0, n_removed_replica=0;
+ return;
for(size_t i=0; i<n_node; i++)
{
node=ALLOC(struct replica_node, 1);
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index 0a913f8..728afe4 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -180,8 +180,9 @@ void snet_rpc_timeout_callback(evutil_socket_t fd, short events, void *arg)
rpc->cb(&rpc->peer, NULL, 0, rpc->cb_arg);
snet_rpc_free(rpc);
}
-void snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer)
+int snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer)
{
+ int n_canceled=0;
struct snet_rpc *rpc=NULL, *tmp_rpc=NULL;
HASH_ITER(hh, thr->rpc_table, rpc, tmp_rpc)
{
@@ -189,8 +190,10 @@ void snet_rpc_process_peer_event(struct snet_thread *thr, const node_t *peer)
{
rpc->cb(&rpc->peer, NULL, 0, rpc->cb_arg);
snet_rpc_free(rpc);
+ n_canceled++;
}
}
+ return n_canceled;
}
struct snet_rpc *snet_rpc_lookup(struct snet_thread *thr, unsigned long long sequence)
{
@@ -597,15 +600,16 @@ static void peer_conn_event_cb(struct bufferevent *bev, short events, void *arg)
struct snet_thread *thr=conn->ref_thr;
int err = EVUTIL_SOCKET_ERROR();
-
+ int canceled_rpc=snet_rpc_process_peer_event(thr, &conn->peer_listen_addr);
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
{
- log_debug(thr->ref_logger, MODULE_SWAMRKV_NET, "%s connection error, %d (%s).",
+ log_debug(thr->ref_logger, MODULE_SWAMRKV_NET, "%s connection error, %d (%s). %d RPCs are canceled.",
conn->connected_from.addr,
- err, evutil_socket_error_to_string(err));
+ err, evutil_socket_error_to_string(err),
+ canceled_rpc);
//swarmkv-cli's connection has listen port number > 10000.
}
- snet_rpc_process_peer_event(thr, &conn->peer_listen_addr);
+
snet_conn_table_remove(conn);
snet_conn_free(conn);
return;
@@ -630,9 +634,11 @@ void connect_peer_eventcb(struct bufferevent *bev, short events, void *arg)
else if (events & BEV_EVENT_ERROR)
{
int err = EVUTIL_SOCKET_ERROR();
- log_fatal(thr->ref_logger, MODULE_SWAMRKV_NET, "connect to %s failed %d (%s).",
+ int canceled_rpc=snet_rpc_process_peer_event(thr, &conn->peer_listen_addr);
+ log_fatal(thr->ref_logger, MODULE_SWAMRKV_NET, "connect to %s failed %d (%s). %d RPCs are canceled.",
conn->peer_listen_addr.addr,
- err, evutil_socket_error_to_string(err));
+ err, evutil_socket_error_to_string(err),
+ canceled_rpc);
snet_conn_table_remove(conn);
snet_conn_free(conn);
}
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index fa6804b..bc0b89d 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -242,12 +242,12 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key)
return NULL;
}
}
-struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key)
+struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key)
{
struct swarmkv_store *store=module2store(mod_store);
struct scontainer *ctr=NULL;
ctr=store_lookup_scontainer(store, key);
- if(ctr && !ctr->is_pending)
+ if(ctr) //&& !ctr->is_pending
{
return &(ctr->obj);
}
@@ -275,6 +275,14 @@ void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj)
}
return;
}
+int sobj_get_random_replica(struct sobj *obj, node_t *out)
+{
+ struct scontainer *ctr=container_of(obj, struct scontainer, obj);
+ if(!ctr->replica_node_list) return 0;
+ node_t *replica=(node_t*)utarray_eltptr(ctr->replica_node_list, 0);
+ node_copy(out, replica);
+ return 1;
+}
void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz)
{
char *value_blob=NULL;
@@ -898,6 +906,11 @@ enum cmd_exec_result crdt_rlist_command(struct swarmkv_module *mod_store, const
*reply=swarmkv_reply_new_nil();
return FINISHED;
}
+ if(!ctr->replica_node_list)
+ {
+ *reply=swarmkv_reply_new_array(0);
+ return FINISHED;
+ }
*reply=swarmkv_reply_new_array(utarray_len(ctr->replica_node_list));
for(size_t i=0; i< (*reply)->n_element; i++)
{
diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c
index a665a2a..8e3a2b5 100644
--- a/src/t_token_bucket.c
+++ b/src/t_token_bucket.c
@@ -107,6 +107,18 @@ enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const st
{
return NEED_KEY_ROUTE;
}
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ node_t replica;
+ int ret=0;
+ ret=sobj_get_random_replica(obj, &replica);
+ if(ret)
+ {
+ *reply=swarmkv_reply_new_node(&replica, 1);
+ return REDIRECT;
+ }
+ return NEED_KEY_ROUTE;
+ }
if(obj->type!=OBJ_TYPE_TOKEN_BUCKET)
{
*reply=swarmkv_reply_new_error(error_wrong_type);