diff options
| author | 郑超 <[email protected]> | 2024-09-20 08:05:35 +0000 |
|---|---|---|
| committer | 郑超 <[email protected]> | 2024-09-20 08:05:35 +0000 |
| commit | 8606b34a4d5d2cffa5ab69dc0506514b813b4b84 (patch) | |
| tree | d3493e3eecb09c7334e5093eff1673aebb53c497 /src | |
| parent | 11cf5e4035c5002acb7023f233678ede0cb63beb (diff) | |
Track peer health for robust CRDT synchronization. If a peer is unresponsive, throttle synchronization to once every 100ms.v4.4.3
Diffstat (limited to 'src')
| -rw-r--r-- | src/swarmkv.c | 20 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 660 |
2 files changed, 353 insertions, 327 deletions
diff --git a/src/swarmkv.c b/src/swarmkv.c index 517ab73..081ccea 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -119,7 +119,7 @@ int swarmkv_gettid(const struct swarmkv *db) assert(0); return -1; } -#define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000 +#define INTER_THREAD_RPC_TIMEOUT_AHEAD 10000 #define UNKNOWN_THREAD_ID -1 static void exec_at_peer(struct swarmkv *db, const struct swarmkv_cmd *cmd, const node_t *peer, struct future *future_of_caller) { @@ -831,25 +831,25 @@ enum cmd_exec_result ping_command(struct swarmkv_module *mod_db, const struct sw enum cmd_exec_result __attribute__((optimize("O0"))) debug_command(struct swarmkv_module *mod_db, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { - if (cmd->argc == 2 && !strcasecmp(cmd->argv[1], "help")) + if (cmd->argc == 3 && !strcasecmp(cmd->argv[1], "help")) { const char *help = { - "DEBUG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:\n" + "DEBUG tid <subcommand> [<arg> [value] [opt] ...]. Subcommands are:\n" "SLEEP <seconds>\n" " Stop the server for <seconds>. Decimals allowed.\n" "ASSERT\n" " Crash by assertion failed.\n"}; *reply = swarmkv_reply_new_status(help); } - else if (!strcasecmp(cmd->argv[1], "sleep") && cmd->argc > 2) + else if (!strcasecmp(cmd->argv[2], "sleep") && cmd->argc > 3) { - if (cmd->argc < 3) + if (cmd->argc < 4) { *reply = swarmkv_reply_new_error(error_need_additional_arg, cmd->argv[1]); } else { - double dtime = strtod(cmd->argv[2], NULL); + double dtime = strtod(cmd->argv[3], NULL); long long utime = dtime * 1000000; struct timespec tv; @@ -859,13 +859,13 @@ enum cmd_exec_result __attribute__((optimize("O0"))) debug_command(struct swarmk *reply = swarmkv_reply_new_status("OK"); } } - else if (!strcasecmp(cmd->argv[1], "assert")) + else if (!strcasecmp(cmd->argv[2], "assert")) { assert(0); } else { - *reply = swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[1], cmd->argv[0]); + *reply = swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[2], cmd->argv[0]); } return FINISHED; } @@ -1208,8 +1208,8 @@ void command_spec_init(struct swarmkv *db) swarmkv_command_table_register(db->mod_command_table, "INFO", "[section]", 0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, info_command, &db->module); - swarmkv_command_table_register(db->mod_command_table, "DEBUG", "<subcommand>", - 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, + swarmkv_command_table_register(db->mod_command_table, "DEBUG", "tid <subcommand>", + 2, KEY_OFFSET_TID, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, debug_command, &db->module); swarmkv_command_table_register(db->mod_command_table, "PING", "IP:port", 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index d3319a7..969b904 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -128,26 +128,39 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] = .obj_size = (size_t(*)(const void *))undefined_obj_mem_size}}; #define MODULE_SWAMRKV_STORE module_name_str("swarmkv.store") -struct swarmkv_store_thread +struct peer_health +{ + node_t peer; + struct timespec last_request_time; + int missed_responses; + int aborted_requests; + UT_hash_handle hh; +}; +struct store_thread_runtime { struct scontainer *obj_table; struct scontainer *sync_queue; + struct peer_health *peer_health_table; pthread_mutex_t sanity_lock; long long keys_to_sync; long long n_keys; long long calls; + struct sync_master *sync_master;//should be NULL when not in synchronization + struct sync_task *sync_task; + int offset_of_sync_task; + long long sync_err; + long long sync_ok; + long long synced; }; struct swarmkv_store { struct swarmkv_module module; node_t self; - struct swarmkv_store_thread *threads; + struct store_thread_runtime *threads; struct swarmkv *exec_cmd_handle; struct swarmkv_module *mod_monitor; - long long sync_err; - long long sync_ok; - long long synced; + const struct swarmkv_options *opts; }; @@ -208,26 +221,38 @@ static void scontainer_free(struct scontainer *ctr) free(ctr); return; } - -static struct scontainer *scontainer_find(struct scontainer **table, const sds key) -{ - struct scontainer *ctr = NULL; - HASH_FIND(hh, *table, key, sdslen(key), ctr); - return ctr; -} -static void scontainer_join(struct scontainer **table, struct scontainer *ctr) -{ - HASH_ADD_KEYPTR(hh, *table, ctr->obj.key, sdslen(ctr->obj.key), ctr); - ctr->is_in_table = 1; - return; -} -static void scontainer_remove(struct scontainer **table, struct scontainer *ctr) +void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz) { - HASH_DELETE(hh, *table, ctr); - ctr->is_in_table = 0; + char *value_blob = NULL; + size_t value_blob_sz = 0; + struct sobj *obj = &ctr->obj; + sobj_specs[obj->type].obj_serialize(obj->raw, &value_blob, &value_blob_sz); + char *mpack_buff = NULL; + size_t mpack_sz = 0; + mpack_sz = sizeof(obj->type) + sizeof(ctr->op_timestamp); + mpack_sz += sizeof(size_t) + value_blob_sz; + mpack_buff = ALLOC(char, mpack_sz); + size_t offset = 0; + + memcpy(mpack_buff, &obj->type, sizeof(obj->type)); + offset += sizeof(obj->type); + memcpy(mpack_buff + offset, &ctr->op_timestamp, sizeof(ctr->op_timestamp)); + offset += sizeof(ctr->op_timestamp); + + // We can add replica node list to the blob, but the merge process will consume more CPU time. + + memcpy(mpack_buff + offset, &value_blob_sz, sizeof(size_t)); + offset += sizeof(size_t); + + memcpy(mpack_buff + offset, value_blob, value_blob_sz); + offset += value_blob_sz; + assert(offset == mpack_sz); + *blob = mpack_buff; + *blob_sz = mpack_sz; + + free(value_blob); return; } - static int node_cmp(const void *a, const void *b) { return node_compare((node_t *)a, (node_t *)b); @@ -260,150 +285,6 @@ static void scontainer_remove_replica_node(struct scontainer *ctr, const node_t utarray_erase(ctr->replica_node_list, pos, 1); return; } -void store_add_scontainer(struct swarmkv_store *store, struct scontainer *ctr) -{ - scontainer_join(&(store->threads[ctr->tid].obj_table), ctr); - return; -} -void store_remove_scontainer(struct swarmkv_store *store, struct scontainer *ctr) -{ - HASH_DELETE(hh, store->threads[ctr->tid].obj_table, ctr); - ctr->is_in_table = 0; - if (ctr->is_in_sync_q) - { - DL_DELETE(store->threads[ctr->tid].sync_queue, ctr); - ctr->is_in_sync_q = 0; - } -} -typedef void sobj_callback_func_t(struct sobj *obj, void *cb_arg); -void store_iterate_sobj(struct swarmkv_store *store, int tid, sobj_callback_func_t *cb, void *cb_arg) -{ - struct scontainer *ctr = NULL, *tmp = NULL; - HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp) - { - if (!ctr->is_pending) - { - cb(&ctr->obj, cb_arg); - } - } -} -struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) -{ - struct scontainer *ctr = NULL; - int designated_tid = key2tid(key, store->opts->nr_worker_threads); - int real_tid = swarmkv_gettid(store->exec_cmd_handle); - assert(designated_tid == real_tid); - ctr = scontainer_find(&(store->threads[designated_tid].obj_table), key); - if (0 == pthread_mutex_trylock(&store->threads[designated_tid].sanity_lock)) - { - pthread_mutex_unlock(&store->threads[designated_tid].sanity_lock); - } - else - { - assert(0); - } - if (ctr) - { - return ctr; - } - else - { - return NULL; - } -} -struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key) -{ - struct swarmkv_store *store = module2store(mod_store); - struct scontainer *ctr = NULL; - ctr = store_lookup_scontainer(store, key); - if (ctr) //&& !ctr->is_pending - { - return &(ctr->obj); - } - else - { - return NULL; - } -} -void store_get_uuid(struct swarmkv_module *mod_store, uuid_t uuid) -{ - struct swarmkv_store *store = module2store(mod_store); - uuid_copy(uuid, store->opts->bin_uuid); - return; -} -void store_get_node_addr(struct swarmkv_module *mod_store, node_t *node) -{ - struct swarmkv_store *store = module2store(mod_store); - node_copy(node, &store->self); - return; -} -void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj) -{ - struct swarmkv_store *store = module2store(mod_store); - struct scontainer *ctr = container_of(obj, struct scontainer, obj); - gettimeofday(&ctr->op_timestamp, NULL); - if (!ctr->is_in_sync_q && ctr->replica_node_list) - { - DL_APPEND(store->threads[ctr->tid].sync_queue, ctr); - ctr->is_in_sync_q = 1; - } - return; -} -int sobj_get_random_replica(struct sobj *obj, node_t *out) -{ - struct scontainer *ctr = container_of(obj, struct scontainer, obj); - if (!ctr->replica_node_list || 0 == utarray_len(ctr->replica_node_list)) - return 0; - node_t *replica = (node_t *)utarray_eltptr(ctr->replica_node_list, 0); - node_copy(out, replica); - return 1; -} -enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply) -{ - assert(obj->type == OBJ_TYPE_UNDEFINED); - node_t replica; - int ret = 0; - ret = sobj_get_random_replica(obj, &replica); - if (ret) - { - *reply = swarmkv_reply_new_node(&replica, 1); - return REDIRECT; - } - return NEED_KEY_ROUTE; -} -void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz) -{ - char *value_blob = NULL; - size_t value_blob_sz = 0; - struct sobj *obj = &ctr->obj; - sobj_specs[obj->type].obj_serialize(obj->raw, &value_blob, &value_blob_sz); - char *mpack_buff = NULL; - size_t mpack_sz = 0; - mpack_sz = sizeof(obj->type) + sizeof(ctr->op_timestamp); - mpack_sz += sizeof(size_t) + value_blob_sz; - mpack_buff = ALLOC(char, mpack_sz); - size_t offset = 0; - - memcpy(mpack_buff, &obj->type, sizeof(obj->type)); - offset += sizeof(obj->type); - memcpy(mpack_buff + offset, &ctr->op_timestamp, sizeof(ctr->op_timestamp)); - offset += sizeof(ctr->op_timestamp); - - // We can add replica node list to the blob, but the merge process will consume more CPU time. - - memcpy(mpack_buff + offset, &value_blob_sz, sizeof(size_t)); - offset += sizeof(size_t); - - memcpy(mpack_buff + offset, value_blob, value_blob_sz); - offset += value_blob_sz; - assert(offset == mpack_sz); - *blob = mpack_buff; - *blob_sz = mpack_sz; - - free(value_blob); - return; -} - void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t uuid) { struct scontainer *ctr = container_of(obj, struct scontainer, obj); @@ -441,25 +322,6 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t return; } -enum CRDT_OP -{ - CRDT_GET, - CRDT_MERGE, - CRDT_MEET -}; -struct crdt_generic_ctx -{ - enum CRDT_OP op; - struct swarmkv_store *store; - node_t peer; - sds key; -}; -void crdt_generic_ctx_free(struct crdt_generic_ctx *ctx) -{ - sdsfree(ctx->key); - ctx->store = NULL; - free(ctx); -} __attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer) { struct scontainer *ctr = NULL, *tmp = NULL; @@ -471,101 +333,145 @@ __attribute__((unused)) static void store_remove_failed_peer(struct swarmkv_stor } return; } -static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) -{ - struct crdt_generic_ctx *ctx = (struct crdt_generic_ctx *)user; +#define SYNC_COOLDOWN_MS 100 - uuid_t uuid; - store_get_uuid(&(ctx->store->module), uuid); - - __attribute__((unused)) long long error_before = ctx->store->sync_err; - - switch (ctx->op) - { - case CRDT_GET: +static void store_thread_runtime_update_peer_health(struct store_thread_runtime *rt, const node_t *peer, int responed) +{ + struct peer_health *ph = NULL; + HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph); + //If the the peer is responded, unnecessary to track. + if (responed) { - struct scontainer *ctr = NULL; - ctr = store_lookup_scontainer(ctx->store, ctx->key); - if (ctr && reply->type == SWARMKV_REPLY_VERBATIM) + if(ph) { - sobj_merge_blob(&ctr->obj, reply->str, reply->len, uuid); - ctr->is_pending = 0; - atomic_inc(&ctx->store->sync_ok); + HASH_DEL(rt->peer_health_table, ph); + free(ph); } - else - { - atomic_inc(&ctx->store->sync_err); - } - break; + return; } - case CRDT_MERGE: + if(!ph) { - if (reply->type == SWARMKV_REPLY_INTEGER && reply->integer > 0) - { - atomic_add(&ctx->store->sync_ok, reply->integer); - } - else - { - //In batch synchronization, the merge command operates multiple keys of one thread, - //the ctx->key is the first key of the batch. - if (reply->type == SWARMKV_REPLY_ERROR && strcasestr(reply->str, "timed out")) - { - int tid = __store_gettid(ctx->key, ctx->store->opts->nr_worker_threads); - store_remove_failed_peer(ctx->store, tid, &ctx->peer); - } - atomic_inc(&ctx->store->sync_err); - } - break; + ph = ALLOC(struct peer_health, 1); + node_copy(&ph->peer, peer); + HASH_ADD(hh, rt->peer_health_table, peer, sizeof(node_t), ph); } - case CRDT_MEET: + ph->missed_responses++; + clock_gettime(CLOCK_MONOTONIC, &ph->last_request_time); + return; +} +static int store_thread_runtime_should_sync_with_peer(struct store_thread_runtime *rt, const node_t *peer) +{ + struct peer_health *ph = NULL; + HASH_FIND(hh, rt->peer_health_table, peer, sizeof(node_t), ph); + if (!ph) { - if (reply->type == SWARMKV_REPLY_INTEGER) - { - atomic_inc(&ctx->store->sync_ok); - } - else + return 1; + } + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + + if (timespec_diff_usec(&ph->last_request_time, &now) < SYNC_COOLDOWN_MS * 1000) + { + ph->aborted_requests++; + return 0; + } + else + { + ph->last_request_time = now; + return 1; + } +} +static struct scontainer *store_thread_runtime_find_scontainter(struct store_thread_runtime *rt, const sds key) +{ + struct scontainer *ctr = NULL; + //Thread-safe SANITY CHECK + if (0 == pthread_mutex_trylock(&rt->sanity_lock)) + { + pthread_mutex_unlock(&rt->sanity_lock); + } + else + { + assert(0); + } + HASH_FIND(hh, rt->obj_table, key, sdslen(key), ctr); + return ctr; +} +static void store_thread_runtime_add_scontainer(struct store_thread_runtime *rt, struct scontainer *ctr) +{ + HASH_ADD_KEYPTR(hh, rt->obj_table, ctr->obj.key, sdslen(ctr->obj.key), ctr); + ctr->is_in_table = 1; + return; +} +static void store_thread_runtime_remove_scontainer(struct store_thread_runtime *rt, struct scontainer *ctr) +{ + HASH_DELETE(hh, rt->obj_table, ctr); + ctr->is_in_table = 0; + if (ctr->is_in_sync_q) + { + DL_DELETE(rt->sync_queue, ctr); + ctr->is_in_sync_q = 0; + } + return; +} +typedef void sobj_callback_func_t(struct sobj *obj, void *cb_arg); +void store_thread_runtime_iterate_sobj(struct store_thread_runtime *rt, sobj_callback_func_t *cb, void *cb_arg) +{ + struct scontainer *ctr = NULL, *tmp = NULL; + HASH_ITER(hh, rt->obj_table, ctr, tmp) + { + if (!ctr->is_pending) { - atomic_inc(&ctx->store->sync_err); + cb(&ctr->obj, cb_arg); } - break; } - default: +} +struct crdt_merge_ctx +{ + struct store_thread_runtime *thread_rt; + node_t peer; +}; +static void crdt_merge_on_reply(const struct swarmkv_reply *reply, void *user) +{ + struct crdt_merge_ctx *ctx = (struct crdt_merge_ctx *)user; + struct store_thread_runtime *thread_rt = ctx->thread_rt; + + if (reply->type == SWARMKV_REPLY_INTEGER && reply->integer > 0) { - atomic_inc(&ctx->store->sync_err); - break; + thread_rt->sync_ok += reply->integer; } + else + { + thread_rt->sync_err++; } - // assert(ctx->store->sync_err==error_before); - crdt_generic_ctx_free(ctx); + int responed = (reply->type != SWARMKV_REPLY_ERROR); + store_thread_runtime_update_peer_health(thread_rt, &ctx->peer, responed); + free(ctx); return; } - -void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len) +void store_thread_runtime_sync_with_one_peer(struct store_thread_runtime *thread_rt, const node_t *peer, int argc, const char *argv[], size_t argv_len[], struct swarmkv *exec_cmd_handle) { - struct crdt_generic_ctx *ctx = NULL; - assert(peer); - ctx = ALLOC(struct crdt_generic_ctx, 1); - ctx->op = op; - ctx->store = store; - ctx->key = sdsnew(argv[2]); + if(!store_thread_runtime_should_sync_with_peer(thread_rt, peer)) + { + return; + } + struct crdt_merge_ctx *ctx = ALLOC(struct crdt_merge_ctx, 1); + ctx->thread_rt = thread_rt; node_copy(&ctx->peer, peer); - swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len); - return; + swarmkv_async_command_on_argv(exec_cmd_handle, crdt_merge_on_reply, ctx, peer->addr, argc, argv, argv_len); } -#define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle" #define MAX_SYNC_PER_PERIOD 100000 -int store_batch_sync(struct swarmkv_store *store, int tid) +int store_thread_runtime_sync(struct store_thread_runtime *thread_rt, int batch_sync_enabled, struct swarmkv *exec_cmd_handle) { int n_synced = 0; - struct swarmkv_store_thread *thr = &store->threads[tid]; + thread_rt->calls++; struct sync_master *sync_master = sync_master_new(); struct scontainer *ctr = NULL, *tmp = NULL; - DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp) + DL_FOREACH_SAFE(thread_rt->sync_queue, ctr, tmp) { char *blob = NULL; size_t blob_sz = 0; scontainer_serialize(ctr, &blob, &blob_sz); - if (store->opts->batch_sync_enabled) + if (batch_sync_enabled) { sync_master_add_obj(sync_master, ctr->obj.key, blob, blob_sz, utarray_front(ctr->replica_node_list), utarray_len(ctr->replica_node_list)); @@ -585,13 +491,12 @@ int store_batch_sync(struct swarmkv_store *store, int tid) for (int i = 0; i < utarray_len(ctr->replica_node_list); i++) { node_t *peer = utarray_eltptr(ctr->replica_node_list, i); - crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len); + store_thread_runtime_sync_with_one_peer(thread_rt, peer, 4, argv, argv_len, exec_cmd_handle); } free(blob); } - DL_DELETE(thr->sync_queue, ctr); + DL_DELETE(thread_rt->sync_queue, ctr); ctr->is_in_sync_q = 0; - store->synced++; n_synced++; if (n_synced >= MAX_SYNC_PER_PERIOD) break; @@ -609,64 +514,101 @@ int store_batch_sync(struct swarmkv_store *store, int tid) argv[1] = "merge"; argv_len[1] = strlen(argv[1]); sync_task_read_key_blob(task, argv + 2, argv_len + 2, n_data * 2); - crdt_generic_call(store, CRDT_MERGE, sync_task_peer(task), n_data * 2 + 2, argv, argv_len); + store_thread_runtime_sync_with_one_peer(thread_rt, sync_task_peer(task), n_data * 2 + 2, argv, argv_len, exec_cmd_handle); sync_task_free(task); task = sync_master_get_task(sync_master); } sync_master_free(sync_master); + thread_rt->n_keys = HASH_COUNT(thread_rt->obj_table); + DL_COUNT(thread_rt->sync_queue, ctr, thread_rt->keys_to_sync); + thread_rt->synced += n_synced; return n_synced; } +void store_thread_runtime_init(struct store_thread_runtime *thread_rt) +{ + memset(thread_rt, 0, sizeof(struct store_thread_runtime)); + pthread_mutex_init(&thread_rt->sanity_lock, NULL); + return; +} +void store_thread_runtime_deinit(struct store_thread_runtime *thread_rt) +{ + struct scontainer *ctr = NULL, *tmp = NULL; + HASH_ITER(hh, thread_rt->obj_table, ctr, tmp) + { + store_thread_runtime_remove_scontainer(thread_rt, ctr); + scontainer_free(ctr); + } + struct peer_health *ph = NULL, *tmp_ph = NULL; + HASH_ITER(hh, thread_rt->peer_health_table, ph, tmp_ph) + { + HASH_DEL(thread_rt->peer_health_table, ph); + free(ph); + } + pthread_mutex_destroy(&thread_rt->sanity_lock); + return; +} +void store_get_uuid(struct swarmkv_module *mod_store, uuid_t uuid) +{ + struct swarmkv_store *store = module2store(mod_store); + uuid_copy(uuid, store->opts->bin_uuid); + return; +} +void store_get_node_addr(struct swarmkv_module *mod_store, node_t *node) +{ + struct swarmkv_store *store = module2store(mod_store); + node_copy(node, &store->self); + return; +} +void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj) +{ + struct swarmkv_store *store = module2store(mod_store); + struct scontainer *ctr = container_of(obj, struct scontainer, obj); + gettimeofday(&ctr->op_timestamp, NULL); + if (!ctr->is_in_sync_q && ctr->replica_node_list) + { + DL_APPEND(store->threads[ctr->tid].sync_queue, ctr); + ctr->is_in_sync_q = 1; + } + return; +} +int sobj_get_random_replica(struct sobj *obj, node_t *out) +{ + struct scontainer *ctr = container_of(obj, struct scontainer, obj); + if (!ctr->replica_node_list || 0 == utarray_len(ctr->replica_node_list)) + return 0; + node_t *replica = (node_t *)utarray_eltptr(ctr->replica_node_list, 0); + node_copy(out, replica); + return 1; +} +enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply) +{ + assert(obj->type == OBJ_TYPE_UNDEFINED); + node_t replica; + int ret = 0; + ret = sobj_get_random_replica(obj, &replica); + if (ret) + { + *reply = swarmkv_reply_new_node(&replica, 1); + return REDIRECT; + } + return NEED_KEY_ROUTE; +} + + +#define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle" + void swarmkv_store_periodic(struct swarmkv_module *mod_store, int thread_id) { struct swarmkv_store *store = module2store(mod_store); - struct scontainer *ctr = NULL, *tmp = NULL; struct timespec start, end; int n_synced = 0; int real_tid = swarmkv_gettid(store->exec_cmd_handle); assert(real_tid == thread_id); clock_gettime(CLOCK_MONOTONIC, &start); - struct swarmkv_store_thread *thr = &store->threads[real_tid]; - thr->calls++; - if (store->opts->batch_sync_enabled) - { - n_synced = store_batch_sync(store, real_tid); - } - else - { - DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp) - { - char *blob = NULL; - size_t blob_sz = 0; - scontainer_serialize(ctr, &blob, &blob_sz); - const char *argv[4]; - size_t argv_len[4]; - argv[0] = "crdt"; - argv_len[0] = strlen(argv[0]); - argv[1] = "merge"; - argv_len[1] = strlen(argv[1]); - argv[2] = ctr->obj.key; - argv_len[2] = sdslen(ctr->obj.key); - argv[3] = blob; - argv_len[3] = blob_sz; - for (int i = 0; i < utarray_len(ctr->replica_node_list); i++) - { - node_t *peer = utarray_eltptr(ctr->replica_node_list, i); - crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len); - } - free(blob); - DL_DELETE(thr->sync_queue, ctr); - ctr->is_in_sync_q = 0; - store->synced++; - n_synced++; - if (n_synced >= MAX_SYNC_PER_PERIOD) - break; - } - } - thr->n_keys = HASH_COUNT(thr->obj_table); - DL_COUNT(thr->sync_queue, ctr, thr->keys_to_sync); - clock_gettime(CLOCK_MONOTONIC, &end); + n_synced = store_thread_runtime_sync(store->threads + real_tid, store->opts->batch_sync_enabled, store->exec_cmd_handle); + clock_gettime(CLOCK_MONOTONIC, &end); if (n_synced) { swarmkv_monitor_record_event(store->mod_monitor, MONITOR_SYNC_EVENT_NAME, timespec_diff_usec(&start, &end)); @@ -679,8 +621,11 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts) strncpy(store->module.name, "store", sizeof(store->module.name)); store->module.mod_ctx = store; store->opts = opts; - store->threads = ALLOC(struct swarmkv_store_thread, opts->nr_worker_threads); - + store->threads = ALLOC(struct store_thread_runtime, opts->nr_worker_threads); + for (size_t i = 0; i < opts->nr_worker_threads; i++) + { + store_thread_runtime_init(store->threads + i); + } node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port); return &(store->module); @@ -694,17 +639,9 @@ void swarmkv_store_set_exec_cmd_handle(struct swarmkv_module *mod_store, struct void swarmkv_store_free(struct swarmkv_module *mod_store) { struct swarmkv_store *store = module2store(mod_store); - struct scontainer *ctr = NULL, *tmp = NULL; - for (size_t i = 0; i < store->opts->nr_worker_threads; i++) { - - HASH_ITER(hh, store->threads[i].obj_table, ctr, tmp) - { - assert(ctr->tid == i); - scontainer_remove(&(store->threads[i].obj_table), ctr); - scontainer_free(ctr); - } + store_thread_runtime_deinit(store->threads + i); } free(store->threads); store->threads = NULL; @@ -722,17 +659,17 @@ void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *inf struct swarmkv_store *store = module2store(mod_store); info->keys = 0; info->keys_to_sync = 0; - struct swarmkv_store_thread *thread = NULL; + struct store_thread_runtime *thread = NULL; memset(info, 0, sizeof(struct store_info)); for (size_t i = 0; i < store->opts->nr_worker_threads; i++) { thread = store->threads + i; info->keys_to_sync += __sync_add_and_fetch(&thread->keys_to_sync, 0); info->keys += __sync_add_and_fetch(&thread->n_keys, 0); + info->sync_ok += __sync_add_and_fetch(&thread->sync_ok, 0); + info->sync_err += __sync_add_and_fetch(&thread->sync_err, 0); + info->synced += __sync_add_and_fetch(&thread->synced, 0); } - info->sync_ok = __sync_add_and_fetch(&store->sync_ok, 0); - info->sync_err = __sync_add_and_fetch(&store->sync_err, 0); - info->synced = store->synced; } UT_icd ut_array_matched_reply = {sizeof(struct swarmkv_reply *), NULL, NULL, NULL}; @@ -755,6 +692,95 @@ enum cmd_exec_result type_command(struct swarmkv_module *mod_store, const struct *reply = swarmkv_reply_new_string_fmt(sobj_specs[obj->type].type_name); return FINISHED; } +struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) +{ + struct scontainer *ctr = NULL; + int designated_tid = key2tid(key, store->opts->nr_worker_threads); + int real_tid = swarmkv_gettid(store->exec_cmd_handle); + assert(designated_tid == real_tid); + ctr = store_thread_runtime_find_scontainter(store->threads + designated_tid, key); + return ctr; +} +struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key) +{ + struct swarmkv_store *store = module2store(mod_store); + struct scontainer *ctr = NULL; + + ctr = store_lookup_scontainer(store, key); + if (ctr) //&& !ctr->is_pending + { + return &(ctr->obj); + } + else + { + return NULL; + } +} +enum CRDT_OP +{ + CRDT_GET, + CRDT_MERGE, + CRDT_MEET +}; +struct crdt_generic_ctx +{ + enum CRDT_OP op; + struct swarmkv_store *store; + node_t peer; + sds key; +}; +void crdt_generic_ctx_free(struct crdt_generic_ctx *ctx) +{ + sdsfree(ctx->key); + ctx->store = NULL; + free(ctx); +} +static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) +{ + struct crdt_generic_ctx *ctx = (struct crdt_generic_ctx *)user; + + uuid_t uuid; + store_get_uuid(&(ctx->store->module), uuid); + + switch (ctx->op) + { + case CRDT_GET: + { + struct scontainer *ctr = NULL; + ctr = store_lookup_scontainer(ctx->store, ctx->key); + if (ctr && reply->type == SWARMKV_REPLY_VERBATIM) + { + sobj_merge_blob(&ctr->obj, reply->str, reply->len, uuid); + ctr->is_pending = 0; + } + break; + } + case CRDT_MEET: + { + break; + } + default: + { + assert(0); + break; + } + } + crdt_generic_ctx_free(ctx); + return; +} + +void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len) +{ + struct crdt_generic_ctx *ctx = NULL; + assert(peer); + ctx = ALLOC(struct crdt_generic_ctx, 1); + ctx->op = op; + ctx->store = store; + ctx->key = sdsnew(argv[2]); + node_copy(&ctx->peer, peer); + swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len); + return; +} enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { /*CRDT ADD key [IP:port ...]*/ @@ -781,7 +807,7 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st { ctr->is_pending = 0; } - store_add_scontainer(store, ctr); + store_thread_runtime_add_scontainer(store->threads + ctr->tid, ctr); } node_t *replica_nodes = ALLOC(node_t, n_replica_node); for (size_t i = 0; i < n_replica_node; i++) @@ -892,7 +918,7 @@ enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const st ctr = store_lookup_scontainer(store, key); if (ctr) { - store_remove_scontainer(store, ctr); + store_thread_runtime_remove_scontainer(store->threads + ctr->tid, ctr); scontainer_free(ctr); *reply = swarmkv_reply_new_integer(1); } @@ -954,16 +980,16 @@ enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const s // CRDT KEYS tid pattern int i = 0, n_matched = 0; struct pattern_match_arg cb_arg; - int thread_id = atoll(cmd->argv[2]); + int tid = atoll(cmd->argv[2]); cb_arg.pattern = cmd->argv[3]; utarray_new(cb_arg.matched_replies, &ut_array_matched_reply); struct swarmkv_store *store = module2store(mod_store); - if (thread_id > store->opts->nr_worker_threads) + if (tid > store->opts->nr_worker_threads) { *reply = swarmkv_reply_new_error("Invalid thread id"); return FINISHED; } - store_iterate_sobj(store, thread_id, pattern_match_function, &cb_arg); + store_thread_runtime_iterate_sobj(store->threads + tid, pattern_match_function, &cb_arg); n_matched = utarray_len(cb_arg.matched_replies); if (n_matched > 0) { |
