summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/swarmkv.c20
-rw-r--r--src/swarmkv_store.c660
-rw-r--r--test/swarmkv_gtest.cpp128
3 files changed, 471 insertions, 337 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 517ab73..081ccea 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -119,7 +119,7 @@ int swarmkv_gettid(const struct swarmkv *db)
assert(0);
return -1;
}
-#define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000
+#define INTER_THREAD_RPC_TIMEOUT_AHEAD 10000
#define UNKNOWN_THREAD_ID -1
static void exec_at_peer(struct swarmkv *db, const struct swarmkv_cmd *cmd, const node_t *peer, struct future *future_of_caller)
{
@@ -831,25 +831,25 @@ enum cmd_exec_result ping_command(struct swarmkv_module *mod_db, const struct sw
enum cmd_exec_result __attribute__((optimize("O0"))) debug_command(struct swarmkv_module *mod_db, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
- if (cmd->argc == 2 && !strcasecmp(cmd->argv[1], "help"))
+ if (cmd->argc == 3 && !strcasecmp(cmd->argv[1], "help"))
{
const char *help = {
- "DEBUG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:\n"
+ "DEBUG tid <subcommand> [<arg> [value] [opt] ...]. Subcommands are:\n"
"SLEEP <seconds>\n"
" Stop the server for <seconds>. Decimals allowed.\n"
"ASSERT\n"
" Crash by assertion failed.\n"};
*reply = swarmkv_reply_new_status(help);
}
- else if (!strcasecmp(cmd->argv[1], "sleep") && cmd->argc > 2)
+ else if (!strcasecmp(cmd->argv[2], "sleep") && cmd->argc > 3)
{
- if (cmd->argc < 3)
+ if (cmd->argc < 4)
{
*reply = swarmkv_reply_new_error(error_need_additional_arg, cmd->argv[1]);
}
else
{
- double dtime = strtod(cmd->argv[2], NULL);
+ double dtime = strtod(cmd->argv[3], NULL);
long long utime = dtime * 1000000;
struct timespec tv;
@@ -859,13 +859,13 @@ enum cmd_exec_result __attribute__((optimize("O0"))) debug_command(struct swarmk
*reply = swarmkv_reply_new_status("OK");
}
}
- else if (!strcasecmp(cmd->argv[1], "assert"))
+ else if (!strcasecmp(cmd->argv[2], "assert"))
{
assert(0);
}
else
{
- *reply = swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[1], cmd->argv[0]);
+ *reply = swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[2], cmd->argv[0]);
}
return FINISHED;
}
@@ -1208,8 +1208,8 @@ void command_spec_init(struct swarmkv *db)
swarmkv_command_table_register(db->mod_command_table, "INFO", "[section]",
0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
info_command, &db->module);
- swarmkv_command_table_register(db->mod_command_table, "DEBUG", "<subcommand>",
- 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
+ swarmkv_command_table_register(db->mod_command_table, "DEBUG", "tid <subcommand>",
+ 2, KEY_OFFSET_TID, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
debug_command, &db->module);
swarmkv_command_table_register(db->mod_command_table, "PING", "IP:port",
1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index d3319a7..969b904 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -128,26 +128,39 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] =
.obj_size = (size_t(*)(const void *))undefined_obj_mem_size}};
#define MODULE_SWAMRKV_STORE module_name_str("swarmkv.store")
-struct swarmkv_store_thread
+struct peer_health
+{
+ node_t peer;
+ struct timespec last_request_time;
+ int missed_responses;
+ int aborted_requests;
+ UT_hash_handle hh;
+};
+struct store_thread_runtime
{
struct scontainer *obj_table;
struct scontainer *sync_queue;
+ struct peer_health *peer_health_table;
pthread_mutex_t sanity_lock;
long long keys_to_sync;
long long n_keys;
long long calls;
+ struct sync_master *sync_master;//should be NULL when not in synchronization
+ struct sync_task *sync_task;
+ int offset_of_sync_task;
+ long long sync_err;
+ long long sync_ok;
+ long long synced;
};
struct swarmkv_store
{
struct swarmkv_module module;
node_t self;
- struct swarmkv_store_thread *threads;
+ struct store_thread_runtime *threads;
struct swarmkv *exec_cmd_handle;
struct swarmkv_module *mod_monitor;
- long long sync_err;
- long long sync_ok;
- long long synced;
+
const struct swarmkv_options *opts;
};
@@ -208,26 +221,38 @@ static void scontainer_free(struct scontainer *ctr)
free(ctr);
return;
}
-
-static struct scontainer *scontainer_find(struct scontainer **table, const sds key)
-{
- struct scontainer *ctr = NULL;
- HASH_FIND(hh, *table, key, sdslen(key), ctr);
- return ctr;
-}
-static void scontainer_join(struct scontainer **table, struct scontainer *ctr)
-{
- HASH_ADD_KEYPTR(hh, *table, ctr->obj.key, sdslen(ctr->obj.key), ctr);
- ctr->is_in_table = 1;
- return;
-}
-static void scontainer_remove(struct scontainer **table, struct scontainer *ctr)
+void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz)
{
- HASH_DELETE(hh, *table, ctr);
- ctr->is_in_table = 0;
+ char *value_blob = NULL;
+ size_t value_blob_sz = 0;
+ struct sobj *obj = &ctr->obj;
+ sobj_specs[obj->type].obj_serialize(obj->raw, &value_blob, &value_blob_sz);
+ char *mpack_buff = NULL;
+ size_t mpack_sz = 0;
+ mpack_sz = sizeof(obj->type) + sizeof(ctr->op_timestamp);
+ mpack_sz += sizeof(size_t) + value_blob_sz;
+ mpack_buff = ALLOC(char, mpack_sz);
+ size_t offset = 0;
+
+ memcpy(mpack_buff, &obj->type, sizeof(obj->type));
+ offset += sizeof(obj->type);
+ memcpy(mpack_buff + offset, &ctr->op_timestamp, sizeof(ctr->op_timestamp));
+ offset += sizeof(ctr->op_timestamp);
+
+ // We can add replica node list to the blob, but the merge process will consume more CPU time.
+
+ memcpy(mpack_buff + offset, &value_blob_sz, sizeof(size_t));
+ offset += sizeof(size_t);
+
+ memcpy(mpack_buff + offset, value_blob, value_blob_sz);
+ offset += value_blob_sz;
+ assert(offset == mpack_sz);
+ *blob = mpack_buff;
+ *blob_sz = mpack_sz;
+
+ free(value_blob);
return;
}
-
static int node_cmp(const void *a, const void *b)
{
return node_compare((node_t *)a, (node_t *)b);
@@ -260,150 +285,6 @@ static void scontainer_remove_replica_node(struct scontainer *ctr, const node_t
utarray_erase(ctr->replica_node_list, pos, 1);
return;
}
-void store_add_scontainer(struct swarmkv_store *store, struct scontainer *ctr)
-{
- scontainer_join(&(store->threads[ctr->tid].obj_table), ctr);
- return;
-}
-void store_remove_scontainer(struct swarmkv_store *store, struct scontainer *ctr)
-{
- HASH_DELETE(hh, store->threads[ctr->tid].obj_table, ctr);
- ctr->is_in_table = 0;
- if (ctr->is_in_sync_q)
- {
- DL_DELETE(store->threads[ctr->tid].sync_queue, ctr);
- ctr->is_in_sync_q = 0;
- }
-}
-typedef void sobj_callback_func_t(struct sobj *obj, void *cb_arg);
-void store_iterate_sobj(struct swarmkv_store *store, int tid, sobj_callback_func_t *cb, void *cb_arg)
-{
- struct scontainer *ctr = NULL, *tmp = NULL;
- HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp)
- {
- if (!ctr->is_pending)
- {
- cb(&ctr->obj, cb_arg);
- }
- }
-}
-struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key)
-{
- struct scontainer *ctr = NULL;
- int designated_tid = key2tid(key, store->opts->nr_worker_threads);
- int real_tid = swarmkv_gettid(store->exec_cmd_handle);
- assert(designated_tid == real_tid);
- ctr = scontainer_find(&(store->threads[designated_tid].obj_table), key);
- if (0 == pthread_mutex_trylock(&store->threads[designated_tid].sanity_lock))
- {
- pthread_mutex_unlock(&store->threads[designated_tid].sanity_lock);
- }
- else
- {
- assert(0);
- }
- if (ctr)
- {
- return ctr;
- }
- else
- {
- return NULL;
- }
-}
-struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key)
-{
- struct swarmkv_store *store = module2store(mod_store);
- struct scontainer *ctr = NULL;
- ctr = store_lookup_scontainer(store, key);
- if (ctr) //&& !ctr->is_pending
- {
- return &(ctr->obj);
- }
- else
- {
- return NULL;
- }
-}
-void store_get_uuid(struct swarmkv_module *mod_store, uuid_t uuid)
-{
- struct swarmkv_store *store = module2store(mod_store);
- uuid_copy(uuid, store->opts->bin_uuid);
- return;
-}
-void store_get_node_addr(struct swarmkv_module *mod_store, node_t *node)
-{
- struct swarmkv_store *store = module2store(mod_store);
- node_copy(node, &store->self);
- return;
-}
-void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj)
-{
- struct swarmkv_store *store = module2store(mod_store);
- struct scontainer *ctr = container_of(obj, struct scontainer, obj);
- gettimeofday(&ctr->op_timestamp, NULL);
- if (!ctr->is_in_sync_q && ctr->replica_node_list)
- {
- DL_APPEND(store->threads[ctr->tid].sync_queue, ctr);
- ctr->is_in_sync_q = 1;
- }
- return;
-}
-int sobj_get_random_replica(struct sobj *obj, node_t *out)
-{
- struct scontainer *ctr = container_of(obj, struct scontainer, obj);
- if (!ctr->replica_node_list || 0 == utarray_len(ctr->replica_node_list))
- return 0;
- node_t *replica = (node_t *)utarray_eltptr(ctr->replica_node_list, 0);
- node_copy(out, replica);
- return 1;
-}
-enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply)
-{
- assert(obj->type == OBJ_TYPE_UNDEFINED);
- node_t replica;
- int ret = 0;
- ret = sobj_get_random_replica(obj, &replica);
- if (ret)
- {
- *reply = swarmkv_reply_new_node(&replica, 1);
- return REDIRECT;
- }
- return NEED_KEY_ROUTE;
-}
-void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz)
-{
- char *value_blob = NULL;
- size_t value_blob_sz = 0;
- struct sobj *obj = &ctr->obj;
- sobj_specs[obj->type].obj_serialize(obj->raw, &value_blob, &value_blob_sz);
- char *mpack_buff = NULL;
- size_t mpack_sz = 0;
- mpack_sz = sizeof(obj->type) + sizeof(ctr->op_timestamp);
- mpack_sz += sizeof(size_t) + value_blob_sz;
- mpack_buff = ALLOC(char, mpack_sz);
- size_t offset = 0;
-
- memcpy(mpack_buff, &obj->type, sizeof(obj->type));
- offset += sizeof(obj->type);
- memcpy(mpack_buff + offset, &ctr->op_timestamp, sizeof(ctr->op_timestamp));
- offset += sizeof(ctr->op_timestamp);
-
- // We can add replica node list to the blob, but the merge process will consume more CPU time.
-
- memcpy(mpack_buff + offset, &value_blob_sz, sizeof(size_t));
- offset += sizeof(size_t);
-
- memcpy(mpack_buff + offset, value_blob, value_blob_sz);
- offset += value_blob_sz;
- assert(offset == mpack_sz);
- *blob = mpack_buff;
- *blob_sz = mpack_sz;
-
- free(value_blob);
- return;
-}
-
void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t uuid)
{
struct scontainer *ctr = container_of(obj, struct scontainer, obj);
@@ -441,25 +322,6 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t
return;
}
-enum CRDT_OP
-{
- CRDT_GET,
- CRDT_MERGE,
- CRDT_MEET
-};
-struct crdt_generic_ctx
-{
- enum CRDT_OP op;
- struct swarmkv_store *store;
- node_t peer;
- sds key;
-};
-void crdt_generic_ctx_free(struct crdt_generic_ctx *ctx)
-{
- sdsfree(ctx->key);
- ctx->store = NULL;
- free(ctx);
-}
__attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer)
{
struct scontainer *ctr = NULL, *tmp = NULL;
@@ -471,101 +333,145 @@ __attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_stor
}
return;
}
-static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
-{
- struct crdt_generic_ctx *ctx = (struct crdt_generic_ctx *)user;
+#define SYNC_COOLDOWN_MS 100
- uuid_t uuid;
- store_get_uuid(&(ctx->store->module), uuid);
-
- __attribute__((unused)) long long error_before = ctx->store->sync_err;
-
- switch (ctx->op)
- {
- case CRDT_GET:
+static void store_thread_runtime_update_peer_health(struct store_thread_runtime *rt, const node_t *peer, int responed)
+{
+ struct peer_health *ph = NULL;
+ HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph);
+ //If the the peer is responded, unnecessary to track.
+ if (responed)
{
- struct scontainer *ctr = NULL;
- ctr = store_lookup_scontainer(ctx->store, ctx->key);
- if (ctr && reply->type == SWARMKV_REPLY_VERBATIM)
+ if(ph)
{
- sobj_merge_blob(&ctr->obj, reply->str, reply->len, uuid);
- ctr->is_pending = 0;
- atomic_inc(&ctx->store->sync_ok);
+ HASH_DEL(rt->peer_health_table, ph);
+ free(ph);
}
- else
- {
- atomic_inc(&ctx->store->sync_err);
- }
- break;
+ return;
}
- case CRDT_MERGE:
+ if(!ph)
{
- if (reply->type == SWARMKV_REPLY_INTEGER && reply->integer > 0)
- {
- atomic_add(&ctx->store->sync_ok, reply->integer);
- }
- else
- {
- //In batch synchronization, the merge command operates multiple keys of one thread,
- //the ctx->key is the first key of the batch.
- if (reply->type == SWARMKV_REPLY_ERROR && strcasestr(reply->str, "timed out"))
- {
- int tid = __store_gettid(ctx->key, ctx->store->opts->nr_worker_threads);
- store_remove_failed_peer(ctx->store, tid, &ctx->peer);
- }
- atomic_inc(&ctx->store->sync_err);
- }
- break;
+ ph = ALLOC(struct peer_health, 1);
+ node_copy(&ph->peer, peer);
+ HASH_ADD(hh, rt->peer_health_table, peer, sizeof(node_t), ph);
}
- case CRDT_MEET:
+ ph->missed_responses++;
+ clock_gettime(CLOCK_MONOTONIC, &ph->last_request_time);
+ return;
+}
+static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtime *rt, const node_t *peer)
+{
+ struct peer_health *ph = NULL;
+ HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph);
+ if (!ph)
{
- if (reply->type == SWARMKV_REPLY_INTEGER)
- {
- atomic_inc(&ctx->store->sync_ok);
- }
- else
+ return 1;
+ }
+ struct timespec now;
+ clock_gettime(CLOCK_MONOTONIC, &now);
+
+ if (timespec_diff_usec(&ph->last_request_time, &now) < SYNC_COOLDOWN_MS * 1000)
+ {
+ ph->aborted_requests++;
+ return 0;
+ }
+ else
+ {
+ ph->last_request_time = now;
+ return 1;
+ }
+}
+static struct scontainer *store_thread_runtime_find_scontainter(struct store_thread_runtime *rt, const sds key)
+{
+ struct scontainer *ctr = NULL;
+ //Thread-safe SANITY CHECK
+ if (0 == pthread_mutex_trylock(&rt->sanity_lock))
+ {
+ pthread_mutex_unlock(&rt->sanity_lock);
+ }
+ else
+ {
+ assert(0);
+ }
+ HASH_FIND(hh, rt->obj_table, key, sdslen(key), ctr);
+ return ctr;
+}
+static void store_thread_runtime_add_scontainer(struct store_thread_runtime *rt, struct scontainer *ctr)
+{
+ HASH_ADD_KEYPTR(hh, rt->obj_table, ctr->obj.key, sdslen(ctr->obj.key), ctr);
+ ctr->is_in_table = 1;
+ return;
+}
+static void store_thread_runtime_remove_scontainer(struct store_thread_runtime *rt, struct scontainer *ctr)
+{
+ HASH_DELETE(hh, rt->obj_table, ctr);
+ ctr->is_in_table = 0;
+ if (ctr->is_in_sync_q)
+ {
+ DL_DELETE(rt->sync_queue, ctr);
+ ctr->is_in_sync_q = 0;
+ }
+ return;
+}
+typedef void sobj_callback_func_t(struct sobj *obj, void *cb_arg);
+void store_thread_runtime_iterate_sobj(struct store_thread_runtime *rt, sobj_callback_func_t *cb, void *cb_arg)
+{
+ struct scontainer *ctr = NULL, *tmp = NULL;
+ HASH_ITER(hh, rt->obj_table, ctr, tmp)
+ {
+ if (!ctr->is_pending)
{
- atomic_inc(&ctx->store->sync_err);
+ cb(&ctr->obj, cb_arg);
}
- break;
}
- default:
+}
+struct crdt_merge_ctx
+{
+ struct store_thread_runtime *thread_rt;
+ node_t peer;
+};
+static void crdt_merge_on_reply(const struct swarmkv_reply *reply, void *user)
+{
+ struct crdt_merge_ctx *ctx = (struct crdt_merge_ctx *)user;
+ struct store_thread_runtime *thread_rt = ctx->thread_rt;
+
+ if (reply->type == SWARMKV_REPLY_INTEGER && reply->integer > 0)
{
- atomic_inc(&ctx->store->sync_err);
- break;
+ thread_rt->sync_ok += reply->integer;
}
+ else
+ {
+ thread_rt->sync_err++;
}
- // assert(ctx->store->sync_err==error_before);
- crdt_generic_ctx_free(ctx);
+ int responed = (reply->type != SWARMKV_REPLY_ERROR);
+ store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed);
+ free(ctx);
return;
}
-
-void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len)
+void store_thread_runtime_sync_with_one_peer(struct store_thread_runtime *thread_rt, const node_t *peer, int argc, const char *argv[], size_t argv_len[], struct swarmkv *exec_cmd_handle)
{
- struct crdt_generic_ctx *ctx = NULL;
- assert(peer);
- ctx = ALLOC(struct crdt_generic_ctx, 1);
- ctx->op = op;
- ctx->store = store;
- ctx->key = sdsnew(argv[2]);
+ if(!store_thread_runtime_should_sync_with_peer(thread_rt, peer))
+ {
+ return;
+ }
+ struct crdt_merge_ctx *ctx = ALLOC(struct crdt_merge_ctx, 1);
+ ctx->thread_rt = thread_rt;
node_copy(&ctx->peer, peer);
- swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len);
- return;
+ swarmkv_async_command_on_argv(exec_cmd_handle, crdt_merge_on_reply, ctx, peer->addr, argc, argv, argv_len);
}
-#define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle"
#define MAX_SYNC_PER_PERIOD 100000
-int store_batch_sync(struct swarmkv_store *store, int tid)
+int store_thread_runtime_sync(struct store_thread_runtime *thread_rt, int batch_sync_enabled, struct swarmkv *exec_cmd_handle)
{
int n_synced = 0;
- struct swarmkv_store_thread *thr = &store->threads[tid];
+ thread_rt->calls++;
struct sync_master *sync_master = sync_master_new();
struct scontainer *ctr = NULL, *tmp = NULL;
- DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp)
+ DL_FOREACH_SAFE(thread_rt->sync_queue, ctr, tmp)
{
char *blob = NULL;
size_t blob_sz = 0;
scontainer_serialize(ctr, &blob, &blob_sz);
- if (store->opts->batch_sync_enabled)
+ if (batch_sync_enabled)
{
sync_master_add_obj(sync_master, ctr->obj.key, blob, blob_sz,
utarray_front(ctr->replica_node_list), utarray_len(ctr->replica_node_list));
@@ -585,13 +491,12 @@ int store_batch_sync(struct swarmkv_store *store, int tid)
for (int i = 0; i < utarray_len(ctr->replica_node_list); i++)
{
node_t *peer = utarray_eltptr(ctr->replica_node_list, i);
- crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len);
+ store_thread_runtime_sync_with_one_peer(thread_rt, peer, 4, argv, argv_len, exec_cmd_handle);
}
free(blob);
}
- DL_DELETE(thr->sync_queue, ctr);
+ DL_DELETE(thread_rt->sync_queue, ctr);
ctr->is_in_sync_q = 0;
- store->synced++;
n_synced++;
if (n_synced >= MAX_SYNC_PER_PERIOD)
break;
@@ -609,64 +514,101 @@ int store_batch_sync(struct swarmkv_store *store, int tid)
argv[1] = "merge";
argv_len[1] = strlen(argv[1]);
sync_task_read_key_blob(task, argv + 2, argv_len + 2, n_data * 2);
- crdt_generic_call(store, CRDT_MERGE, sync_task_peer(task), n_data * 2 + 2, argv, argv_len);
+ store_thread_runtime_sync_with_one_peer(thread_rt, sync_task_peer(task), n_data * 2 + 2, argv, argv_len, exec_cmd_handle);
sync_task_free(task);
task = sync_master_get_task(sync_master);
}
sync_master_free(sync_master);
+ thread_rt->n_keys = HASH_COUNT(thread_rt->obj_table);
+ DL_COUNT(thread_rt->sync_queue, ctr, thread_rt->keys_to_sync);
+ thread_rt->synced += n_synced;
return n_synced;
}
+void store_thread_runtime_init(struct store_thread_runtime *thread_rt)
+{
+ memset(thread_rt, 0, sizeof(struct store_thread_runtime));
+ pthread_mutex_init(&thread_rt->sanity_lock, NULL);
+ return;
+}
+void store_thread_runtime_deinit(struct store_thread_runtime *thread_rt)
+{
+ struct scontainer *ctr = NULL, *tmp = NULL;
+ HASH_ITER(hh, thread_rt->obj_table, ctr, tmp)
+ {
+ store_thread_runtime_remove_scontainer(thread_rt, ctr);
+ scontainer_free(ctr);
+ }
+ struct peer_health *ph = NULL, *tmp_ph = NULL;
+ HASH_ITER(hh, thread_rt->peer_health_table, ph, tmp_ph)
+ {
+ HASH_DEL(thread_rt->peer_health_table, ph);
+ free(ph);
+ }
+ pthread_mutex_destroy(&thread_rt->sanity_lock);
+ return;
+}
+void store_get_uuid(struct swarmkv_module *mod_store, uuid_t uuid)
+{
+ struct swarmkv_store *store = module2store(mod_store);
+ uuid_copy(uuid, store->opts->bin_uuid);
+ return;
+}
+void store_get_node_addr(struct swarmkv_module *mod_store, node_t *node)
+{
+ struct swarmkv_store *store = module2store(mod_store);
+ node_copy(node, &store->self);
+ return;
+}
+void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj)
+{
+ struct swarmkv_store *store = module2store(mod_store);
+ struct scontainer *ctr = container_of(obj, struct scontainer, obj);
+ gettimeofday(&ctr->op_timestamp, NULL);
+ if (!ctr->is_in_sync_q && ctr->replica_node_list)
+ {
+ DL_APPEND(store->threads[ctr->tid].sync_queue, ctr);
+ ctr->is_in_sync_q = 1;
+ }
+ return;
+}
+int sobj_get_random_replica(struct sobj *obj, node_t *out)
+{
+ struct scontainer *ctr = container_of(obj, struct scontainer, obj);
+ if (!ctr->replica_node_list || 0 == utarray_len(ctr->replica_node_list))
+ return 0;
+ node_t *replica = (node_t *)utarray_eltptr(ctr->replica_node_list, 0);
+ node_copy(out, replica);
+ return 1;
+}
+enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply)
+{
+ assert(obj->type == OBJ_TYPE_UNDEFINED);
+ node_t replica;
+ int ret = 0;
+ ret = sobj_get_random_replica(obj, &replica);
+ if (ret)
+ {
+ *reply = swarmkv_reply_new_node(&replica, 1);
+ return REDIRECT;
+ }
+ return NEED_KEY_ROUTE;
+}
+
+
+#define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle"
+
void swarmkv_store_periodic(struct swarmkv_module *mod_store, int thread_id)
{
struct swarmkv_store *store = module2store(mod_store);
- struct scontainer *ctr = NULL, *tmp = NULL;
struct timespec start, end;
int n_synced = 0;
int real_tid = swarmkv_gettid(store->exec_cmd_handle);
assert(real_tid == thread_id);
clock_gettime(CLOCK_MONOTONIC, &start);
- struct swarmkv_store_thread *thr = &store->threads[real_tid];
- thr->calls++;
- if (store->opts->batch_sync_enabled)
- {
- n_synced = store_batch_sync(store, real_tid);
- }
- else
- {
- DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp)
- {
- char *blob = NULL;
- size_t blob_sz = 0;
- scontainer_serialize(ctr, &blob, &blob_sz);
- const char *argv[4];
- size_t argv_len[4];
- argv[0] = "crdt";
- argv_len[0] = strlen(argv[0]);
- argv[1] = "merge";
- argv_len[1] = strlen(argv[1]);
- argv[2] = ctr->obj.key;
- argv_len[2] = sdslen(ctr->obj.key);
- argv[3] = blob;
- argv_len[3] = blob_sz;
- for (int i = 0; i < utarray_len(ctr->replica_node_list); i++)
- {
- node_t *peer = utarray_eltptr(ctr->replica_node_list, i);
- crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len);
- }
- free(blob);
- DL_DELETE(thr->sync_queue, ctr);
- ctr->is_in_sync_q = 0;
- store->synced++;
- n_synced++;
- if (n_synced >= MAX_SYNC_PER_PERIOD)
- break;
- }
- }
- thr->n_keys = HASH_COUNT(thr->obj_table);
- DL_COUNT(thr->sync_queue, ctr, thr->keys_to_sync);
- clock_gettime(CLOCK_MONOTONIC, &end);
+ n_synced = store_thread_runtime_sync(store->threads + real_tid, store->opts->batch_sync_enabled, store->exec_cmd_handle);
+ clock_gettime(CLOCK_MONOTONIC, &end);
if (n_synced)
{
swarmkv_monitor_record_event(store->mod_monitor, MONITOR_SYNC_EVENT_NAME, timespec_diff_usec(&start, &end));
@@ -679,8 +621,11 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts)
strncpy(store->module.name, "store", sizeof(store->module.name));
store->module.mod_ctx = store;
store->opts = opts;
- store->threads = ALLOC(struct swarmkv_store_thread, opts->nr_worker_threads);
-
+ store->threads = ALLOC(struct store_thread_runtime, opts->nr_worker_threads);
+ for (size_t i = 0; i < opts->nr_worker_threads; i++)
+ {
+ store_thread_runtime_init(store->threads + i);
+ }
node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port);
return &(store->module);
@@ -694,17 +639,9 @@ void swarmkv_store_set_exec_cmd_handle(struct swarmkv_module *mod_store, struct
void swarmkv_store_free(struct swarmkv_module *mod_store)
{
struct swarmkv_store *store = module2store(mod_store);
- struct scontainer *ctr = NULL, *tmp = NULL;
-
for (size_t i = 0; i < store->opts->nr_worker_threads; i++)
{
-
- HASH_ITER(hh, store->threads[i].obj_table, ctr, tmp)
- {
- assert(ctr->tid == i);
- scontainer_remove(&(store->threads[i].obj_table), ctr);
- scontainer_free(ctr);
- }
+ store_thread_runtime_deinit(store->threads + i);
}
free(store->threads);
store->threads = NULL;
@@ -722,17 +659,17 @@ void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *inf
struct swarmkv_store *store = module2store(mod_store);
info->keys = 0;
info->keys_to_sync = 0;
- struct swarmkv_store_thread *thread = NULL;
+ struct store_thread_runtime *thread = NULL;
memset(info, 0, sizeof(struct store_info));
for (size_t i = 0; i < store->opts->nr_worker_threads; i++)
{
thread = store->threads + i;
info->keys_to_sync += __sync_add_and_fetch(&thread->keys_to_sync, 0);
info->keys += __sync_add_and_fetch(&thread->n_keys, 0);
+ info->sync_ok += __sync_add_and_fetch(&thread->sync_ok, 0);
+ info->sync_err += __sync_add_and_fetch(&thread->sync_err, 0);
+ info->synced += __sync_add_and_fetch(&thread->synced, 0);
}
- info->sync_ok = __sync_add_and_fetch(&store->sync_ok, 0);
- info->sync_err = __sync_add_and_fetch(&store->sync_err, 0);
- info->synced = store->synced;
}
UT_icd ut_array_matched_reply = {sizeof(struct swarmkv_reply *), NULL, NULL, NULL};
@@ -755,6 +692,95 @@ enum cmd_exec_result type_command(struct swarmkv_module *mod_store, const struct
*reply = swarmkv_reply_new_string_fmt(sobj_specs[obj->type].type_name);
return FINISHED;
}
+struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key)
+{
+ struct scontainer *ctr = NULL;
+ int designated_tid = key2tid(key, store->opts->nr_worker_threads);
+ int real_tid = swarmkv_gettid(store->exec_cmd_handle);
+ assert(designated_tid == real_tid);
+ ctr = store_thread_runtime_find_scontainter(store->threads + designated_tid, key);
+ return ctr;
+}
+struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key)
+{
+ struct swarmkv_store *store = module2store(mod_store);
+ struct scontainer *ctr = NULL;
+
+ ctr = store_lookup_scontainer(store, key);
+ if (ctr) //&& !ctr->is_pending
+ {
+ return &(ctr->obj);
+ }
+ else
+ {
+ return NULL;
+ }
+}
+enum CRDT_OP
+{
+ CRDT_GET,
+ CRDT_MERGE,
+ CRDT_MEET
+};
+struct crdt_generic_ctx
+{
+ enum CRDT_OP op;
+ struct swarmkv_store *store;
+ node_t peer;
+ sds key;
+};
+void crdt_generic_ctx_free(struct crdt_generic_ctx *ctx)
+{
+ sdsfree(ctx->key);
+ ctx->store = NULL;
+ free(ctx);
+}
+static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
+{
+ struct crdt_generic_ctx *ctx = (struct crdt_generic_ctx *)user;
+
+ uuid_t uuid;
+ store_get_uuid(&(ctx->store->module), uuid);
+
+ switch (ctx->op)
+ {
+ case CRDT_GET:
+ {
+ struct scontainer *ctr = NULL;
+ ctr = store_lookup_scontainer(ctx->store, ctx->key);
+ if (ctr && reply->type == SWARMKV_REPLY_VERBATIM)
+ {
+ sobj_merge_blob(&ctr->obj, reply->str, reply->len, uuid);
+ ctr->is_pending = 0;
+ }
+ break;
+ }
+ case CRDT_MEET:
+ {
+ break;
+ }
+ default:
+ {
+ assert(0);
+ break;
+ }
+ }
+ crdt_generic_ctx_free(ctx);
+ return;
+}
+
+void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len)
+{
+ struct crdt_generic_ctx *ctx = NULL;
+ assert(peer);
+ ctx = ALLOC(struct crdt_generic_ctx, 1);
+ ctx->op = op;
+ ctx->store = store;
+ ctx->key = sdsnew(argv[2]);
+ node_copy(&ctx->peer, peer);
+ swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len);
+ return;
+}
enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
/*CRDT ADD key [IP:port ...]*/
@@ -781,7 +807,7 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st
{
ctr->is_pending = 0;
}
- store_add_scontainer(store, ctr);
+ store_thread_runtime_add_scontainer(store->threads + ctr->tid, ctr);
}
node_t *replica_nodes = ALLOC(node_t, n_replica_node);
for (size_t i = 0; i < n_replica_node; i++)
@@ -892,7 +918,7 @@ enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const st
ctr = store_lookup_scontainer(store, key);
if (ctr)
{
- store_remove_scontainer(store, ctr);
+ store_thread_runtime_remove_scontainer(store->threads + ctr->tid, ctr);
scontainer_free(ctr);
*reply = swarmkv_reply_new_integer(1);
}
@@ -954,16 +980,16 @@ enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const s
// CRDT KEYS tid pattern
int i = 0, n_matched = 0;
struct pattern_match_arg cb_arg;
- int thread_id = atoll(cmd->argv[2]);
+ int tid = atoll(cmd->argv[2]);
cb_arg.pattern = cmd->argv[3];
utarray_new(cb_arg.matched_replies, &ut_array_matched_reply);
struct swarmkv_store *store = module2store(mod_store);
- if (thread_id > store->opts->nr_worker_threads)
+ if (tid > store->opts->nr_worker_threads)
{
*reply = swarmkv_reply_new_error("Invalid thread id");
return FINISHED;
}
- store_iterate_sobj(store, thread_id, pattern_match_function, &cb_arg);
+ store_thread_runtime_iterate_sobj(store->threads + tid, pattern_match_function, &cb_arg);
n_matched = utarray_len(cb_arg.matched_replies);
if (n_matched > 0)
{
diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp
index eff3446..dbb8613 100644
--- a/test/swarmkv_gtest.cpp
+++ b/test/swarmkv_gtest.cpp
@@ -1102,7 +1102,7 @@ protected:
const char *log_path="./swarmkv-2-nodes.log";
const char *cluster_name="swarmkv-2-nodes";
-
+ worker_thread_number = 1;
char ip_list[1024]={0};
snprintf(ip_list, sizeof(ip_list), "127.0.0.1:%d 127.0.0.1:%d", TWO_NODES_TEST_CLUSTER_PORT, TWO_NODES_TEST_CLUSTER_PORT+1);
swarmkv_cli_create_cluster(cluster_name, ip_list);
@@ -1116,9 +1116,9 @@ protected:
swarmkv_options_set_cluster_port(opts[i], TWO_NODES_TEST_CLUSTER_PORT+i);
swarmkv_options_set_health_check_port(opts[i], TWO_NODES_TEST_HEALTH_PORT+i);
swarmkv_options_set_logger(opts[i], logger);
- swarmkv_options_set_sync_interval_us(opts[i], 10000);
- swarmkv_options_set_cluster_timeout_us(opts[i], 500*1000);
- swarmkv_options_set_worker_thread_number(opts[i], 2);
+ swarmkv_options_set_sync_interval_us(opts[i], 10*1000);
+ swarmkv_options_set_cluster_timeout_us(opts[i], 200*1000);
+ swarmkv_options_set_worker_thread_number(opts[i], worker_thread_number);
swarmkv_options_set_caller_thread_number(opts[i], 1);
swarmkv_options_set_batch_sync_enabled(opts[i], 1);
swarmkv_options_set_network_compression_enabled(opts[i], i%2);
@@ -1143,10 +1143,12 @@ protected:
// Some expensive resource shared by all tests.
static struct log_handle *logger;
static struct swarmkv *db1, *db2;
+ static int worker_thread_number;
};
struct log_handle *SwarmkvTwoNodes::logger;
struct swarmkv *SwarmkvTwoNodes::db1;
struct swarmkv *SwarmkvTwoNodes::db2;
+int SwarmkvTwoNodes::worker_thread_number;
TEST_F(SwarmkvTwoNodes, KeyspaceKeys)
{
struct swarmkv *db[2];
@@ -2008,17 +2010,19 @@ TEST_F(SwarmkvTwoNodes, TypeBloomFilter)
struct swarmkv_reply *reply=NULL;
const char *key="bloom-filter-001";
const char *item[4]={"hello", "world", "bloom", "filter"};
- reply=swarmkv_command(db[0], "BFINIT %s 0.0001 1000000 time 3000000 13", key);
+ reply=swarmkv_command(db[0], "BFINIT %s 0.0001 100000 time 3000000 13", key);
ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
EXPECT_STREQ(reply->str, "OK");
swarmkv_reply_free(reply);
- for(int i=0; i<3; i++)
+ for(int i=0; i<4; i++)
{
reply=swarmkv_command(db[i%2], "BFADD %s %s", key, item[i]);
+ if(reply->type == SWARMKV_REPLY_ERROR) swarmkv_reply_print(reply, stdout);
ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
swarmkv_reply_free(reply);
}
+ wait_for_sync();
reply=swarmkv_command(db[1], "BFMEXISTS %s %s %s %s", key, item[0], item[1], item[2]);
ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
ASSERT_EQ(reply->n_element, 3);
@@ -2046,7 +2050,7 @@ TEST_F(SwarmkvTwoNodes, TypeBloomFilter)
ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
swarmkv_reply_free(reply);
- usleep(100*1000);
+ wait_for_sync();
reply=swarmkv_command(db[1], "BFEXISTS %s %s", key, item[3]);
ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
@@ -2185,7 +2189,7 @@ TEST_F(SwarmkvTwoNodes, Info)
for(size_t i=0; i<2; i++)
{
reply=swarmkv_command(db[i%2], "INFO");
- ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS);
swarmkv_reply_free(reply);
}
}
@@ -2198,7 +2202,7 @@ TEST_F(SwarmkvTwoNodes, Monitor)
for(size_t i=0; i<2; i++)
{
reply=swarmkv_command(db[i%2], "MONREG %s g?t", swarmkv_self_address(db[i%2]));
- ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS);
swarmkv_reply_free(reply);
}
}
@@ -2301,7 +2305,111 @@ TEST_F(SwarmkvTwoNodes, Latency)
reply=swarmkv_command(db1, "latency peer");
EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
swarmkv_reply_free(reply);
- sleep(3500);
+}
+static int parse_swarmkv_info(const char *text, const char *key) {
+ // Duplicate the input text since strtok_r modifies the string
+ char *copy = strdup(text);
+ if (copy == NULL) {
+ perror("Failed to allocate memory");
+ return -1;
+ }
+
+ char *line;
+ char *rest = copy;
+ int value = -1;
+ size_t key_len = strlen(key);
+
+ // Construct the search prefix: key followed by ':'
+ char *prefix = (char*) malloc(key_len + 2); // +2 for ':' and null terminator
+ if (prefix == NULL) {
+ perror("Failed to allocate memory");
+ free(copy);
+ return -1;
+ }
+ strcpy(prefix, key);
+ strcat(prefix, ":");
+
+ // Tokenize the string by newline characters
+ while ((line = strtok_r(rest, "\n", &rest))) {
+ // Check if the line starts with the key prefix
+ if (strncmp(line, prefix, strlen(prefix)) == 0) {
+ // Extract the integer value after the key and colon
+ if (sscanf(line + strlen(prefix), "%d", &value) == 1) {
+ break; // Value found; exit the loop
+ }
+ }
+ }
+
+ // Clean up allocated memory
+ free(prefix);
+ free(copy);
+
+ return value;
+}
+TEST_F(SwarmkvTwoNodes, SyncRobustness)
+{
+ struct swarmkv *db1=SwarmkvTwoNodes::db1, *db2=SwarmkvTwoNodes::db2;
+ struct swarmkv_reply *reply=NULL;
+ const char *key="crdt-robustness";
+ reply=swarmkv_command(db1, "INCRBY %s 1", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db2, "INCRBY %s 1", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 2);
+ swarmkv_reply_free(reply);
+
+ wait_for_sync();
+ int sync_err_before = 0, sync_err_after = 0;
+ reply = swarmkv_command(db1, "INFO Store");
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ sync_err_before = parse_swarmkv_info(reply->str, "sync_err");
+ swarmkv_reply_free(reply);
+
+ reply = swarmkv_command(db2, "KEYSLOT %s", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ int keyslot = reply->integer;
+ swarmkv_reply_free(reply);
+
+ //This is a hack of swarmkv_keyspace_slot2tid().
+ int tid = keyslot % SwarmkvTwoNodes::worker_thread_number;
+ reply=swarmkv_command(db2, "DEBUG %d sleep 1", tid);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_ERROR);
+ swarmkv_reply_print(reply, stdout);
+ swarmkv_reply_free(reply);
+
+ //db1 -> db2 sync failed
+ reply=swarmkv_command(db1, "INCRBY %s 1", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 3);
+ swarmkv_reply_free(reply);
+
+ sleep(1);
+
+ reply = swarmkv_command(db1, "INFO Store");
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ sync_err_after = parse_swarmkv_info(reply->str, "sync_err");
+ EXPECT_EQ(sync_err_after - sync_err_before, 1);
+ //swarmkv_reply_print(reply, stdout);
+ swarmkv_reply_free(reply);
+
+
+
+ //db1 -> db2 sync should be resumed after this operation
+ reply=swarmkv_command(db1, "INCRBY %s 1", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 4);
+ swarmkv_reply_free(reply);
+
+ wait_for_sync();
+
+ reply=swarmkv_command(db2, "GET %s", key);
+ EXPECT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, "4");
+ swarmkv_reply_free(reply);
+
}
TEST_F(SwarmkvTwoNodes, Wait)
{