diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/swarmkv_api.c | 14 | ||||
| -rw-r--r-- | src/swarmkv_common.h | 2 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 46 |
3 files changed, 43 insertions, 19 deletions
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index 00147e8..cb42d6f 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -2,7 +2,6 @@ #include "swarmkv_common.h" #include "swarmkv_utils.h" - #include <stdio.h> #include <string.h> #include <stdlib.h> @@ -16,7 +15,6 @@ struct swarmkv_readoptions struct swarmkv_writeoptions { }; - struct swarmkv_options *swarmkv_options_new(void) { struct swarmkv_options *opts = ALLOC(struct swarmkv_options, 1); @@ -26,6 +24,8 @@ struct swarmkv_options *swarmkv_options_new(void) opts->loglevel = 0; opts->cluster_timeout_us = 500 * 1000; // Default 500ms opts->sync_interval_us = 10 * 1000; // Default 10ms + opts->sync_retry_time_ms = 300*1000; + opts->sync_retry_interval_ms = 100; strcpy(opts->bind_address, "0.0.0.0"); strcpy(opts->cluster_announce_ip, "127.0.0.1"); strcpy(opts->consul_agent_host, "127.0.0.1"); @@ -145,6 +145,16 @@ int swarmkv_options_set_batch_sync_enabled(struct swarmkv_options *opts, int val opts->batch_sync_enabled = 1; return 0; } +int swarmkv_options_set_sync_max_retry_time_ms(struct swarmkv_options *opts, int ms) +{ + opts->sync_retry_time_ms = ms; + return 0; +} +int swarmkv_options_set_sync_retry_interval_ms(struct swarmkv_options *opts, int ms) +{ + opts->sync_retry_interval_ms = ms; + return 0; +} int swarmkv_options_set_network_compression_enabled(struct swarmkv_options *opts, int value) { opts->network_compression_enabled = value; diff --git a/src/swarmkv_common.h b/src/swarmkv_common.h index a775124..f0fbc6c 100644 --- a/src/swarmkv_common.h +++ b/src/swarmkv_common.h @@ -66,6 +66,8 @@ struct swarmkv_options unsigned int consul_port; unsigned int cluster_timeout_us; unsigned int sync_interval_us; + unsigned int sync_retry_time_ms; + unsigned int sync_retry_interval_ms; struct log_handle *logger; int loglevel; const char *logpath; diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 969b904..31d165b 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -131,6 +131,7 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] = struct peer_health { node_t peer; + struct timespec start_tracking_time; struct timespec last_request_time; int missed_responses; int aborted_requests; @@ -151,6 +152,7 @@ struct store_thread_runtime long long sync_err; long long sync_ok; long long synced; + const struct swarmkv_options *opts; }; struct swarmkv_store { @@ -322,42 +324,47 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t return; } -__attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer) +static void store_thread_runtime_remove_failed_peer(struct store_thread_runtime *thread_rt, const node_t *peer) { struct scontainer *ctr = NULL, *tmp = NULL; - if (tid >= store->opts->nr_worker_threads) - return; // swarmkv_close() is called. - HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp) + HASH_ITER(hh, thread_rt->obj_table, ctr, tmp) { scontainer_remove_replica_node(ctr, peer); } return; } -#define SYNC_COOLDOWN_MS 100 - -static void store_thread_runtime_update_peer_health(struct store_thread_runtime *rt, const node_t *peer, int responed) +static int store_thread_runtime_update_peer_health(struct store_thread_runtime *thread_rt, const node_t *peer, int responed) { struct peer_health *ph = NULL; - HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph); + HASH_FIND(hh, thread_rt->peer_health_table, peer, sizeof(node_t), ph); //If the the peer is responded, unnecessary to track. if (responed) { if(ph) { - HASH_DEL(rt->peer_health_table, ph); + HASH_DEL(thread_rt->peer_health_table, ph); free(ph); } - return; + return 1; } + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); if(!ph) { ph = ALLOC(struct peer_health, 1); node_copy(&ph->peer, peer); - HASH_ADD(hh, rt->peer_health_table, peer, sizeof(node_t), ph); + HASH_ADD(hh, thread_rt->peer_health_table, peer, sizeof(node_t), ph); + ph->last_request_time = ph->start_tracking_time = now; } ph->missed_responses++; - clock_gettime(CLOCK_MONOTONIC, &ph->last_request_time); - return; + if(timespec_diff_usec(&ph->start_tracking_time, &now)/1000 > thread_rt->opts->sync_retry_time_ms) + { + HASH_DEL(thread_rt->peer_health_table, ph); + free(ph); + return 0; + } + else + return 1; } static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtime *rt, const node_t *peer) { @@ -370,7 +377,7 @@ static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtim struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); - if (timespec_diff_usec(&ph->last_request_time, &now) < SYNC_COOLDOWN_MS * 1000) + if (timespec_diff_usec(&ph->last_request_time, &now)/1000 < rt->opts->sync_retry_interval_ms) { ph->aborted_requests++; return 0; @@ -444,7 +451,11 @@ static void crdt_merge_on_reply(const struct swarmkv_reply *reply, void *user) thread_rt->sync_err++; } int responed = (reply->type != SWARMKV_REPLY_ERROR); - store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed); + int keep_tracking = store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed); + if(!keep_tracking) + { + store_thread_runtime_remove_failed_peer(thread_rt, &ctx->peer); + } free(ctx); return; } @@ -524,9 +535,10 @@ int store_thread_runtime_sync(struct store_thread_runtime *thread_rt, int batch_ thread_rt->synced += n_synced; return n_synced; } -void store_thread_runtime_init(struct store_thread_runtime *thread_rt) +void store_thread_runtime_init(struct store_thread_runtime *thread_rt, const struct swarmkv_options *opts) { memset(thread_rt, 0, sizeof(struct store_thread_runtime)); + thread_rt->opts = opts; pthread_mutex_init(&thread_rt->sanity_lock, NULL); return; } @@ -624,7 +636,7 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts) store->threads = ALLOC(struct store_thread_runtime, opts->nr_worker_threads); for (size_t i = 0; i < opts->nr_worker_threads; i++) { - store_thread_runtime_init(store->threads + i); + store_thread_runtime_init(store->threads + i, opts); } node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port); |
