diff options
Diffstat (limited to 'src/swarmkv_store.c')
| -rw-r--r-- | src/swarmkv_store.c | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 969b904..31d165b 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -131,6 +131,7 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] = struct peer_health { node_t peer; + struct timespec start_tracking_time; struct timespec last_request_time; int missed_responses; int aborted_requests; @@ -151,6 +152,7 @@ struct store_thread_runtime long long sync_err; long long sync_ok; long long synced; + const struct swarmkv_options *opts; }; struct swarmkv_store { @@ -322,42 +324,47 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t return; } -__attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer) +static void store_thread_runtime_remove_failed_peer(struct store_thread_runtime *thread_rt, const node_t *peer) { struct scontainer *ctr = NULL, *tmp = NULL; - if (tid >= store->opts->nr_worker_threads) - return; // swarmkv_close() is called. - HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp) + HASH_ITER(hh, thread_rt->obj_table, ctr, tmp) { scontainer_remove_replica_node(ctr, peer); } return; } -#define SYNC_COOLDOWN_MS 100 - -static void store_thread_runtime_update_peer_health(struct store_thread_runtime *rt, const node_t *peer, int responed) +static int store_thread_runtime_update_peer_health(struct store_thread_runtime *thread_rt, const node_t *peer, int responed) { struct peer_health *ph = NULL; - HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph); + HASH_FIND(hh, thread_rt->peer_health_table, peer, sizeof(node_t), ph); //If the the peer is responded, unnecessary to track. if (responed) { if(ph) { - HASH_DEL(rt->peer_health_table, ph); + HASH_DEL(thread_rt->peer_health_table, ph); free(ph); } - return; + return 1; } + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); if(!ph) { ph = ALLOC(struct peer_health, 1); node_copy(&ph->peer, peer); - HASH_ADD(hh, rt->peer_health_table, peer, sizeof(node_t), ph); + HASH_ADD(hh, thread_rt->peer_health_table, peer, sizeof(node_t), ph); + ph->last_request_time = ph->start_tracking_time = now; } ph->missed_responses++; - clock_gettime(CLOCK_MONOTONIC, &ph->last_request_time); - return; + if(timespec_diff_usec(&ph->start_tracking_time, &now)/1000 > thread_rt->opts->sync_retry_time_ms) + { + HASH_DEL(thread_rt->peer_health_table, ph); + free(ph); + return 0; + } + else + return 1; } static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtime *rt, const node_t *peer) { @@ -370,7 +377,7 @@ static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtim struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); - if (timespec_diff_usec(&ph->last_request_time, &now) < SYNC_COOLDOWN_MS * 1000) + if (timespec_diff_usec(&ph->last_request_time, &now)/1000 < rt->opts->sync_retry_interval_ms) { ph->aborted_requests++; return 0; @@ -444,7 +451,11 @@ static void crdt_merge_on_reply(const struct swarmkv_reply *reply, void *user) thread_rt->sync_err++; } int responed = (reply->type != SWARMKV_REPLY_ERROR); - store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed); + int keep_tracking = store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed); + if(!keep_tracking) + { + store_thread_runtime_remove_failed_peer(thread_rt, &ctx->peer); + } free(ctx); return; } @@ -524,9 +535,10 @@ int store_thread_runtime_sync(struct store_thread_runtime *thread_rt, int batch_ thread_rt->synced += n_synced; return n_synced; } -void store_thread_runtime_init(struct store_thread_runtime *thread_rt) +void store_thread_runtime_init(struct store_thread_runtime *thread_rt, const struct swarmkv_options *opts) { memset(thread_rt, 0, sizeof(struct store_thread_runtime)); + thread_rt->opts = opts; pthread_mutex_init(&thread_rt->sanity_lock, NULL); return; } @@ -624,7 +636,7 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts) store->threads = ALLOC(struct store_thread_runtime, opts->nr_worker_threads); for (size_t i = 0; i < opts->nr_worker_threads; i++) { - store_thread_runtime_init(store->threads + i); + store_thread_runtime_init(store->threads + i, opts); } node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port); |
