summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/swarmkv/swarmkv.h3
-rw-r--r--src/swarmkv_api.c14
-rw-r--r--src/swarmkv_common.h2
-rw-r--r--src/swarmkv_store.c46
-rw-r--r--test/swarmkv_gtest.cpp57
5 files changed, 100 insertions, 22 deletions
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h
index 92e5079..de9e5d1 100644
--- a/include/swarmkv/swarmkv.h
+++ b/include/swarmkv/swarmkv.h
@@ -69,6 +69,9 @@ int swarmkv_options_set_run_for_leader_enabled(struct swarmkv_options *opts, int
//Default 0: Disabled.
//Enabling batch synchronization requires all cluster nodes using same worker thread number.
int swarmkv_options_set_batch_sync_enabled(struct swarmkv_options *opts, int value);
+//Default 300 seconds, after that the failed peer will be removed from replica.
+int swarmkv_options_set_sync_max_retry_time_ms(struct swarmkv_options *opts, int ms);
+int swarmkv_options_set_sync_retry_interval_ms(struct swarmkv_options *opts, int ms);
int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads);
int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, int nr_worker_threads);
int swarmkv_options_get_worker_thread_number(const struct swarmkv_options *opts);
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index 00147e8..cb42d6f 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -2,7 +2,6 @@
#include "swarmkv_common.h"
#include "swarmkv_utils.h"
-
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
@@ -16,7 +15,6 @@ struct swarmkv_readoptions
struct swarmkv_writeoptions
{
};
-
struct swarmkv_options *swarmkv_options_new(void)
{
struct swarmkv_options *opts = ALLOC(struct swarmkv_options, 1);
@@ -26,6 +24,8 @@ struct swarmkv_options *swarmkv_options_new(void)
opts->loglevel = 0;
opts->cluster_timeout_us = 500 * 1000; // Default 500ms
opts->sync_interval_us = 10 * 1000; // Default 10ms
+ opts->sync_retry_time_ms = 300*1000;
+ opts->sync_retry_interval_ms = 100;
strcpy(opts->bind_address, "0.0.0.0");
strcpy(opts->cluster_announce_ip, "127.0.0.1");
strcpy(opts->consul_agent_host, "127.0.0.1");
@@ -145,6 +145,16 @@ int swarmkv_options_set_batch_sync_enabled(struct swarmkv_options *opts, int val
opts->batch_sync_enabled = 1;
return 0;
}
+int swarmkv_options_set_sync_max_retry_time_ms(struct swarmkv_options *opts, int ms)
+{
+ opts->sync_retry_time_ms = ms;
+ return 0;
+}
+int swarmkv_options_set_sync_retry_interval_ms(struct swarmkv_options *opts, int ms)
+{
+ opts->sync_retry_interval_ms = ms;
+ return 0;
+}
int swarmkv_options_set_network_compression_enabled(struct swarmkv_options *opts, int value)
{
opts->network_compression_enabled = value;
diff --git a/src/swarmkv_common.h b/src/swarmkv_common.h
index a775124..f0fbc6c 100644
--- a/src/swarmkv_common.h
+++ b/src/swarmkv_common.h
@@ -66,6 +66,8 @@ struct swarmkv_options
unsigned int consul_port;
unsigned int cluster_timeout_us;
unsigned int sync_interval_us;
+ unsigned int sync_retry_time_ms;
+ unsigned int sync_retry_interval_ms;
struct log_handle *logger;
int loglevel;
const char *logpath;
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 969b904..31d165b 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -131,6 +131,7 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] =
struct peer_health
{
node_t peer;
+ struct timespec start_tracking_time;
struct timespec last_request_time;
int missed_responses;
int aborted_requests;
@@ -151,6 +152,7 @@ struct store_thread_runtime
long long sync_err;
long long sync_ok;
long long synced;
+ const struct swarmkv_options *opts;
};
struct swarmkv_store
{
@@ -322,42 +324,47 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t
return;
}
-__attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer)
+static void store_thread_runtime_remove_failed_peer(struct store_thread_runtime *thread_rt, const node_t *peer)
{
struct scontainer *ctr = NULL, *tmp = NULL;
- if (tid >= store->opts->nr_worker_threads)
- return; // swarmkv_close() is called.
- HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp)
+ HASH_ITER(hh, thread_rt->obj_table, ctr, tmp)
{
scontainer_remove_replica_node(ctr, peer);
}
return;
}
-#define SYNC_COOLDOWN_MS 100
-
-static void store_thread_runtime_update_peer_health(struct store_thread_runtime *rt, const node_t *peer, int responed)
+static int store_thread_runtime_update_peer_health(struct store_thread_runtime *thread_rt, const node_t *peer, int responed)
{
struct peer_health *ph = NULL;
- HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph);
+ HASH_FIND(hh, thread_rt->peer_health_table, peer, sizeof(node_t), ph);
//If the the peer is responded, unnecessary to track.
if (responed)
{
if(ph)
{
- HASH_DEL(rt->peer_health_table, ph);
+ HASH_DEL(thread_rt->peer_health_table, ph);
free(ph);
}
- return;
+ return 1;
}
+ struct timespec now;
+ clock_gettime(CLOCK_MONOTONIC, &now);
if(!ph)
{
ph = ALLOC(struct peer_health, 1);
node_copy(&ph->peer, peer);
- HASH_ADD(hh, rt->peer_health_table, peer, sizeof(node_t), ph);
+ HASH_ADD(hh, thread_rt->peer_health_table, peer, sizeof(node_t), ph);
+ ph->last_request_time = ph->start_tracking_time = now;
}
ph->missed_responses++;
- clock_gettime(CLOCK_MONOTONIC, &ph->last_request_time);
- return;
+ if(timespec_diff_usec(&ph->start_tracking_time, &now)/1000 > thread_rt->opts->sync_retry_time_ms)
+ {
+ HASH_DEL(thread_rt->peer_health_table, ph);
+ free(ph);
+ return 0;
+ }
+ else
+ return 1;
}
static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtime *rt, const node_t *peer)
{
@@ -370,7 +377,7 @@ static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtim
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
- if (timespec_diff_usec(&ph->last_request_time, &now) < SYNC_COOLDOWN_MS * 1000)
+ if (timespec_diff_usec(&ph->last_request_time, &now)/1000 < rt->opts->sync_retry_interval_ms)
{
ph->aborted_requests++;
return 0;
@@ -444,7 +451,11 @@ static void crdt_merge_on_reply(const struct swarmkv_reply *reply, void *user)
thread_rt->sync_err++;
}
int responed = (reply->type != SWARMKV_REPLY_ERROR);
- store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed);
+ int keep_tracking = store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed);
+ if(!keep_tracking)
+ {
+ store_thread_runtime_remove_failed_peer(thread_rt, &ctx->peer);
+ }
free(ctx);
return;
}
@@ -524,9 +535,10 @@ int store_thread_runtime_sync(struct store_thread_runtime *thread_rt, int batch_
thread_rt->synced += n_synced;
return n_synced;
}
-void store_thread_runtime_init(struct store_thread_runtime *thread_rt)
+void store_thread_runtime_init(struct store_thread_runtime *thread_rt, const struct swarmkv_options *opts)
{
memset(thread_rt, 0, sizeof(struct store_thread_runtime));
+ thread_rt->opts = opts;
pthread_mutex_init(&thread_rt->sanity_lock, NULL);
return;
}
@@ -624,7 +636,7 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts)
store->threads = ALLOC(struct store_thread_runtime, opts->nr_worker_threads);
for (size_t i = 0; i < opts->nr_worker_threads; i++)
{
- store_thread_runtime_init(store->threads + i);
+ store_thread_runtime_init(store->threads + i, opts);
}
node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port);
diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp
index dbb8613..c6bdfab 100644
--- a/test/swarmkv_gtest.cpp
+++ b/test/swarmkv_gtest.cpp
@@ -1122,6 +1122,7 @@ protected:
swarmkv_options_set_caller_thread_number(opts[i], 1);
swarmkv_options_set_batch_sync_enabled(opts[i], 1);
swarmkv_options_set_network_compression_enabled(opts[i], i%2);
+ swarmkv_options_set_sync_max_retry_time_ms(opts[i], 2000);
tmp_db[i]=swarmkv_open(opts[i], cluster_name, &err);
if(err)
{
@@ -2346,11 +2347,11 @@ static int parse_swarmkv_info(const char *text, const char *key) {
return value;
}
-TEST_F(SwarmkvTwoNodes, SyncRobustness)
+TEST_F(SwarmkvTwoNodes, SyncFailedResume)
{
struct swarmkv *db1=SwarmkvTwoNodes::db1, *db2=SwarmkvTwoNodes::db2;
struct swarmkv_reply *reply=NULL;
- const char *key="crdt-robustness";
+ const char *key="crdt-sync-resume";
reply=swarmkv_command(db1, "INCRBY %s 1", key);
EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
EXPECT_EQ(reply->integer, 1);
@@ -2372,7 +2373,7 @@ TEST_F(SwarmkvTwoNodes, SyncRobustness)
EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
int keyslot = reply->integer;
swarmkv_reply_free(reply);
-
+
//This is a hack of swarmkv_keyspace_slot2tid().
int tid = keyslot % SwarmkvTwoNodes::worker_thread_number;
reply=swarmkv_command(db2, "DEBUG %d sleep 1", tid);
@@ -2411,6 +2412,56 @@ TEST_F(SwarmkvTwoNodes, SyncRobustness)
swarmkv_reply_free(reply);
}
+TEST_F(SwarmkvTwoNodes, SyncRemoveFailedPeer)
+{
+ struct swarmkv *db1=SwarmkvTwoNodes::db1, *db2=SwarmkvTwoNodes::db2;
+ struct swarmkv_reply *reply=NULL;
+ const char *key="crdt-sync-remove";
+ reply=swarmkv_command(db1, "INCRBY %s 1", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db2, "INCRBY %s 1", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 2);
+ swarmkv_reply_free(reply);
+
+ wait_for_sync();
+
+ reply = swarmkv_command(db1, "CRDT RLIST %s", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ EXPECT_EQ(reply->n_element, 1);
+ swarmkv_reply_free(reply);
+
+ reply = swarmkv_command(db2, "KEYSLOT %s", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ int keyslot = reply->integer;
+ swarmkv_reply_free(reply);
+
+ //Sleep longger than the peer_tracking_interval_ms=2000ms
+ int tid = keyslot % SwarmkvTwoNodes::worker_thread_number;
+ reply=swarmkv_command(db2, "DEBUG %d sleep 3", tid);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_ERROR);
+ swarmkv_reply_free(reply);
+
+ //db1 -> db2 sync failed
+ for(int i=0; i<25; i++)
+ {
+ reply=swarmkv_command(db1, "INCRBY %s 1", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 3+i);
+ swarmkv_reply_free(reply);
+ usleep(100*1000);
+ }
+
+ reply = swarmkv_command(db1, "CRDT RLIST %s", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ EXPECT_EQ(reply->n_element, 0);
+ swarmkv_reply_print(reply, stdout);
+ swarmkv_reply_free(reply);
+
+}
TEST_F(SwarmkvTwoNodes, Wait)
{
return;