summaryrefslogtreecommitdiff
path: root/src/swarmkv_store.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/swarmkv_store.c')
-rw-r--r--src/swarmkv_store.c46
1 files changed, 29 insertions, 17 deletions
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 969b904..31d165b 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -131,6 +131,7 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] =
struct peer_health
{
node_t peer;
+ struct timespec start_tracking_time;
struct timespec last_request_time;
int missed_responses;
int aborted_requests;
@@ -151,6 +152,7 @@ struct store_thread_runtime
long long sync_err;
long long sync_ok;
long long synced;
+ const struct swarmkv_options *opts;
};
struct swarmkv_store
{
@@ -322,42 +324,47 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t
return;
}
-__attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer)
+static void store_thread_runtime_remove_failed_peer(struct store_thread_runtime *thread_rt, const node_t *peer)
{
struct scontainer *ctr = NULL, *tmp = NULL;
- if (tid >= store->opts->nr_worker_threads)
- return; // swarmkv_close() is called.
- HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp)
+ HASH_ITER(hh, thread_rt->obj_table, ctr, tmp)
{
scontainer_remove_replica_node(ctr, peer);
}
return;
}
-#define SYNC_COOLDOWN_MS 100
-
-static void store_thread_runtime_update_peer_health(struct store_thread_runtime *rt, const node_t *peer, int responed)
+static int store_thread_runtime_update_peer_health(struct store_thread_runtime *thread_rt, const node_t *peer, int responed)
{
struct peer_health *ph = NULL;
- HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph);
+ HASH_FIND(hh, thread_rt->peer_health_table, peer, sizeof(node_t), ph);
//If the the peer is responded, unnecessary to track.
if (responed)
{
if(ph)
{
- HASH_DEL(rt->peer_health_table, ph);
+ HASH_DEL(thread_rt->peer_health_table, ph);
free(ph);
}
- return;
+ return 1;
}
+ struct timespec now;
+ clock_gettime(CLOCK_MONOTONIC, &now);
if(!ph)
{
ph = ALLOC(struct peer_health, 1);
node_copy(&ph->peer, peer);
- HASH_ADD(hh, rt->peer_health_table, peer, sizeof(node_t), ph);
+ HASH_ADD(hh, thread_rt->peer_health_table, peer, sizeof(node_t), ph);
+ ph->last_request_time = ph->start_tracking_time = now;
}
ph->missed_responses++;
- clock_gettime(CLOCK_MONOTONIC, &ph->last_request_time);
- return;
+ if(timespec_diff_usec(&ph->start_tracking_time, &now)/1000 > thread_rt->opts->sync_retry_time_ms)
+ {
+ HASH_DEL(thread_rt->peer_health_table, ph);
+ free(ph);
+ return 0;
+ }
+ else
+ return 1;
}
static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtime *rt, const node_t *peer)
{
@@ -370,7 +377,7 @@ static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtim
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
- if (timespec_diff_usec(&ph->last_request_time, &now) < SYNC_COOLDOWN_MS * 1000)
+ if (timespec_diff_usec(&ph->last_request_time, &now)/1000 < rt->opts->sync_retry_interval_ms)
{
ph->aborted_requests++;
return 0;
@@ -444,7 +451,11 @@ static void crdt_merge_on_reply(const struct swarmkv_reply *reply, void *user)
thread_rt->sync_err++;
}
int responed = (reply->type != SWARMKV_REPLY_ERROR);
- store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed);
+ int keep_tracking = store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed);
+ if(!keep_tracking)
+ {
+ store_thread_runtime_remove_failed_peer(thread_rt, &ctx->peer);
+ }
free(ctx);
return;
}
@@ -524,9 +535,10 @@ int store_thread_runtime_sync(struct store_thread_runtime *thread_rt, int batch_
thread_rt->synced += n_synced;
return n_synced;
}
-void store_thread_runtime_init(struct store_thread_runtime *thread_rt)
+void store_thread_runtime_init(struct store_thread_runtime *thread_rt, const struct swarmkv_options *opts)
{
memset(thread_rt, 0, sizeof(struct store_thread_runtime));
+ thread_rt->opts = opts;
pthread_mutex_init(&thread_rt->sanity_lock, NULL);
return;
}
@@ -624,7 +636,7 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts)
store->threads = ALLOC(struct store_thread_runtime, opts->nr_worker_threads);
for (size_t i = 0; i < opts->nr_worker_threads; i++)
{
- store_thread_runtime_init(store->threads + i);
+ store_thread_runtime_init(store->threads + i, opts);
}
node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port);