diff options
| author | 郑超 <[email protected]> | 2024-09-20 08:05:35 +0000 |
|---|---|---|
| committer | 郑超 <[email protected]> | 2024-09-20 08:05:35 +0000 |
| commit | 8606b34a4d5d2cffa5ab69dc0506514b813b4b84 (patch) | |
| tree | d3493e3eecb09c7334e5093eff1673aebb53c497 | |
| parent | 11cf5e4035c5002acb7023f233678ede0cb63beb (diff) | |
Track peer health for robust CRDT synchronization. If a peer is unresponsive, throttle synchronization to once every 100ms. (v4.4.3)
| -rw-r--r-- | src/swarmkv.c | 20 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 660 | ||||
| -rw-r--r-- | test/swarmkv_gtest.cpp | 128 |
3 files changed, 471 insertions, 337 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c index 517ab73..081ccea 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -119,7 +119,7 @@ int swarmkv_gettid(const struct swarmkv *db) assert(0); return -1; } -#define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000 +#define INTER_THREAD_RPC_TIMEOUT_AHEAD 10000 #define UNKNOWN_THREAD_ID -1 static void exec_at_peer(struct swarmkv *db, const struct swarmkv_cmd *cmd, const node_t *peer, struct future *future_of_caller) { @@ -831,25 +831,25 @@ enum cmd_exec_result ping_command(struct swarmkv_module *mod_db, const struct sw enum cmd_exec_result __attribute__((optimize("O0"))) debug_command(struct swarmkv_module *mod_db, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { - if (cmd->argc == 2 && !strcasecmp(cmd->argv[1], "help")) + if (cmd->argc == 3 && !strcasecmp(cmd->argv[1], "help")) { const char *help = { - "DEBUG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:\n" + "DEBUG tid <subcommand> [<arg> [value] [opt] ...]. Subcommands are:\n" "SLEEP <seconds>\n" " Stop the server for <seconds>. 
Decimals allowed.\n" "ASSERT\n" " Crash by assertion failed.\n"}; *reply = swarmkv_reply_new_status(help); } - else if (!strcasecmp(cmd->argv[1], "sleep") && cmd->argc > 2) + else if (!strcasecmp(cmd->argv[2], "sleep") && cmd->argc > 3) { - if (cmd->argc < 3) + if (cmd->argc < 4) { *reply = swarmkv_reply_new_error(error_need_additional_arg, cmd->argv[1]); } else { - double dtime = strtod(cmd->argv[2], NULL); + double dtime = strtod(cmd->argv[3], NULL); long long utime = dtime * 1000000; struct timespec tv; @@ -859,13 +859,13 @@ enum cmd_exec_result __attribute__((optimize("O0"))) debug_command(struct swarmk *reply = swarmkv_reply_new_status("OK"); } } - else if (!strcasecmp(cmd->argv[1], "assert")) + else if (!strcasecmp(cmd->argv[2], "assert")) { assert(0); } else { - *reply = swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[1], cmd->argv[0]); + *reply = swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[2], cmd->argv[0]); } return FINISHED; } @@ -1208,8 +1208,8 @@ void command_spec_init(struct swarmkv *db) swarmkv_command_table_register(db->mod_command_table, "INFO", "[section]", 0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, info_command, &db->module); - swarmkv_command_table_register(db->mod_command_table, "DEBUG", "<subcommand>", - 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, + swarmkv_command_table_register(db->mod_command_table, "DEBUG", "tid <subcommand>", + 2, KEY_OFFSET_TID, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, debug_command, &db->module); swarmkv_command_table_register(db->mod_command_table, "PING", "IP:port", 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index d3319a7..969b904 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -128,26 +128,39 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] = .obj_size = (size_t(*)(const void *))undefined_obj_mem_size}}; #define MODULE_SWAMRKV_STORE module_name_str("swarmkv.store") -struct 
swarmkv_store_thread +struct peer_health +{ + node_t peer; + struct timespec last_request_time; + int missed_responses; + int aborted_requests; + UT_hash_handle hh; +}; +struct store_thread_runtime { struct scontainer *obj_table; struct scontainer *sync_queue; + struct peer_health *peer_health_table; pthread_mutex_t sanity_lock; long long keys_to_sync; long long n_keys; long long calls; + struct sync_master *sync_master;//should be NULL when not in synchronization + struct sync_task *sync_task; + int offset_of_sync_task; + long long sync_err; + long long sync_ok; + long long synced; }; struct swarmkv_store { struct swarmkv_module module; node_t self; - struct swarmkv_store_thread *threads; + struct store_thread_runtime *threads; struct swarmkv *exec_cmd_handle; struct swarmkv_module *mod_monitor; - long long sync_err; - long long sync_ok; - long long synced; + const struct swarmkv_options *opts; }; @@ -208,26 +221,38 @@ static void scontainer_free(struct scontainer *ctr) free(ctr); return; } - -static struct scontainer *scontainer_find(struct scontainer **table, const sds key) -{ - struct scontainer *ctr = NULL; - HASH_FIND(hh, *table, key, sdslen(key), ctr); - return ctr; -} -static void scontainer_join(struct scontainer **table, struct scontainer *ctr) -{ - HASH_ADD_KEYPTR(hh, *table, ctr->obj.key, sdslen(ctr->obj.key), ctr); - ctr->is_in_table = 1; - return; -} -static void scontainer_remove(struct scontainer **table, struct scontainer *ctr) +void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz) { - HASH_DELETE(hh, *table, ctr); - ctr->is_in_table = 0; + char *value_blob = NULL; + size_t value_blob_sz = 0; + struct sobj *obj = &ctr->obj; + sobj_specs[obj->type].obj_serialize(obj->raw, &value_blob, &value_blob_sz); + char *mpack_buff = NULL; + size_t mpack_sz = 0; + mpack_sz = sizeof(obj->type) + sizeof(ctr->op_timestamp); + mpack_sz += sizeof(size_t) + value_blob_sz; + mpack_buff = ALLOC(char, mpack_sz); + size_t offset = 0; + + 
memcpy(mpack_buff, &obj->type, sizeof(obj->type)); + offset += sizeof(obj->type); + memcpy(mpack_buff + offset, &ctr->op_timestamp, sizeof(ctr->op_timestamp)); + offset += sizeof(ctr->op_timestamp); + + // We can add replica node list to the blob, but the merge process will consume more CPU time. + + memcpy(mpack_buff + offset, &value_blob_sz, sizeof(size_t)); + offset += sizeof(size_t); + + memcpy(mpack_buff + offset, value_blob, value_blob_sz); + offset += value_blob_sz; + assert(offset == mpack_sz); + *blob = mpack_buff; + *blob_sz = mpack_sz; + + free(value_blob); return; } - static int node_cmp(const void *a, const void *b) { return node_compare((node_t *)a, (node_t *)b); @@ -260,150 +285,6 @@ static void scontainer_remove_replica_node(struct scontainer *ctr, const node_t utarray_erase(ctr->replica_node_list, pos, 1); return; } -void store_add_scontainer(struct swarmkv_store *store, struct scontainer *ctr) -{ - scontainer_join(&(store->threads[ctr->tid].obj_table), ctr); - return; -} -void store_remove_scontainer(struct swarmkv_store *store, struct scontainer *ctr) -{ - HASH_DELETE(hh, store->threads[ctr->tid].obj_table, ctr); - ctr->is_in_table = 0; - if (ctr->is_in_sync_q) - { - DL_DELETE(store->threads[ctr->tid].sync_queue, ctr); - ctr->is_in_sync_q = 0; - } -} -typedef void sobj_callback_func_t(struct sobj *obj, void *cb_arg); -void store_iterate_sobj(struct swarmkv_store *store, int tid, sobj_callback_func_t *cb, void *cb_arg) -{ - struct scontainer *ctr = NULL, *tmp = NULL; - HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp) - { - if (!ctr->is_pending) - { - cb(&ctr->obj, cb_arg); - } - } -} -struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) -{ - struct scontainer *ctr = NULL; - int designated_tid = key2tid(key, store->opts->nr_worker_threads); - int real_tid = swarmkv_gettid(store->exec_cmd_handle); - assert(designated_tid == real_tid); - ctr = scontainer_find(&(store->threads[designated_tid].obj_table), key); - if (0 
== pthread_mutex_trylock(&store->threads[designated_tid].sanity_lock)) - { - pthread_mutex_unlock(&store->threads[designated_tid].sanity_lock); - } - else - { - assert(0); - } - if (ctr) - { - return ctr; - } - else - { - return NULL; - } -} -struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key) -{ - struct swarmkv_store *store = module2store(mod_store); - struct scontainer *ctr = NULL; - ctr = store_lookup_scontainer(store, key); - if (ctr) //&& !ctr->is_pending - { - return &(ctr->obj); - } - else - { - return NULL; - } -} -void store_get_uuid(struct swarmkv_module *mod_store, uuid_t uuid) -{ - struct swarmkv_store *store = module2store(mod_store); - uuid_copy(uuid, store->opts->bin_uuid); - return; -} -void store_get_node_addr(struct swarmkv_module *mod_store, node_t *node) -{ - struct swarmkv_store *store = module2store(mod_store); - node_copy(node, &store->self); - return; -} -void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj) -{ - struct swarmkv_store *store = module2store(mod_store); - struct scontainer *ctr = container_of(obj, struct scontainer, obj); - gettimeofday(&ctr->op_timestamp, NULL); - if (!ctr->is_in_sync_q && ctr->replica_node_list) - { - DL_APPEND(store->threads[ctr->tid].sync_queue, ctr); - ctr->is_in_sync_q = 1; - } - return; -} -int sobj_get_random_replica(struct sobj *obj, node_t *out) -{ - struct scontainer *ctr = container_of(obj, struct scontainer, obj); - if (!ctr->replica_node_list || 0 == utarray_len(ctr->replica_node_list)) - return 0; - node_t *replica = (node_t *)utarray_eltptr(ctr->replica_node_list, 0); - node_copy(out, replica); - return 1; -} -enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply) -{ - assert(obj->type == OBJ_TYPE_UNDEFINED); - node_t replica; - int ret = 0; - ret = sobj_get_random_replica(obj, &replica); - if (ret) - { - *reply = swarmkv_reply_new_node(&replica, 1); - return REDIRECT; - } - return NEED_KEY_ROUTE; -} -void 
scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz) -{ - char *value_blob = NULL; - size_t value_blob_sz = 0; - struct sobj *obj = &ctr->obj; - sobj_specs[obj->type].obj_serialize(obj->raw, &value_blob, &value_blob_sz); - char *mpack_buff = NULL; - size_t mpack_sz = 0; - mpack_sz = sizeof(obj->type) + sizeof(ctr->op_timestamp); - mpack_sz += sizeof(size_t) + value_blob_sz; - mpack_buff = ALLOC(char, mpack_sz); - size_t offset = 0; - - memcpy(mpack_buff, &obj->type, sizeof(obj->type)); - offset += sizeof(obj->type); - memcpy(mpack_buff + offset, &ctr->op_timestamp, sizeof(ctr->op_timestamp)); - offset += sizeof(ctr->op_timestamp); - - // We can add replica node list to the blob, but the merge process will consume more CPU time. - - memcpy(mpack_buff + offset, &value_blob_sz, sizeof(size_t)); - offset += sizeof(size_t); - - memcpy(mpack_buff + offset, value_blob, value_blob_sz); - offset += value_blob_sz; - assert(offset == mpack_sz); - *blob = mpack_buff; - *blob_sz = mpack_sz; - - free(value_blob); - return; -} - void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t uuid) { struct scontainer *ctr = container_of(obj, struct scontainer, obj); @@ -441,25 +322,6 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t return; } -enum CRDT_OP -{ - CRDT_GET, - CRDT_MERGE, - CRDT_MEET -}; -struct crdt_generic_ctx -{ - enum CRDT_OP op; - struct swarmkv_store *store; - node_t peer; - sds key; -}; -void crdt_generic_ctx_free(struct crdt_generic_ctx *ctx) -{ - sdsfree(ctx->key); - ctx->store = NULL; - free(ctx); -} __attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer) { struct scontainer *ctr = NULL, *tmp = NULL; @@ -471,101 +333,145 @@ __attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_stor } return; } -static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) -{ - struct crdt_generic_ctx 
*ctx = (struct crdt_generic_ctx *)user; +#define SYNC_COOLDOWN_MS 100 - uuid_t uuid; - store_get_uuid(&(ctx->store->module), uuid); - - __attribute__((unused)) long long error_before = ctx->store->sync_err; - - switch (ctx->op) - { - case CRDT_GET: +static void store_thread_runtime_update_peer_health(struct store_thread_runtime *rt, const node_t *peer, int responed) +{ + struct peer_health *ph = NULL; + HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph); + //If the the peer is responded, unnecessary to track. + if (responed) { - struct scontainer *ctr = NULL; - ctr = store_lookup_scontainer(ctx->store, ctx->key); - if (ctr && reply->type == SWARMKV_REPLY_VERBATIM) + if(ph) { - sobj_merge_blob(&ctr->obj, reply->str, reply->len, uuid); - ctr->is_pending = 0; - atomic_inc(&ctx->store->sync_ok); + HASH_DEL(rt->peer_health_table, ph); + free(ph); } - else - { - atomic_inc(&ctx->store->sync_err); - } - break; + return; } - case CRDT_MERGE: + if(!ph) { - if (reply->type == SWARMKV_REPLY_INTEGER && reply->integer > 0) - { - atomic_add(&ctx->store->sync_ok, reply->integer); - } - else - { - //In batch synchronization, the merge command operates multiple keys of one thread, - //the ctx->key is the first key of the batch. 
- if (reply->type == SWARMKV_REPLY_ERROR && strcasestr(reply->str, "timed out")) - { - int tid = __store_gettid(ctx->key, ctx->store->opts->nr_worker_threads); - store_remove_failed_peer(ctx->store, tid, &ctx->peer); - } - atomic_inc(&ctx->store->sync_err); - } - break; + ph = ALLOC(struct peer_health, 1); + node_copy(&ph->peer, peer); + HASH_ADD(hh, rt->peer_health_table, peer, sizeof(node_t), ph); } - case CRDT_MEET: + ph->missed_responses++; + clock_gettime(CLOCK_MONOTONIC, &ph->last_request_time); + return; +} +static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtime *rt, const node_t *peer) +{ + struct peer_health *ph = NULL; + HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph); + if (!ph) { - if (reply->type == SWARMKV_REPLY_INTEGER) - { - atomic_inc(&ctx->store->sync_ok); - } - else + return 1; + } + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + + if (timespec_diff_usec(&ph->last_request_time, &now) < SYNC_COOLDOWN_MS * 1000) + { + ph->aborted_requests++; + return 0; + } + else + { + ph->last_request_time = now; + return 1; + } +} +static struct scontainer *store_thread_runtime_find_scontainter(struct store_thread_runtime *rt, const sds key) +{ + struct scontainer *ctr = NULL; + //Thread-safe SANITY CHECK + if (0 == pthread_mutex_trylock(&rt->sanity_lock)) + { + pthread_mutex_unlock(&rt->sanity_lock); + } + else + { + assert(0); + } + HASH_FIND(hh, rt->obj_table, key, sdslen(key), ctr); + return ctr; +} +static void store_thread_runtime_add_scontainer(struct store_thread_runtime *rt, struct scontainer *ctr) +{ + HASH_ADD_KEYPTR(hh, rt->obj_table, ctr->obj.key, sdslen(ctr->obj.key), ctr); + ctr->is_in_table = 1; + return; +} +static void store_thread_runtime_remove_scontainer(struct store_thread_runtime *rt, struct scontainer *ctr) +{ + HASH_DELETE(hh, rt->obj_table, ctr); + ctr->is_in_table = 0; + if (ctr->is_in_sync_q) + { + DL_DELETE(rt->sync_queue, ctr); + ctr->is_in_sync_q = 0; + } + return; +} 
+typedef void sobj_callback_func_t(struct sobj *obj, void *cb_arg); +void store_thread_runtime_iterate_sobj(struct store_thread_runtime *rt, sobj_callback_func_t *cb, void *cb_arg) +{ + struct scontainer *ctr = NULL, *tmp = NULL; + HASH_ITER(hh, rt->obj_table, ctr, tmp) + { + if (!ctr->is_pending) { - atomic_inc(&ctx->store->sync_err); + cb(&ctr->obj, cb_arg); } - break; } - default: +} +struct crdt_merge_ctx +{ + struct store_thread_runtime *thread_rt; + node_t peer; +}; +static void crdt_merge_on_reply(const struct swarmkv_reply *reply, void *user) +{ + struct crdt_merge_ctx *ctx = (struct crdt_merge_ctx *)user; + struct store_thread_runtime *thread_rt = ctx->thread_rt; + + if (reply->type == SWARMKV_REPLY_INTEGER && reply->integer > 0) { - atomic_inc(&ctx->store->sync_err); - break; + thread_rt->sync_ok += reply->integer; } + else + { + thread_rt->sync_err++; } - // assert(ctx->store->sync_err==error_before); - crdt_generic_ctx_free(ctx); + int responed = (reply->type != SWARMKV_REPLY_ERROR); + store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed); + free(ctx); return; } - -void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len) +void store_thread_runtime_sync_with_one_peer(struct store_thread_runtime *thread_rt, const node_t *peer, int argc, const char *argv[], size_t argv_len[], struct swarmkv *exec_cmd_handle) { - struct crdt_generic_ctx *ctx = NULL; - assert(peer); - ctx = ALLOC(struct crdt_generic_ctx, 1); - ctx->op = op; - ctx->store = store; - ctx->key = sdsnew(argv[2]); + if(!store_thread_runtime_should_sync_with_peer(thread_rt, peer)) + { + return; + } + struct crdt_merge_ctx *ctx = ALLOC(struct crdt_merge_ctx, 1); + ctx->thread_rt = thread_rt; node_copy(&ctx->peer, peer); - swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len); - return; + swarmkv_async_command_on_argv(exec_cmd_handle, 
crdt_merge_on_reply, ctx, peer->addr, argc, argv, argv_len); } -#define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle" #define MAX_SYNC_PER_PERIOD 100000 -int store_batch_sync(struct swarmkv_store *store, int tid) +int store_thread_runtime_sync(struct store_thread_runtime *thread_rt, int batch_sync_enabled, struct swarmkv *exec_cmd_handle) { int n_synced = 0; - struct swarmkv_store_thread *thr = &store->threads[tid]; + thread_rt->calls++; struct sync_master *sync_master = sync_master_new(); struct scontainer *ctr = NULL, *tmp = NULL; - DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp) + DL_FOREACH_SAFE(thread_rt->sync_queue, ctr, tmp) { char *blob = NULL; size_t blob_sz = 0; scontainer_serialize(ctr, &blob, &blob_sz); - if (store->opts->batch_sync_enabled) + if (batch_sync_enabled) { sync_master_add_obj(sync_master, ctr->obj.key, blob, blob_sz, utarray_front(ctr->replica_node_list), utarray_len(ctr->replica_node_list)); @@ -585,13 +491,12 @@ int store_batch_sync(struct swarmkv_store *store, int tid) for (int i = 0; i < utarray_len(ctr->replica_node_list); i++) { node_t *peer = utarray_eltptr(ctr->replica_node_list, i); - crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len); + store_thread_runtime_sync_with_one_peer(thread_rt, peer, 4, argv, argv_len, exec_cmd_handle); } free(blob); } - DL_DELETE(thr->sync_queue, ctr); + DL_DELETE(thread_rt->sync_queue, ctr); ctr->is_in_sync_q = 0; - store->synced++; n_synced++; if (n_synced >= MAX_SYNC_PER_PERIOD) break; @@ -609,64 +514,101 @@ int store_batch_sync(struct swarmkv_store *store, int tid) argv[1] = "merge"; argv_len[1] = strlen(argv[1]); sync_task_read_key_blob(task, argv + 2, argv_len + 2, n_data * 2); - crdt_generic_call(store, CRDT_MERGE, sync_task_peer(task), n_data * 2 + 2, argv, argv_len); + store_thread_runtime_sync_with_one_peer(thread_rt, sync_task_peer(task), n_data * 2 + 2, argv, argv_len, exec_cmd_handle); sync_task_free(task); task = sync_master_get_task(sync_master); } sync_master_free(sync_master); + 
thread_rt->n_keys = HASH_COUNT(thread_rt->obj_table); + DL_COUNT(thread_rt->sync_queue, ctr, thread_rt->keys_to_sync); + thread_rt->synced += n_synced; return n_synced; } +void store_thread_runtime_init(struct store_thread_runtime *thread_rt) +{ + memset(thread_rt, 0, sizeof(struct store_thread_runtime)); + pthread_mutex_init(&thread_rt->sanity_lock, NULL); + return; +} +void store_thread_runtime_deinit(struct store_thread_runtime *thread_rt) +{ + struct scontainer *ctr = NULL, *tmp = NULL; + HASH_ITER(hh, thread_rt->obj_table, ctr, tmp) + { + store_thread_runtime_remove_scontainer(thread_rt, ctr); + scontainer_free(ctr); + } + struct peer_health *ph = NULL, *tmp_ph = NULL; + HASH_ITER(hh, thread_rt->peer_health_table, ph, tmp_ph) + { + HASH_DEL(thread_rt->peer_health_table, ph); + free(ph); + } + pthread_mutex_destroy(&thread_rt->sanity_lock); + return; +} +void store_get_uuid(struct swarmkv_module *mod_store, uuid_t uuid) +{ + struct swarmkv_store *store = module2store(mod_store); + uuid_copy(uuid, store->opts->bin_uuid); + return; +} +void store_get_node_addr(struct swarmkv_module *mod_store, node_t *node) +{ + struct swarmkv_store *store = module2store(mod_store); + node_copy(node, &store->self); + return; +} +void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj) +{ + struct swarmkv_store *store = module2store(mod_store); + struct scontainer *ctr = container_of(obj, struct scontainer, obj); + gettimeofday(&ctr->op_timestamp, NULL); + if (!ctr->is_in_sync_q && ctr->replica_node_list) + { + DL_APPEND(store->threads[ctr->tid].sync_queue, ctr); + ctr->is_in_sync_q = 1; + } + return; +} +int sobj_get_random_replica(struct sobj *obj, node_t *out) +{ + struct scontainer *ctr = container_of(obj, struct scontainer, obj); + if (!ctr->replica_node_list || 0 == utarray_len(ctr->replica_node_list)) + return 0; + node_t *replica = (node_t *)utarray_eltptr(ctr->replica_node_list, 0); + node_copy(out, replica); + return 1; +} +enum 
cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply) +{ + assert(obj->type == OBJ_TYPE_UNDEFINED); + node_t replica; + int ret = 0; + ret = sobj_get_random_replica(obj, &replica); + if (ret) + { + *reply = swarmkv_reply_new_node(&replica, 1); + return REDIRECT; + } + return NEED_KEY_ROUTE; +} + + +#define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle" + void swarmkv_store_periodic(struct swarmkv_module *mod_store, int thread_id) { struct swarmkv_store *store = module2store(mod_store); - struct scontainer *ctr = NULL, *tmp = NULL; struct timespec start, end; int n_synced = 0; int real_tid = swarmkv_gettid(store->exec_cmd_handle); assert(real_tid == thread_id); clock_gettime(CLOCK_MONOTONIC, &start); - struct swarmkv_store_thread *thr = &store->threads[real_tid]; - thr->calls++; - if (store->opts->batch_sync_enabled) - { - n_synced = store_batch_sync(store, real_tid); - } - else - { - DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp) - { - char *blob = NULL; - size_t blob_sz = 0; - scontainer_serialize(ctr, &blob, &blob_sz); - const char *argv[4]; - size_t argv_len[4]; - argv[0] = "crdt"; - argv_len[0] = strlen(argv[0]); - argv[1] = "merge"; - argv_len[1] = strlen(argv[1]); - argv[2] = ctr->obj.key; - argv_len[2] = sdslen(ctr->obj.key); - argv[3] = blob; - argv_len[3] = blob_sz; - for (int i = 0; i < utarray_len(ctr->replica_node_list); i++) - { - node_t *peer = utarray_eltptr(ctr->replica_node_list, i); - crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len); - } - free(blob); - DL_DELETE(thr->sync_queue, ctr); - ctr->is_in_sync_q = 0; - store->synced++; - n_synced++; - if (n_synced >= MAX_SYNC_PER_PERIOD) - break; - } - } - thr->n_keys = HASH_COUNT(thr->obj_table); - DL_COUNT(thr->sync_queue, ctr, thr->keys_to_sync); - clock_gettime(CLOCK_MONOTONIC, &end); + n_synced = store_thread_runtime_sync(store->threads + real_tid, store->opts->batch_sync_enabled, store->exec_cmd_handle); + clock_gettime(CLOCK_MONOTONIC, &end); if (n_synced) { 
swarmkv_monitor_record_event(store->mod_monitor, MONITOR_SYNC_EVENT_NAME, timespec_diff_usec(&start, &end)); @@ -679,8 +621,11 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts) strncpy(store->module.name, "store", sizeof(store->module.name)); store->module.mod_ctx = store; store->opts = opts; - store->threads = ALLOC(struct swarmkv_store_thread, opts->nr_worker_threads); - + store->threads = ALLOC(struct store_thread_runtime, opts->nr_worker_threads); + for (size_t i = 0; i < opts->nr_worker_threads; i++) + { + store_thread_runtime_init(store->threads + i); + } node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port); return &(store->module); @@ -694,17 +639,9 @@ void swarmkv_store_set_exec_cmd_handle(struct swarmkv_module *mod_store, struct void swarmkv_store_free(struct swarmkv_module *mod_store) { struct swarmkv_store *store = module2store(mod_store); - struct scontainer *ctr = NULL, *tmp = NULL; - for (size_t i = 0; i < store->opts->nr_worker_threads; i++) { - - HASH_ITER(hh, store->threads[i].obj_table, ctr, tmp) - { - assert(ctr->tid == i); - scontainer_remove(&(store->threads[i].obj_table), ctr); - scontainer_free(ctr); - } + store_thread_runtime_deinit(store->threads + i); } free(store->threads); store->threads = NULL; @@ -722,17 +659,17 @@ void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *inf struct swarmkv_store *store = module2store(mod_store); info->keys = 0; info->keys_to_sync = 0; - struct swarmkv_store_thread *thread = NULL; + struct store_thread_runtime *thread = NULL; memset(info, 0, sizeof(struct store_info)); for (size_t i = 0; i < store->opts->nr_worker_threads; i++) { thread = store->threads + i; info->keys_to_sync += __sync_add_and_fetch(&thread->keys_to_sync, 0); info->keys += __sync_add_and_fetch(&thread->n_keys, 0); + info->sync_ok += __sync_add_and_fetch(&thread->sync_ok, 0); + info->sync_err += __sync_add_and_fetch(&thread->sync_err, 0); + info->synced += 
__sync_add_and_fetch(&thread->synced, 0); } - info->sync_ok = __sync_add_and_fetch(&store->sync_ok, 0); - info->sync_err = __sync_add_and_fetch(&store->sync_err, 0); - info->synced = store->synced; } UT_icd ut_array_matched_reply = {sizeof(struct swarmkv_reply *), NULL, NULL, NULL}; @@ -755,6 +692,95 @@ enum cmd_exec_result type_command(struct swarmkv_module *mod_store, const struct *reply = swarmkv_reply_new_string_fmt(sobj_specs[obj->type].type_name); return FINISHED; } +struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) +{ + struct scontainer *ctr = NULL; + int designated_tid = key2tid(key, store->opts->nr_worker_threads); + int real_tid = swarmkv_gettid(store->exec_cmd_handle); + assert(designated_tid == real_tid); + ctr = store_thread_runtime_find_scontainter(store->threads + designated_tid, key); + return ctr; +} +struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key) +{ + struct swarmkv_store *store = module2store(mod_store); + struct scontainer *ctr = NULL; + + ctr = store_lookup_scontainer(store, key); + if (ctr) //&& !ctr->is_pending + { + return &(ctr->obj); + } + else + { + return NULL; + } +} +enum CRDT_OP +{ + CRDT_GET, + CRDT_MERGE, + CRDT_MEET +}; +struct crdt_generic_ctx +{ + enum CRDT_OP op; + struct swarmkv_store *store; + node_t peer; + sds key; +}; +void crdt_generic_ctx_free(struct crdt_generic_ctx *ctx) +{ + sdsfree(ctx->key); + ctx->store = NULL; + free(ctx); +} +static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) +{ + struct crdt_generic_ctx *ctx = (struct crdt_generic_ctx *)user; + + uuid_t uuid; + store_get_uuid(&(ctx->store->module), uuid); + + switch (ctx->op) + { + case CRDT_GET: + { + struct scontainer *ctr = NULL; + ctr = store_lookup_scontainer(ctx->store, ctx->key); + if (ctr && reply->type == SWARMKV_REPLY_VERBATIM) + { + sobj_merge_blob(&ctr->obj, reply->str, reply->len, uuid); + ctr->is_pending = 0; + } + break; + } + case CRDT_MEET: + { + break; + } + 
default: + { + assert(0); + break; + } + } + crdt_generic_ctx_free(ctx); + return; +} + +void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len) +{ + struct crdt_generic_ctx *ctx = NULL; + assert(peer); + ctx = ALLOC(struct crdt_generic_ctx, 1); + ctx->op = op; + ctx->store = store; + ctx->key = sdsnew(argv[2]); + node_copy(&ctx->peer, peer); + swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len); + return; +} enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { /*CRDT ADD key [IP:port ...]*/ @@ -781,7 +807,7 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st { ctr->is_pending = 0; } - store_add_scontainer(store, ctr); + store_thread_runtime_add_scontainer(store->threads + ctr->tid, ctr); } node_t *replica_nodes = ALLOC(node_t, n_replica_node); for (size_t i = 0; i < n_replica_node; i++) @@ -892,7 +918,7 @@ enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const st ctr = store_lookup_scontainer(store, key); if (ctr) { - store_remove_scontainer(store, ctr); + store_thread_runtime_remove_scontainer(store->threads + ctr->tid, ctr); scontainer_free(ctr); *reply = swarmkv_reply_new_integer(1); } @@ -954,16 +980,16 @@ enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const s // CRDT KEYS tid pattern int i = 0, n_matched = 0; struct pattern_match_arg cb_arg; - int thread_id = atoll(cmd->argv[2]); + int tid = atoll(cmd->argv[2]); cb_arg.pattern = cmd->argv[3]; utarray_new(cb_arg.matched_replies, &ut_array_matched_reply); struct swarmkv_store *store = module2store(mod_store); - if (thread_id > store->opts->nr_worker_threads) + if (tid > store->opts->nr_worker_threads) { *reply = swarmkv_reply_new_error("Invalid thread id"); return FINISHED; } - store_iterate_sobj(store, 
thread_id, pattern_match_function, &cb_arg); + store_thread_runtime_iterate_sobj(store->threads + tid, pattern_match_function, &cb_arg); n_matched = utarray_len(cb_arg.matched_replies); if (n_matched > 0) { diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp index eff3446..dbb8613 100644 --- a/test/swarmkv_gtest.cpp +++ b/test/swarmkv_gtest.cpp @@ -1102,7 +1102,7 @@ protected: const char *log_path="./swarmkv-2-nodes.log"; const char *cluster_name="swarmkv-2-nodes"; - + worker_thread_number = 1; char ip_list[1024]={0}; snprintf(ip_list, sizeof(ip_list), "127.0.0.1:%d 127.0.0.1:%d", TWO_NODES_TEST_CLUSTER_PORT, TWO_NODES_TEST_CLUSTER_PORT+1); swarmkv_cli_create_cluster(cluster_name, ip_list); @@ -1116,9 +1116,9 @@ protected: swarmkv_options_set_cluster_port(opts[i], TWO_NODES_TEST_CLUSTER_PORT+i); swarmkv_options_set_health_check_port(opts[i], TWO_NODES_TEST_HEALTH_PORT+i); swarmkv_options_set_logger(opts[i], logger); - swarmkv_options_set_sync_interval_us(opts[i], 10000); - swarmkv_options_set_cluster_timeout_us(opts[i], 500*1000); - swarmkv_options_set_worker_thread_number(opts[i], 2); + swarmkv_options_set_sync_interval_us(opts[i], 10*1000); + swarmkv_options_set_cluster_timeout_us(opts[i], 200*1000); + swarmkv_options_set_worker_thread_number(opts[i], worker_thread_number); swarmkv_options_set_caller_thread_number(opts[i], 1); swarmkv_options_set_batch_sync_enabled(opts[i], 1); swarmkv_options_set_network_compression_enabled(opts[i], i%2); @@ -1143,10 +1143,12 @@ protected: // Some expensive resource shared by all tests. 
static struct log_handle *logger; static struct swarmkv *db1, *db2; + static int worker_thread_number; }; struct log_handle *SwarmkvTwoNodes::logger; struct swarmkv *SwarmkvTwoNodes::db1; struct swarmkv *SwarmkvTwoNodes::db2; +int SwarmkvTwoNodes::worker_thread_number; TEST_F(SwarmkvTwoNodes, KeyspaceKeys) { struct swarmkv *db[2]; @@ -2008,17 +2010,19 @@ TEST_F(SwarmkvTwoNodes, TypeBloomFilter) struct swarmkv_reply *reply=NULL; const char *key="bloom-filter-001"; const char *item[4]={"hello", "world", "bloom", "filter"}; - reply=swarmkv_command(db[0], "BFINIT %s 0.0001 1000000 time 3000000 13", key); + reply=swarmkv_command(db[0], "BFINIT %s 0.0001 100000 time 3000000 13", key); ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); EXPECT_STREQ(reply->str, "OK"); swarmkv_reply_free(reply); - for(int i=0; i<3; i++) + for(int i=0; i<4; i++) { reply=swarmkv_command(db[i%2], "BFADD %s %s", key, item[i]); + if(reply->type == SWARMKV_REPLY_ERROR) swarmkv_reply_print(reply, stdout); ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); swarmkv_reply_free(reply); } + wait_for_sync(); reply=swarmkv_command(db[1], "BFMEXISTS %s %s %s %s", key, item[0], item[1], item[2]); ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY); ASSERT_EQ(reply->n_element, 3); @@ -2046,7 +2050,7 @@ TEST_F(SwarmkvTwoNodes, TypeBloomFilter) ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); swarmkv_reply_free(reply); - usleep(100*1000); + wait_for_sync(); reply=swarmkv_command(db[1], "BFEXISTS %s %s", key, item[3]); ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); @@ -2185,7 +2189,7 @@ TEST_F(SwarmkvTwoNodes, Info) for(size_t i=0; i<2; i++) { reply=swarmkv_command(db[i%2], "INFO"); - ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); swarmkv_reply_free(reply); } } @@ -2198,7 +2202,7 @@ TEST_F(SwarmkvTwoNodes, Monitor) for(size_t i=0; i<2; i++) { reply=swarmkv_command(db[i%2], "MONREG %s g?t", swarmkv_self_address(db[i%2])); - ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + 
EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); swarmkv_reply_free(reply); } } @@ -2301,7 +2305,111 @@ TEST_F(SwarmkvTwoNodes, Latency) reply=swarmkv_command(db1, "latency peer"); EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY); swarmkv_reply_free(reply); - sleep(3500); +} +static int parse_swarmkv_info(const char *text, const char *key) { + // Duplicate the input text since strtok_r modifies the string + char *copy = strdup(text); + if (copy == NULL) { + perror("Failed to allocate memory"); + return -1; + } + + char *line; + char *rest = copy; + int value = -1; + size_t key_len = strlen(key); + + // Construct the search prefix: key followed by ':' + char *prefix = (char*) malloc(key_len + 2); // +2 for ':' and null terminator + if (prefix == NULL) { + perror("Failed to allocate memory"); + free(copy); + return -1; + } + strcpy(prefix, key); + strcat(prefix, ":"); + + // Tokenize the string by newline characters + while ((line = strtok_r(rest, "\n", &rest))) { + // Check if the line starts with the key prefix + if (strncmp(line, prefix, strlen(prefix)) == 0) { + // Extract the integer value after the key and colon + if (sscanf(line + strlen(prefix), "%d", &value) == 1) { + break; // Value found; exit the loop + } + } + } + + // Clean up allocated memory + free(prefix); + free(copy); + + return value; +} +TEST_F(SwarmkvTwoNodes, SyncRobustness) +{ + struct swarmkv *db1=SwarmkvTwoNodes::db1, *db2=SwarmkvTwoNodes::db2; + struct swarmkv_reply *reply=NULL; + const char *key="crdt-robustness"; + reply=swarmkv_command(db1, "INCRBY %s 1", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); + + reply=swarmkv_command(db2, "INCRBY %s 1", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 2); + swarmkv_reply_free(reply); + + wait_for_sync(); + int sync_err_before = 0, sync_err_after = 0; + reply = swarmkv_command(db1, "INFO Store"); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); + 
sync_err_before = parse_swarmkv_info(reply->str, "sync_err"); + swarmkv_reply_free(reply); + + reply = swarmkv_command(db2, "KEYSLOT %s", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + int keyslot = reply->integer; + swarmkv_reply_free(reply); + + //This is a hack of swarmkv_keyspace_slot2tid(). + int tid = keyslot % SwarmkvTwoNodes::worker_thread_number; + reply=swarmkv_command(db2, "DEBUG %d sleep 1", tid); + EXPECT_EQ(reply->type, SWARMKV_REPLY_ERROR); + swarmkv_reply_print(reply, stdout); + swarmkv_reply_free(reply); + + //db1 -> db2 sync failed + reply=swarmkv_command(db1, "INCRBY %s 1", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 3); + swarmkv_reply_free(reply); + + sleep(1); + + reply = swarmkv_command(db1, "INFO Store"); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); + sync_err_after = parse_swarmkv_info(reply->str, "sync_err"); + EXPECT_EQ(sync_err_after - sync_err_before, 1); + //swarmkv_reply_print(reply, stdout); + swarmkv_reply_free(reply); + + + + //db1 -> db2 sync should be resumed after this operation + reply=swarmkv_command(db1, "INCRBY %s 1", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 4); + swarmkv_reply_free(reply); + + wait_for_sync(); + + reply=swarmkv_command(db2, "GET %s", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, "4"); + swarmkv_reply_free(reply); + } TEST_F(SwarmkvTwoNodes, Wait) { |
