diff options
| -rw-r--r-- | include/swarmkv/swarmkv.h | 3 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 14 | ||||
| -rw-r--r-- | src/swarmkv_common.h | 2 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 46 | ||||
| -rw-r--r-- | test/swarmkv_gtest.cpp | 57 |
5 files changed, 100 insertions, 22 deletions
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h index 92e5079..de9e5d1 100644 --- a/include/swarmkv/swarmkv.h +++ b/include/swarmkv/swarmkv.h @@ -69,6 +69,9 @@ int swarmkv_options_set_run_for_leader_enabled(struct swarmkv_options *opts, int //Default 0: Disabled. //Enabling batch synchronization requires all cluster nodes using same worker thread number. int swarmkv_options_set_batch_sync_enabled(struct swarmkv_options *opts, int value); +//Default 300 seconds, after that the failed peer will be removed from replica. +int swarmkv_options_set_sync_max_retry_time_ms(struct swarmkv_options *opts, int ms); +int swarmkv_options_set_sync_retry_interval_ms(struct swarmkv_options *opts, int ms); int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads); int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, int nr_worker_threads); int swarmkv_options_get_worker_thread_number(const struct swarmkv_options *opts); diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index 00147e8..cb42d6f 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -2,7 +2,6 @@ #include "swarmkv_common.h" #include "swarmkv_utils.h" - #include <stdio.h> #include <string.h> #include <stdlib.h> @@ -16,7 +15,6 @@ struct swarmkv_readoptions struct swarmkv_writeoptions { }; - struct swarmkv_options *swarmkv_options_new(void) { struct swarmkv_options *opts = ALLOC(struct swarmkv_options, 1); @@ -26,6 +24,8 @@ struct swarmkv_options *swarmkv_options_new(void) opts->loglevel = 0; opts->cluster_timeout_us = 500 * 1000; // Default 500ms opts->sync_interval_us = 10 * 1000; // Default 10ms + opts->sync_retry_time_ms = 300*1000; + opts->sync_retry_interval_ms = 100; strcpy(opts->bind_address, "0.0.0.0"); strcpy(opts->cluster_announce_ip, "127.0.0.1"); strcpy(opts->consul_agent_host, "127.0.0.1"); @@ -145,6 +145,16 @@ int swarmkv_options_set_batch_sync_enabled(struct swarmkv_options *opts, int val opts->batch_sync_enabled = 1; return 0; } +int swarmkv_options_set_sync_max_retry_time_ms(struct swarmkv_options *opts, int ms) +{ + opts->sync_retry_time_ms = ms; + return 0; +} +int swarmkv_options_set_sync_retry_interval_ms(struct swarmkv_options *opts, int ms) +{ + opts->sync_retry_interval_ms = ms; + return 0; +} int swarmkv_options_set_network_compression_enabled(struct swarmkv_options *opts, int value) { opts->network_compression_enabled = value; diff --git a/src/swarmkv_common.h b/src/swarmkv_common.h index a775124..f0fbc6c 100644 --- a/src/swarmkv_common.h +++ b/src/swarmkv_common.h @@ -66,6 +66,8 @@ struct swarmkv_options unsigned int consul_port; unsigned int cluster_timeout_us; unsigned int sync_interval_us; + unsigned int sync_retry_time_ms; + unsigned int sync_retry_interval_ms; struct log_handle *logger; int loglevel; const char *logpath; diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 969b904..31d165b 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -131,6 +131,7 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] = struct peer_health { node_t peer; + struct timespec start_tracking_time; struct timespec last_request_time; int missed_responses; int aborted_requests; @@ -151,6 +152,7 @@ struct store_thread_runtime long long sync_err; long long sync_ok; long long synced; + const struct swarmkv_options *opts; }; struct swarmkv_store { @@ -322,42 +324,47 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t return; } -__attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer) +static void store_thread_runtime_remove_failed_peer(struct store_thread_runtime *thread_rt, const node_t *peer) { struct scontainer *ctr = NULL, *tmp = NULL; - if (tid >= store->opts->nr_worker_threads) - return; // swarmkv_close() is called. - HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp) + HASH_ITER(hh, thread_rt->obj_table, ctr, tmp) { scontainer_remove_replica_node(ctr, peer); } return; } -#define SYNC_COOLDOWN_MS 100 - -static void store_thread_runtime_update_peer_health(struct store_thread_runtime *rt, const node_t *peer, int responed) +static int store_thread_runtime_update_peer_health(struct store_thread_runtime *thread_rt, const node_t *peer, int responed) { struct peer_health *ph = NULL; - HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph); + HASH_FIND(hh, thread_rt->peer_health_table, peer, sizeof(node_t), ph); //If the the peer is responded, unnecessary to track. if (responed) { if(ph) { - HASH_DEL(rt->peer_health_table, ph); + HASH_DEL(thread_rt->peer_health_table, ph); free(ph); } - return; + return 1; } + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); if(!ph) { ph = ALLOC(struct peer_health, 1); node_copy(&ph->peer, peer); - HASH_ADD(hh, rt->peer_health_table, peer, sizeof(node_t), ph); + HASH_ADD(hh, thread_rt->peer_health_table, peer, sizeof(node_t), ph); + ph->last_request_time = ph->start_tracking_time = now; } ph->missed_responses++; - clock_gettime(CLOCK_MONOTONIC, &ph->last_request_time); - return; + if(timespec_diff_usec(&ph->start_tracking_time, &now)/1000 > thread_rt->opts->sync_retry_time_ms) + { + HASH_DEL(thread_rt->peer_health_table, ph); + free(ph); + return 0; + } + else + return 1; } static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtime *rt, const node_t *peer) { @@ -370,7 +377,7 @@ static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtim struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); - if (timespec_diff_usec(&ph->last_request_time, &now) < SYNC_COOLDOWN_MS * 1000) + if (timespec_diff_usec(&ph->last_request_time, &now)/1000 < rt->opts->sync_retry_interval_ms) { ph->aborted_requests++; return 0; @@ -444,7 +451,11 @@ static void crdt_merge_on_reply(const struct swarmkv_reply *reply, void *user) thread_rt->sync_err++; } int responed = (reply->type != SWARMKV_REPLY_ERROR); - store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed); + int keep_tracking = store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed); + if(!keep_tracking) + { + store_thread_runtime_remove_failed_peer(thread_rt, &ctx->peer); + } free(ctx); return; } @@ -524,9 +535,10 @@ int store_thread_runtime_sync(struct store_thread_runtime *thread_rt, int batch_ thread_rt->synced += n_synced; return n_synced; } -void store_thread_runtime_init(struct store_thread_runtime *thread_rt) +void store_thread_runtime_init(struct store_thread_runtime *thread_rt, const struct swarmkv_options *opts) { memset(thread_rt, 0, sizeof(struct store_thread_runtime)); + thread_rt->opts = opts; pthread_mutex_init(&thread_rt->sanity_lock, NULL); return; } @@ -624,7 +636,7 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts) store->threads = ALLOC(struct store_thread_runtime, opts->nr_worker_threads); for (size_t i = 0; i < opts->nr_worker_threads; i++) { - store_thread_runtime_init(store->threads + i); + store_thread_runtime_init(store->threads + i, opts); } node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port); diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp index dbb8613..c6bdfab 100644 --- a/test/swarmkv_gtest.cpp +++ b/test/swarmkv_gtest.cpp @@ -1122,6 +1122,7 @@ protected: swarmkv_options_set_caller_thread_number(opts[i], 1); swarmkv_options_set_batch_sync_enabled(opts[i], 1); swarmkv_options_set_network_compression_enabled(opts[i], i%2); + swarmkv_options_set_sync_max_retry_time_ms(opts[i], 2000); tmp_db[i]=swarmkv_open(opts[i], cluster_name, &err); if(err) { @@ -2346,11 +2347,11 @@ static int parse_swarmkv_info(const char *text, const char *key) { return value; } -TEST_F(SwarmkvTwoNodes, SyncRobustness) +TEST_F(SwarmkvTwoNodes, SyncFailedResume) { struct swarmkv *db1=SwarmkvTwoNodes::db1, *db2=SwarmkvTwoNodes::db2; struct swarmkv_reply *reply=NULL; - const char *key="crdt-robustness"; + const char *key="crdt-sync-resume"; reply=swarmkv_command(db1, "INCRBY %s 1", key); EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); EXPECT_EQ(reply->integer, 1); @@ -2372,7 +2373,7 @@ TEST_F(SwarmkvTwoNodes, SyncRobustness) EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); int keyslot = reply->integer; swarmkv_reply_free(reply); - + //This is a hack of swarmkv_keyspace_slot2tid(). int tid = keyslot % SwarmkvTwoNodes::worker_thread_number; reply=swarmkv_command(db2, "DEBUG %d sleep 1", tid); @@ -2411,6 +2412,56 @@ TEST_F(SwarmkvTwoNodes, SyncRobustness) swarmkv_reply_free(reply); } +TEST_F(SwarmkvTwoNodes, SyncRemoveFailedPeer) +{ + struct swarmkv *db1=SwarmkvTwoNodes::db1, *db2=SwarmkvTwoNodes::db2; + struct swarmkv_reply *reply=NULL; + const char *key="crdt-sync-remove"; + reply=swarmkv_command(db1, "INCRBY %s 1", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); + + reply=swarmkv_command(db2, "INCRBY %s 1", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 2); + swarmkv_reply_free(reply); + + wait_for_sync(); + + reply = swarmkv_command(db1, "CRDT RLIST %s", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY); + EXPECT_EQ(reply->n_element, 1); + swarmkv_reply_free(reply); + + reply = swarmkv_command(db2, "KEYSLOT %s", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + int keyslot = reply->integer; + swarmkv_reply_free(reply); + + //Sleep longger than the peer_tracking_interval_ms=2000ms + int tid = keyslot % SwarmkvTwoNodes::worker_thread_number; + reply=swarmkv_command(db2, "DEBUG %d sleep 3", tid); + EXPECT_EQ(reply->type, SWARMKV_REPLY_ERROR); + swarmkv_reply_free(reply); + + //db1 -> db2 sync failed + for(int i=0; i<25; i++) + { + reply=swarmkv_command(db1, "INCRBY %s 1", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 3+i); + swarmkv_reply_free(reply); + usleep(100*1000); + } + + reply = swarmkv_command(db1, "CRDT RLIST %s", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY); + EXPECT_EQ(reply->n_element, 0); + swarmkv_reply_print(reply, stdout); + swarmkv_reply_free(reply); + +} TEST_F(SwarmkvTwoNodes, Wait) { return; |
