summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-08-11 00:48:02 +0800
committerZheng Chao <[email protected]>2023-08-11 00:48:02 +0800
commit0c766ac60c1c71afbf6de38f97819fb2123a3d94 (patch)
treef84c42f8d06dde07f5400628a254dd85c048ff84
parent546b91069cac4dabc0b2751f57b3cce2385f1291 (diff)
`__exec_cmd()` determins target node, and then target thread.
-rw-r--r--src/inc_internal/swarmkv_limits.h1
-rw-r--r--src/swarmkv.c32
-rw-r--r--src/swarmkv_common.c4
-rw-r--r--src/swarmkv_store.c55
-rw-r--r--tools/swarmkv_cli.c2
5 files changed, 50 insertions, 44 deletions
diff --git a/src/inc_internal/swarmkv_limits.h b/src/inc_internal/swarmkv_limits.h
index d698096..6eefbae 100644
--- a/src/inc_internal/swarmkv_limits.h
+++ b/src/inc_internal/swarmkv_limits.h
@@ -1,6 +1,7 @@
#pragma once
#define KEYSPACE_SLOT_NUM 16384
+#define STORE_SHARD_NUM 128
#define SWARMKV_MAX_NODE_NUM 1024
#define MAX_REPLICA_NUM 8
#define SWARMKV_THREAD_MAX 16
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 900b2c0..5fb3802 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -316,19 +316,19 @@ enum cmd_exec_result info_command(struct swarmkv_module *mod_db, const struct sw
{
if(0==strcasecmp("Node", cmd->argv[1]))
{
- *reply=swarmkv_reply_new_string_fmt(node_info_buff);
+ *reply=swarmkv_reply_new_status(node_info_buff);
}
else if(0==strcasecmp("Store", cmd->argv[1]))
{
- *reply=swarmkv_reply_new_string_fmt(store_info_buff);
+ *reply=swarmkv_reply_new_status(store_info_buff);
}
else if(0==strcasecmp("Keyspace", cmd->argv[1]))
{
- *reply=swarmkv_reply_new_string_fmt(ks_info_buff);
+ *reply=swarmkv_reply_new_status(ks_info_buff);
}
else if(0==strcasecmp("Network", cmd->argv[1]))
{
- *reply=swarmkv_reply_new_string_fmt(net_info_buff);
+ *reply=swarmkv_reply_new_status(net_info_buff);
}
else if(0==strcasecmp("Threads", cmd->argv[1]))
{
@@ -853,23 +853,29 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar
int target_tid=spec_gettid(spec, cmd, db->opts->nr_worker_threads);
- struct swarmkv_msg *msg=NULL;
- //Only worker threads can do network communication, so command should be executed at worker thread first.
- if(cur_tid != target_tid)
+ struct swarmkv_msg *msg=NULL;
+ if(!node_is_empty(target_node) && node_compare(&db->self, target_node))
{
- //cmd will be executed in target thread's on_msg_callback
+ //cmd is executed in target node's on_msg_callback
long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller);
msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, target_node, sequence);
- swarmkv_mesh_send(db->mesh, cur_tid, target_tid, msg);
+ if(cur_tid >= db->opts->nr_worker_threads)
+ {
+ //Only worker threads can do network communication, so command should be executed at worker thread first.
+ swarmkv_mesh_send(db->mesh, cur_tid, random()%db->opts->nr_worker_threads, msg);
+ }
+ else
+ {
+ swarmkv_net_send(db->net, target_node, msg);
+ }
return;
}
-
- if(!node_is_empty(target_node) && node_compare(&db->self, target_node))
+ if(cur_tid != target_tid)
{
- //cmd is executed in target node's on_msg_callback
+ //cmd will be executed in target thread's on_msg_callback
long long sequence=swarmkv_rpc_launch(db->rpc_mgr, cur_tid, future_of_caller);
msg=swarmkv_msg_new_by_cmd(cmd, &db->self, cur_tid, target_node, sequence);
- swarmkv_net_send(db->net, target_node, msg);
+ swarmkv_mesh_send(db->mesh, cur_tid, target_tid, msg);
return;
}
enum cmd_exec_result exec_ret=FINISHED;
diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c
index 55406bc..ba5e7d0 100644
--- a/src/swarmkv_common.c
+++ b/src/swarmkv_common.c
@@ -746,8 +746,8 @@ int key2tid(const sds key, int nr_worker_threads)
struct http_client
{
int response_code;
- struct event_base* evbase;
- struct evhttp_connection* evhttpconn;
+ struct event_base *evbase;
+ struct evhttp_connection *evhttpconn;
struct evhttp_request *req;
int error_happens;
sds response_body;
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index c93ac3d..0d0838d 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -451,7 +451,7 @@ enum CRDT_OP
{
CRDT_GET,
CRDT_MERGE,
- CRDT_JOIN
+ CRDT_MEET
};
struct crdt_generic_ctx
{
@@ -482,6 +482,7 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
uuid_t uuid;
store_get_uuid(&(ctx->store->module), uuid);
int tid=__gettid(ctx->store->exec_cmd_handle);
+ long long error_before=ctx->store->sync_err;
if(reply->type==SWARMKV_REPLY_ERROR)
{
atomic_inc(&ctx->store->sync_err);
@@ -513,8 +514,8 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
atomic_inc(&ctx->store->sync_err);
}
break;
- case CRDT_JOIN:
- if(reply->type==SWARMKV_REPLY_INTEGER && reply->integer==1)
+ case CRDT_MEET:
+ if(reply->type==SWARMKV_REPLY_INTEGER)
{
atomic_inc(&ctx->store->sync_ok);
}
@@ -527,6 +528,7 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
atomic_inc(&ctx->store->sync_err);
break;
}
+ assert(ctx->store->sync_err==error_before);
error_out:
crdt_generic_ctx_free(ctx);
return;
@@ -557,33 +559,30 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
struct swarmkv_store_thread *thr=&store->threads[real_tid];
thr->calls++;
- struct sync_master *sync_master=sync_master_new();
+
DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp)
{
char *blob=NULL;
size_t blob_sz=0;
scontainer_serialize(ctr, &blob, &blob_sz);
- sync_master_add_obj(sync_master, ctr->obj.key, blob, blob_sz,
- utarray_front(ctr->replica_node_list), utarray_len(ctr->replica_node_list));
+ struct swarmkv_cmd *cmd=swarmkv_cmd_new(4);
+ cmd->argv[0]=sdsnew("crdt");
+ cmd->argv[1]=sdsnew("merge");
+ cmd->argv[2]=sdsnew(ctr->obj.key);
+ cmd->argv[3]=sdsnewlen(blob, blob_sz);
+ for(int i=0; i<utarray_len(ctr->replica_node_list); i++)
+ {
+ node_t *peer=utarray_eltptr(ctr->replica_node_list, i);
+ crdt_generic_call(store, CRDT_MERGE, cmd, peer);
+ }
+ swarmkv_cmd_free(cmd);
+ free(blob);
DL_DELETE(thr->sync_queue, ctr);
ctr->is_in_sync_q=0;
store->synced++;
n_synced++;
if(n_synced>=MAX_SYNC_PER_PERIOD) break;
}
- node_t peer;
- struct swarmkv_cmd *cmd=NULL;
- int ret=1;
- while(1)
- {
- ret=sync_master_get_cmd(sync_master, &peer, &cmd);
- if(!ret) break;
- crdt_generic_call(store, CRDT_MERGE, cmd, &peer);
- //printf("Debug: %s synced %s -> %s\n", cmd->argv[2], store->self.addr, peer.addr);
- swarmkv_cmd_free(cmd);
- cmd=NULL;
- }
- sync_master_free(sync_master);
thr->n_keys=HASH_COUNT(thr->obj_table);
DL_COUNT(thr->sync_queue, ctr, thr->keys_to_sync);
clock_gettime(CLOCK_MONOTONIC, &end);
@@ -640,7 +639,7 @@ void swarmkv_store_set_monitor_handle(struct swarmkv_module *mod_store, struct s
void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *info)
{
struct swarmkv_store *store=module2store(mod_store);
- info->shards=store->opts->nr_worker_threads;
+ info->shards=STORE_SHARD_NUM;
info->keys=0;
info->keys_to_sync=0;
struct swarmkv_store_thread *thread=NULL;
@@ -714,20 +713,20 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st
assert(node_compare(replica_nodes+i, &store->self)!=0);
scontainer_add_replica_node(ctr, replica_nodes+i);
}
- struct swarmkv_cmd *crdt_join_cmd=NULL;
+ struct swarmkv_cmd *crdt_meet_cmd=NULL;
for(size_t i=0; i<n_replica_node; i++)
{
if(i<max_pull_node_num)
{
crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i);
}
- crdt_join_cmd=swarmkv_cmd_new(4);
- crdt_join_cmd->argv[0]=sdsnew("crdt");
- crdt_join_cmd->argv[1]=sdsnew("meet");
- crdt_join_cmd->argv[2]=sdsdup(ctr->obj.key);
- crdt_join_cmd->argv[3]=node_addr2sds(&store->self);
- crdt_generic_call(store, CRDT_JOIN, crdt_join_cmd, replica_nodes+i);
- swarmkv_cmd_free(crdt_join_cmd);
+ crdt_meet_cmd=swarmkv_cmd_new(4);
+ crdt_meet_cmd->argv[0]=sdsnew("crdt");
+ crdt_meet_cmd->argv[1]=sdsnew("meet");
+ crdt_meet_cmd->argv[2]=sdsdup(ctr->obj.key);
+ crdt_meet_cmd->argv[3]=node_addr2sds(&store->self);
+ crdt_generic_call(store, CRDT_MEET, crdt_meet_cmd, replica_nodes+i);
+ swarmkv_cmd_free(crdt_meet_cmd);
}
swarmkv_cmd_free(crdt_get_cmd);
crdt_get_cmd=NULL;
diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c
index c26e2b1..9bfbe62 100644
--- a/tools/swarmkv_cli.c
+++ b/tools/swarmkv_cli.c
@@ -557,7 +557,7 @@ struct swarmkv_reply *cluster_info_command(struct swarmkv *db, char *argv[], siz
reply->elements[i]=swarmkv_reply_new_array(2);
reply->elements[i]->elements[0]=swarmkv_reply_new_string_fmt(nodes_reply->elements[i]->str);
aligned_info=str_replace(per_node_info_reply->str, "\n", "\n ");
- reply->elements[i]->elements[1]=swarmkv_reply_new_string_fmt(aligned_info);
+ reply->elements[i]->elements[1]=swarmkv_reply_new_status(aligned_info);
free(aligned_info);
swarmkv_reply_free(per_node_info_reply);
}