diff options
| author | Zheng Chao <[email protected]> | 2023-08-11 00:48:02 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-08-11 00:48:02 +0800 |
| commit | 0c766ac60c1c71afbf6de38f97819fb2123a3d94 (patch) | |
| tree | f84c42f8d06dde07f5400628a254dd85c048ff84 | |
| parent | 546b91069cac4dabc0b2751f57b3cce2385f1291 (diff) | |
`__exec_cmd()` determins target node, and then target thread.
| -rw-r--r-- | src/inc_internal/swarmkv_limits.h | 1 | ||||
| -rw-r--r-- | src/swarmkv.c | 32 | ||||
| -rw-r--r-- | src/swarmkv_common.c | 4 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 55 | ||||
| -rw-r--r-- | tools/swarmkv_cli.c | 2 |
5 files changed, 50 insertions, 44 deletions
diff --git a/src/inc_internal/swarmkv_limits.h b/src/inc_internal/swarmkv_limits.h index d698096..6eefbae 100644 --- a/src/inc_internal/swarmkv_limits.h +++ b/src/inc_internal/swarmkv_limits.h @@ -1,6 +1,7 @@ #pragma once #define KEYSPACE_SLOT_NUM 16384 +#define STORE_SHARD_NUM 128 #define SWARMKV_MAX_NODE_NUM 1024 #define MAX_REPLICA_NUM 8 #define SWARMKV_THREAD_MAX 16 diff --git a/src/swarmkv.c b/src/swarmkv.c index 900b2c0..5fb3802 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -316,19 +316,19 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw { if(0==strcasecmp("Node", cmd->argv[1])) { - *reply=swarmkv_reply_new_string_fmt(node_info_buff); + *reply=swarmkv_reply_new_status(node_info_buff); } else if(0==strcasecmp("Store", cmd->argv[1])) { - *reply=swarmkv_reply_new_string_fmt(store_info_buff); + *reply=swarmkv_reply_new_status(store_info_buff); } else if(0==strcasecmp("Keyspace", cmd->argv[1])) { - *reply=swarmkv_reply_new_string_fmt(ks_info_buff); + *reply=swarmkv_reply_new_status(ks_info_buff); } else if(0==strcasecmp("Network", cmd->argv[1])) { - *reply=swarmkv_reply_new_string_fmt(net_info_buff); + *reply=swarmkv_reply_new_status(net_info_buff); } else if(0==strcasecmp("Threads", cmd->argv[1])) { @@ -853,23 +853,29 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar int target_tid=spec_gettid(spec, cmd, db->opts->nr_worker_threads); - struct swarmkv_msg *msg=NULL; - //Only worker threads can do network communication, so command should be executed at worker thread first. - if(cur_tid != target_tid) + struct swarmkv_msg *msg=NULL; + if(!node_is_empty(target_node) && node_compare(&db->self, target_node)) { - //cmd will be executed in target thread's on_msg_callback + //cmd is executed in target node's on_msg_callback long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller); msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, target_node, sequence); - swarmkv_mesh_send(db->mesh, cur_tid, target_tid, msg); + if(cur_tid >= db->opts->nr_worker_threads) + { + //Only worker threads can do network communication, so command should be executed at worker thread first. + swarmkv_mesh_send(db->mesh, cur_tid, random()%db->opts->nr_worker_threads, msg); + } + else + { + swarmkv_net_send(db->net, target_node, msg); + } return; } - - if(!node_is_empty(target_node) && node_compare(&db->self, target_node)) + if(cur_tid != target_tid) { - //cmd is executed in target node's on_msg_callback + //cmd will be executed in target thread's on_msg_callback long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller); msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, target_node, sequence); - swarmkv_net_send(db->net, target_node, msg); + swarmkv_mesh_send(db->mesh, cur_tid, target_tid, msg); return; } enum cmd_exec_result exec_ret=FINISHED; diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c index 55406bc..ba5e7d0 100644 --- a/src/swarmkv_common.c +++ b/src/swarmkv_common.c @@ -746,8 +746,8 @@ int key2tid(const sds key, int nr_worker_threads) struct http_client { int response_code; - struct event_base* evbase; - struct evhttp_connection* evhttpconn; + struct event_base *evbase; + struct evhttp_connection *evhttpconn; struct evhttp_request *req; int error_happens; sds response_body; diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index c93ac3d..0d0838d 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -451,7 +451,7 @@ enum CRDT_OP { CRDT_GET, CRDT_MERGE, - CRDT_JOIN + CRDT_MEET }; struct crdt_generic_ctx { @@ -482,6 +482,7 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) uuid_t uuid; store_get_uuid(&(ctx->store->module), uuid); int tid=__gettid(ctx->store->exec_cmd_handle); + long long error_before=ctx->store->sync_err; if(reply->type==SWARMKV_REPLY_ERROR) { atomic_inc(&ctx->store->sync_err); @@ -513,8 +514,8 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) atomic_inc(&ctx->store->sync_err); } break; - case CRDT_JOIN: - if(reply->type==SWARMKV_REPLY_INTEGER && reply->integer==1) + case CRDT_MEET: + if(reply->type==SWARMKV_REPLY_INTEGER) { atomic_inc(&ctx->store->sync_ok); } @@ -527,6 +528,7 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) atomic_inc(&ctx->store->sync_err); break; } + assert(ctx->store->sync_err==error_before); error_out: crdt_generic_ctx_free(ctx); return; @@ -557,33 +559,30 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) struct swarmkv_store_thread *thr=&store->threads[real_tid]; thr->calls++; - struct sync_master *sync_master=sync_master_new(); + DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp) { char *blob=NULL; size_t blob_sz=0; scontainer_serialize(ctr, &blob, &blob_sz); - sync_master_add_obj(sync_master, ctr->obj.key, blob, blob_sz, - utarray_front(ctr->replica_node_list), utarray_len(ctr->replica_node_list)); + struct swarmkv_cmd *cmd=swarmkv_cmd_new(4); + cmd->argv[0]=sdsnew("crdt"); + cmd->argv[1]=sdsnew("merge"); + cmd->argv[2]=sdsnew(ctr->obj.key); + cmd->argv[3]=sdsnewlen(blob, blob_sz); + for(int i=0; i<utarray_len(ctr->replica_node_list); i++) + { + node_t *peer=utarray_eltptr(ctr->replica_node_list, i); + crdt_generic_call(store, CRDT_MERGE, cmd, peer); + } + swarmkv_cmd_free(cmd); + free(blob); DL_DELETE(thr->sync_queue, ctr); ctr->is_in_sync_q=0; store->synced++; n_synced++; if(n_synced>=MAX_SYNC_PER_PERIOD) break; } - node_t peer; - struct swarmkv_cmd *cmd=NULL; - int ret=1; - while(1) - { - ret=sync_master_get_cmd(sync_master, &peer, &cmd); - if(!ret) break; - crdt_generic_call(store, CRDT_MERGE, cmd, &peer); - //printf("Debug: %s synced %s -> %s\n", cmd->argv[2], store->self.addr, peer.addr); - swarmkv_cmd_free(cmd); - cmd=NULL; - } - sync_master_free(sync_master); thr->n_keys=HASH_COUNT(thr->obj_table); DL_COUNT(thr->sync_queue, ctr, thr->keys_to_sync); clock_gettime(CLOCK_MONOTONIC, &end); @@ -640,7 +639,7 @@ void swarmkv_store_set_monitor_handle(struct swarmkv_module *mod_store, struct s void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *info) { struct swarmkv_store *store=module2store(mod_store); - info->shards=store->opts->nr_worker_threads; + info->shards=STORE_SHARD_NUM; info->keys=0; info->keys_to_sync=0; struct swarmkv_store_thread *thread=NULL; @@ -714,20 +713,20 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st assert(node_compare(replica_nodes+i, &store->self)!=0); scontainer_add_replica_node(ctr, replica_nodes+i); } - struct swarmkv_cmd *crdt_join_cmd=NULL; + struct swarmkv_cmd *crdt_meet_cmd=NULL; for(size_t i=0; i<n_replica_node; i++) { if(i<max_pull_node_num) { crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i); } - crdt_join_cmd=swarmkv_cmd_new(4); - crdt_join_cmd->argv[0]=sdsnew("crdt"); - crdt_join_cmd->argv[1]=sdsnew("meet"); - crdt_join_cmd->argv[2]=sdsdup(ctr->obj.key); - crdt_join_cmd->argv[3]=node_addr2sds(&store->self); - crdt_generic_call(store, CRDT_JOIN, crdt_join_cmd, replica_nodes+i); - swarmkv_cmd_free(crdt_join_cmd); + crdt_meet_cmd=swarmkv_cmd_new(4); + crdt_meet_cmd->argv[0]=sdsnew("crdt"); + crdt_meet_cmd->argv[1]=sdsnew("meet"); + crdt_meet_cmd->argv[2]=sdsdup(ctr->obj.key); + crdt_meet_cmd->argv[3]=node_addr2sds(&store->self); + crdt_generic_call(store, CRDT_MEET, crdt_meet_cmd, replica_nodes+i); + swarmkv_cmd_free(crdt_meet_cmd); } swarmkv_cmd_free(crdt_get_cmd); crdt_get_cmd=NULL; diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c index c26e2b1..9bfbe62 100644 --- a/tools/swarmkv_cli.c +++ b/tools/swarmkv_cli.c @@ -557,7 +557,7 @@ struct swarmkv_reply *cluster_info_command(struct swarmkv *db, char *argv[], siz reply->elements[i]=swarmkv_reply_new_array(2); reply->elements[i]->elements[0]=swarmkv_reply_new_string_fmt(nodes_reply->elements[i]->str); aligned_info=str_replace(per_node_info_reply->str, "\n", "\n "); - reply->elements[i]->elements[1]=swarmkv_reply_new_string_fmt(aligned_info); + reply->elements[i]->elements[1]=swarmkv_reply_new_status(aligned_info); free(aligned_info); swarmkv_reply_free(per_node_info_reply); } |
