summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-08-05 21:26:05 +0800
committerZheng Chao <[email protected]>2023-08-05 21:26:05 +0800
commit9bd52eb1b47669c90970252b3b5f1dc67ab37de9 (patch)
treee63d24014772558cd3ff916a0710bdefd3cf773d
parent3d97b9505d8e164ba6b1217aea4f3483c3a26f2a (diff)
For both local and remote caller, __exec_cmd executes the command recursively.
-rw-r--r--src/inc_internal/swarmkv_net.h1
-rw-r--r--src/inc_internal/swarmkv_store.h2
-rw-r--r--src/swarmkv.c14
-rw-r--r--src/swarmkv_keyspace.c30
-rw-r--r--src/swarmkv_store.c36
-rw-r--r--test/swarmkv_gtest.cpp14
6 files changed, 78 insertions, 19 deletions
diff --git a/src/inc_internal/swarmkv_net.h b/src/inc_internal/swarmkv_net.h
index 96284c1..39b9544 100644
--- a/src/inc_internal/swarmkv_net.h
+++ b/src/inc_internal/swarmkv_net.h
@@ -8,6 +8,7 @@ struct swarmkv_net *swarmkv_net_new(struct event_base *evbases[], int nr_worker_
void swarmkv_net_free(struct swarmkv_net *net);
void swarmkv_net_set_on_msg_callback(struct swarmkv_net *net, on_msg_callback_t *cb, void *cb_arg);
+//swamrkv_net_send takes the ownership of msg.
void swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv_msg *msg);
void swarmkv_net_set_monitor_handle(struct swarmkv_net *net, struct swarmkv_module *mod_monitor);
diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h
index f25ff5f..79c7671 100644
--- a/src/inc_internal/swarmkv_store.h
+++ b/src/inc_internal/swarmkv_store.h
@@ -70,7 +70,7 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st
enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_merge_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
-enum cmd_exec_result crdt_join_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
+enum cmd_exec_result crdt_meet_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_exists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_rlist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 120e358..3713eba 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -216,7 +216,7 @@ void exec_for_remote(struct swarmkv *db, const struct swarmkv_msg *msg)
ctx->my_future=future_create("for_remote",
remote_caller_on_success,
remote_caller_on_fail,
- ctx);
+ ctx);
__exec_cmd(db, &msg->caller, NULL, msg->cmd, ctx->my_future);
return;
}
@@ -810,6 +810,7 @@ static void key_route_on_success(void *result, void *user)
if(n_replica_node>0)
{
__exec_cmd(ctx->db, &ctx->db->self, replica_nodes+0, ctx->cmd, ctx->future_of_caller);
+ //__send_cmd(ctx->db, replica_nodes+0, ctx->cmd, ctx->future_of_caller);
}
if(self_is_a_replica)
{
@@ -848,6 +849,7 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
int cur_tid=__gettid(db);
if(msg->type==MSG_TYPE_CMD)
{
+ //command may be from other thread or other node.
assert(cur_tid < db->opts->nr_worker_threads);
exec_for_remote(db, msg);
swarmkv_msg_free(msg);
@@ -856,6 +858,7 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
{
if(msg->caller_tid!=cur_tid)
{
+ //The msg's onwership is transfered to swarmkv_mesh_send.
swarmkv_mesh_send(db->mesh, cur_tid, msg->caller_tid, msg);
}
else
@@ -895,7 +898,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
return;
}
/*Initiating a non-auto-route command without target from local is NOT allowed.*/
- if(!spec->auto_route && is_called_by_self && !target_node)
+ if(0 && !spec->auto_route && is_called_by_self && !target_node)
{
reply=swarmkv_reply_new_error(error_no_target_node, spec->name);
p=future_to_promise(future_of_caller);
@@ -931,7 +934,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end));
- if(!is_called_by_self)//Remote call, non-recursive exec
+ if(0 && !is_called_by_self)//Remote call, non-recursive exec
{
struct promise *p=future_to_promise(future_of_caller);
promise_success(p, reply);
@@ -978,6 +981,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
}
return;
}
+
void command_register(struct swarmkv_cmd_spec **table, const char *name, const char *hint,
int arity, int key_offset, enum cmd_key_flag flag, enum key_not_found_reply failover, int auto_route,
command_proc_func *proc, struct swarmkv_module *module)
@@ -1141,9 +1145,9 @@ void command_spec_init(struct swarmkv *db)
command_register(&(db->command_table), "CRDT MERGE", "key blob [key blob ...]",
2, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
crdt_merge_command, db->mod_store);
- command_register(&(db->command_table), "CRDT JOIN", "key IP:port",
+ command_register(&(db->command_table), "CRDT MEET", "key IP:port [IP:port ...]",
2, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
- crdt_join_command, db->mod_store);
+ crdt_meet_command, db->mod_store);
command_register(&(db->command_table), "CRDT DEL", "key",
1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
crdt_del_command, db->mod_store);
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index e18b15c..afa17fe 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -175,12 +175,38 @@ void key_entry_deletion_notification(struct key_route_entry *key_entry, const no
struct crdt_del_ctx *ctx=ALLOC(struct crdt_del_ctx, 1);
node_copy(&ctx->peer, &replica->node);
ctx->key=sdsdup(key_entry->key);
- ctx->f=future_create("_del", crdt_del_on_succ, crdt_del_on_fail, ctx);
+ ctx->f=future_create("crdt_del", crdt_del_on_succ, crdt_del_on_fail, ctx);
exec_cmd(exec_cmd_handle, self, &replica->node, crdt_del_cmd, ctx->f);
}
swarmkv_cmd_free(crdt_del_cmd);
return;
}
+void key_entry_meet_replica(struct key_route_entry *key_entry, const node_t *self, exec_cmd_func *exec_cmd, struct swarmkv *exec_cmd_handle)
+{
+ struct replica_node *replica=NULL, *tmp=NULL;
+ int n_replica=HASH_COUNT(key_entry->hash_replica), i=0;
+ struct swarmkv_cmd *crdt_meet_cmd=swarmkv_cmd_new(3+n_replica);
+ crdt_meet_cmd->argv[0]=sdsnew("crdt");
+ crdt_meet_cmd->argv[1]=sdsnew("meet");
+ crdt_meet_cmd->argv[2]=sdsdup(key_entry->key);
+
+ HASH_ITER(hh, key_entry->hash_replica, replica, tmp)
+ {
+ crdt_meet_cmd->argv[3+i]=node_addr2sds(&replica->node);
+ i++;
+ }
+ assert(i==n_replica);
+ HASH_ITER(hh, key_entry->hash_replica, replica, tmp)
+ {
+ struct crdt_del_ctx *ctx=ALLOC(struct crdt_del_ctx, 1);
+ node_copy(&ctx->peer, &replica->node);
+ ctx->key=sdsdup(key_entry->key);
+ ctx->f=future_create("crdt_meet", crdt_del_on_succ, crdt_del_on_fail, ctx);
+ exec_cmd(exec_cmd_handle, self, &replica->node, crdt_meet_cmd, ctx->f);
+ }
+ swarmkv_cmd_free(crdt_meet_cmd);
+ return;
+}
struct slot_runtime
{
@@ -1672,6 +1698,7 @@ enum cmd_exec_result key_route_generic(struct swarmkv_keyspace *ks, enum KEYSPAC
HASH_ADD_KEYPTR(hh, slot_rt->keyroute_table, key_entry->key, sdslen(key_entry->key), key_entry);
}
key_entry_add_replica_node(key_entry, new_replica);
+ key_entry_meet_replica(key_entry, &ks->self, ks->exec_cmd_func, ks->exec_cmd_handle);
*reply=key_entry_list_replica_nodes(key_entry);
break;
case KEYSPACE_XRADD:
@@ -1679,6 +1706,7 @@ enum cmd_exec_result key_route_generic(struct swarmkv_keyspace *ks, enum KEYSPAC
{
key_entry_add_replica_node(key_entry, new_replica);
*reply=key_entry_list_replica_nodes(key_entry);
+ key_entry_meet_replica(key_entry, &ks->self, ks->exec_cmd_func, ks->exec_cmd_handle);
}
else
{
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 047db36..9fca213 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -342,7 +342,12 @@ void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid)
uuid_copy(uuid, store->my_uuid);
return;
}
-
+void store_get_node_addr(struct swarmkv_module* mod_store, node_t *node)
+{
+ struct swarmkv_store *store=module2store(mod_store);
+ node_copy(node, &store->self);
+ return;
+}
void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj)
{
struct swarmkv_store *store=module2store(mod_store);
@@ -577,7 +582,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
n_synced++;
for(int i=0; i<utarray_len(ctr->replica_node_list); i++)
{
- printf("Debug: %s synced %s -> %s\n", ctr->obj.key, store->self.addr, ((node_t*)utarray_eltptr(ctr->replica_node_list, i))->addr);
+// printf("Debug: %s synced %s -> %s\n", ctr->obj.key, store->self.addr, ((node_t*)utarray_eltptr(ctr->replica_node_list, i))->addr);
}
if(n_synced>=MAX_SYNC_PER_PERIOD) break;
@@ -590,6 +595,8 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
ret=sync_master_get_cmd(sync_master, &peer, &cmd);
if(!ret) break;
crdt_generic_call(store, CRDT_MERGE, cmd, &peer);
+ printf("Debug: %s synced %s -> %s\n", cmd->argv[2], store->self.addr, peer.addr);
+
swarmkv_cmd_free(cmd);
cmd=NULL;
}
@@ -791,7 +798,10 @@ enum cmd_exec_result crdt_merge_command(struct swarmkv_module *mod_store, const
int __attribute__((__unused__))tid=0;
first_key_tid=store_gettid(mod_store, cmd->argv[2]);
+ node_t self;
+ store_get_node_addr(mod_store, &self);
store_get_uuid(mod_store, uuid);
+ printf("Merge: %s merge %s -> %s\n", accessing_node->addr, cmd->argv[2], self.addr);
for(size_t i=0; i<cmd->argc-2; i+=2)
{
key=cmd->argv[2+i];
@@ -828,23 +838,31 @@ enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const st
}
return FINISHED;
}
-enum cmd_exec_result crdt_join_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
+enum cmd_exec_result crdt_meet_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
{
- /* CRDT JOIN <key> IP:port*/
+ /* CRDT MEET <key> IP:port [IP:port ...]*/
struct scontainer *ctr=NULL;
const sds key=cmd->argv[2];
assert(accessing_node!=NULL);//should never invoked by local
node_t replica_node;
- node_init_from_sds(&replica_node, cmd->argv[3]);
+
struct swarmkv_store *store=module2store(mod_store);
- assert(node_compare(&replica_node, &store->self)!=0);
- printf("CRDT JOIN %s %s <- %s\n", key, store->self.addr, cmd->argv[3]);
+ //assert(node_compare(&replica_node, &store->self)!=0);
+ //printf("CRDT MEET %s %s <- %s\n", key, store->self.addr, cmd->argv[3]);
ctr=store_lookup_scontainer(store, key);
int added=0;
if(ctr)
{
- added=scontainer_add_replica_node(ctr, &replica_node);
+ for(int i=0; i<cmd->argc-3; i++)
+ {
+ node_init_from_sds(&replica_node, cmd->argv[3+i]);
+ if(node_compare(&replica_node, &store->self)==0)
+ {
+ continue;
+ }
+ added+=scontainer_add_replica_node(ctr, &replica_node);
+ }
*reply=swarmkv_reply_new_integer(added);
}
else
@@ -910,7 +928,7 @@ enum cmd_exec_result crdt_exists_command(struct swarmkv_module *mod_store, const
sds key=cmd->argv[2];
struct sobj *obj=NULL;
obj=store_lookup(mod_store, key);
- *reply=swarmkv_reply_new_integer(obj?1:0);
+ *reply=swarmkv_reply_new_integer((obj && obj->type!=OBJ_TYPE_UNDEFINED)?1:0);
return FINISHED;
}
diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp
index 878ce60..1bf8be3 100644
--- a/test/swarmkv_gtest.cpp
+++ b/test/swarmkv_gtest.cpp
@@ -1425,7 +1425,7 @@ TEST(CloudNative, AnnounceIPPort)
swarmkv_close(db[i]);
}
}
-#define TEST_NODE_NUMBER 8
+#define TEST_NODE_NUMBER 2
class SwarmkvNNodes : public testing::Test
{
@@ -1449,8 +1449,9 @@ protected:
opts[i]=swarmkv_options_new();
swarmkv_options_set_cluster_port(opts[i], 5210+i);
swarmkv_options_set_health_check_port(opts[i], 6210+i);
- swarmkv_options_set_worker_thread_number(opts[i], 2);
+ swarmkv_options_set_worker_thread_number(opts[i], 8);
swarmkv_options_set_logger(opts[i], logger);
+ swarmkv_options_set_cluster_timeout_us(opts[i], 30*1000*1000);
db[i]=swarmkv_open(opts[i], cluster_name, &err);
if(err)
{
@@ -1504,7 +1505,14 @@ TEST_F(SwarmkvNNodes, SET_GET)
swarmkv_reply_free(reply);
wait_for_sync();
- //sleep(10);
+ sleep(2);
+ for(i=0; i<TEST_NODE_NUMBER; i++)
+ {
+ reply=swarmkv_command_on(db[i], swarmkv_self_address(db[i]), "CRDT EXISTS %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
+ }
for(i=0; i<TEST_NODE_NUMBER; i++)
{
reply=swarmkv_command(db[i], "GET %s", key);