diff options
| author | Zheng Chao <[email protected]> | 2023-08-05 21:26:05 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-08-05 21:26:05 +0800 |
| commit | 9bd52eb1b47669c90970252b3b5f1dc67ab37de9 (patch) | |
| tree | e63d24014772558cd3ff916a0710bdefd3cf773d | |
| parent | 3d97b9505d8e164ba6b1217aea4f3483c3a26f2a (diff) | |
For both local and remote caller, __exec_cmd executes the command recursively.
| -rw-r--r-- | src/inc_internal/swarmkv_net.h | 1 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_store.h | 2 | ||||
| -rw-r--r-- | src/swarmkv.c | 14 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 30 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 36 | ||||
| -rw-r--r-- | test/swarmkv_gtest.cpp | 14 |
6 files changed, 78 insertions, 19 deletions
diff --git a/src/inc_internal/swarmkv_net.h b/src/inc_internal/swarmkv_net.h index 96284c1..39b9544 100644 --- a/src/inc_internal/swarmkv_net.h +++ b/src/inc_internal/swarmkv_net.h @@ -8,6 +8,7 @@ struct swarmkv_net *swarmkv_net_new(struct event_base *evbases[], int nr_worker_ void swarmkv_net_free(struct swarmkv_net *net); void swarmkv_net_set_on_msg_callback(struct swarmkv_net *net, on_msg_callback_t *cb, void *cb_arg); +//swamrkv_net_send takes the ownership of msg. void swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv_msg *msg); void swarmkv_net_set_monitor_handle(struct swarmkv_net *net, struct swarmkv_module *mod_monitor); diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h index f25ff5f..79c7671 100644 --- a/src/inc_internal/swarmkv_store.h +++ b/src/inc_internal/swarmkv_store.h @@ -70,7 +70,7 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_merge_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); -enum cmd_exec_result crdt_join_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); +enum cmd_exec_result crdt_meet_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_exists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_rlist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); diff --git a/src/swarmkv.c b/src/swarmkv.c index 120e358..3713eba 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -216,7 +216,7 @@ void exec_for_remote(struct swarmkv *db, const struct swarmkv_msg *msg) ctx->my_future=future_create("for_remote", remote_caller_on_success, remote_caller_on_fail, - ctx); + ctx); __exec_cmd(db, &msg->caller, NULL, msg->cmd, ctx->my_future); return; } @@ -810,6 +810,7 @@ static void key_route_on_success(void *result, void *user) if(n_replica_node>0) { __exec_cmd(ctx->db, &ctx->db->self, replica_nodes+0, ctx->cmd, ctx->future_of_caller); + //__send_cmd(ctx->db, replica_nodes+0, ctx->cmd, ctx->future_of_caller); } if(self_is_a_replica) { @@ -848,6 +849,7 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg) int cur_tid=__gettid(db); if(msg->type==MSG_TYPE_CMD) { + //command may be from other thread or other node. assert(cur_tid < db->opts->nr_worker_threads); exec_for_remote(db, msg); swarmkv_msg_free(msg); @@ -856,6 +858,7 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg) { if(msg->caller_tid!=cur_tid) { + //The msg's onwership is transfered to swarmkv_mesh_send. swarmkv_mesh_send(db->mesh, cur_tid, msg->caller_tid, msg); } else @@ -895,7 +898,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * return; } /*Initiating a non-auto-route command without target from local is NOT allowed.*/ - if(!spec->auto_route && is_called_by_self && !target_node) + if(0 && !spec->auto_route && is_called_by_self && !target_node) { reply=swarmkv_reply_new_error(error_no_target_node, spec->name); p=future_to_promise(future_of_caller); @@ -931,7 +934,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end)); - if(!is_called_by_self)//Remote call, non-recursive exec + if(0 && !is_called_by_self)//Remote call, non-recursive exec { struct promise *p=future_to_promise(future_of_caller); promise_success(p, reply); @@ -978,6 +981,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * } return; } + void command_register(struct swarmkv_cmd_spec **table, const char *name, const char *hint, int arity, int key_offset, enum cmd_key_flag flag, enum key_not_found_reply failover, int auto_route, command_proc_func *proc, struct swarmkv_module *module) @@ -1141,9 +1145,9 @@ void command_spec_init(struct swarmkv *db) command_register(&(db->command_table), "CRDT MERGE", "key blob [key blob ...]", 2, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, crdt_merge_command, db->mod_store); - command_register(&(db->command_table), "CRDT JOIN", "key IP:port", + command_register(&(db->command_table), "CRDT MEET", "key IP:port [IP:port ...]", 2, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, - crdt_join_command, db->mod_store); + crdt_meet_command, db->mod_store); command_register(&(db->command_table), "CRDT DEL", "key", 1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, crdt_del_command, db->mod_store); diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index e18b15c..afa17fe 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -175,12 +175,38 @@ void key_entry_deletion_notification(struct key_route_entry *key_entry, const no struct crdt_del_ctx *ctx=ALLOC(struct crdt_del_ctx, 1); node_copy(&ctx->peer, &replica->node); ctx->key=sdsdup(key_entry->key); - ctx->f=future_create("_del", crdt_del_on_succ, crdt_del_on_fail, ctx); + ctx->f=future_create("crdt_del", crdt_del_on_succ, crdt_del_on_fail, ctx); exec_cmd(exec_cmd_handle, self, &replica->node, crdt_del_cmd, ctx->f); } swarmkv_cmd_free(crdt_del_cmd); return; } +void key_entry_meet_replica(struct key_route_entry *key_entry, const node_t *self, exec_cmd_func *exec_cmd, struct swarmkv *exec_cmd_handle) +{ + struct replica_node *replica=NULL, *tmp=NULL; + int n_replica=HASH_COUNT(key_entry->hash_replica), i=0; + struct swarmkv_cmd *crdt_meet_cmd=swarmkv_cmd_new(3+n_replica); + crdt_meet_cmd->argv[0]=sdsnew("crdt"); + crdt_meet_cmd->argv[1]=sdsnew("meet"); + crdt_meet_cmd->argv[2]=sdsdup(key_entry->key); + + HASH_ITER(hh, key_entry->hash_replica, replica, tmp) + { + crdt_meet_cmd->argv[3+i]=node_addr2sds(&replica->node); + i++; + } + assert(i==n_replica); + HASH_ITER(hh, key_entry->hash_replica, replica, tmp) + { + struct crdt_del_ctx *ctx=ALLOC(struct crdt_del_ctx, 1); + node_copy(&ctx->peer, &replica->node); + ctx->key=sdsdup(key_entry->key); + ctx->f=future_create("crdt_meet", crdt_del_on_succ, crdt_del_on_fail, ctx); + exec_cmd(exec_cmd_handle, self, &replica->node, crdt_meet_cmd, ctx->f); + } + swarmkv_cmd_free(crdt_meet_cmd); + return; +} struct slot_runtime { @@ -1672,6 +1698,7 @@ enum cmd_exec_result key_route_generic(struct swarmkv_keyspace *ks, enum KEYSPAC HASH_ADD_KEYPTR(hh, slot_rt->keyroute_table, key_entry->key, sdslen(key_entry->key), key_entry); } key_entry_add_replica_node(key_entry, new_replica); + key_entry_meet_replica(key_entry, &ks->self, ks->exec_cmd_func, ks->exec_cmd_handle); *reply=key_entry_list_replica_nodes(key_entry); break; case KEYSPACE_XRADD: @@ -1679,6 +1706,7 @@ enum cmd_exec_result key_route_generic(struct swarmkv_keyspace *ks, enum KEYSPAC { key_entry_add_replica_node(key_entry, new_replica); *reply=key_entry_list_replica_nodes(key_entry); + key_entry_meet_replica(key_entry, &ks->self, ks->exec_cmd_func, ks->exec_cmd_handle); } else { diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 047db36..9fca213 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -342,7 +342,12 @@ void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid) uuid_copy(uuid, store->my_uuid); return; } - +void store_get_node_addr(struct swarmkv_module* mod_store, node_t *node) +{ + struct swarmkv_store *store=module2store(mod_store); + node_copy(node, &store->self); + return; +} void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj) { struct swarmkv_store *store=module2store(mod_store); @@ -577,7 +582,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) n_synced++; for(int i=0; i<utarray_len(ctr->replica_node_list); i++) { - printf("Debug: %s synced %s -> %s\n", ctr->obj.key, store->self.addr, ((node_t*)utarray_eltptr(ctr->replica_node_list, i))->addr); +// printf("Debug: %s synced %s -> %s\n", ctr->obj.key, store->self.addr, ((node_t*)utarray_eltptr(ctr->replica_node_list, i))->addr); } if(n_synced>=MAX_SYNC_PER_PERIOD) break; @@ -590,6 +595,8 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) ret=sync_master_get_cmd(sync_master, &peer, &cmd); if(!ret) break; crdt_generic_call(store, CRDT_MERGE, cmd, &peer); + printf("Debug: %s synced %s -> %s\n", cmd->argv[2], store->self.addr, peer.addr); + swarmkv_cmd_free(cmd); cmd=NULL; } @@ -791,7 +798,10 @@ enum cmd_exec_result crdt_merge_command(struct swarmkv_module *mod_store, const int __attribute__((__unused__))tid=0; first_key_tid=store_gettid(mod_store, cmd->argv[2]); + node_t self; + store_get_node_addr(mod_store, &self); store_get_uuid(mod_store, uuid); + printf("Merge: %s merge %s -> %s\n", accessing_node->addr, cmd->argv[2], self.addr); for(size_t i=0; i<cmd->argc-2; i+=2) { key=cmd->argv[2+i]; @@ -828,23 +838,31 @@ enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const st } return FINISHED; } -enum cmd_exec_result crdt_join_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) +enum cmd_exec_result crdt_meet_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) { - /* CRDT JOIN <key> IP:port*/ + /* CRDT MEET <key> IP:port [IP:port ...]*/ struct scontainer *ctr=NULL; const sds key=cmd->argv[2]; assert(accessing_node!=NULL);//should never invoked by local node_t replica_node; - node_init_from_sds(&replica_node, cmd->argv[3]); + struct swarmkv_store *store=module2store(mod_store); - assert(node_compare(&replica_node, &store->self)!=0); - printf("CRDT JOIN %s %s <- %s\n", key, store->self.addr, cmd->argv[3]); + //assert(node_compare(&replica_node, &store->self)!=0); + //printf("CRDT MEET %s %s <- %s\n", key, store->self.addr, cmd->argv[3]); ctr=store_lookup_scontainer(store, key); int added=0; if(ctr) { - added=scontainer_add_replica_node(ctr, &replica_node); + for(int i=0; i<cmd->argc-3; i++) + { + node_init_from_sds(&replica_node, cmd->argv[3+i]); + if(node_compare(&replica_node, &store->self)==0) + { + continue; + } + added+=scontainer_add_replica_node(ctr, &replica_node); + } *reply=swarmkv_reply_new_integer(added); } else @@ -910,7 +928,7 @@ enum cmd_exec_result crdt_exists_command(struct swarmkv_module *mod_store, const sds key=cmd->argv[2]; struct sobj *obj=NULL; obj=store_lookup(mod_store, key); - *reply=swarmkv_reply_new_integer(obj?1:0); + *reply=swarmkv_reply_new_integer((obj && obj->type!=OBJ_TYPE_UNDEFINED)?1:0); return FINISHED; } diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp index 878ce60..1bf8be3 100644 --- a/test/swarmkv_gtest.cpp +++ b/test/swarmkv_gtest.cpp @@ -1425,7 +1425,7 @@ TEST(CloudNative, AnnounceIPPort) swarmkv_close(db[i]); } } -#define TEST_NODE_NUMBER 8 +#define TEST_NODE_NUMBER 2 class SwarmkvNNodes : public testing::Test { @@ -1449,8 +1449,9 @@ protected: opts[i]=swarmkv_options_new(); swarmkv_options_set_cluster_port(opts[i], 5210+i); swarmkv_options_set_health_check_port(opts[i], 6210+i); - swarmkv_options_set_worker_thread_number(opts[i], 2); + swarmkv_options_set_worker_thread_number(opts[i], 8); swarmkv_options_set_logger(opts[i], logger); + swarmkv_options_set_cluster_timeout_us(opts[i], 30*1000*1000); db[i]=swarmkv_open(opts[i], cluster_name, &err); if(err) { @@ -1504,7 +1505,14 @@ TEST_F(SwarmkvNNodes, SET_GET) swarmkv_reply_free(reply); wait_for_sync(); - //sleep(10); + sleep(2); + for(i=0; i<TEST_NODE_NUMBER; i++) + { + reply=swarmkv_command_on(db[i], swarmkv_self_address(db[i]), "CRDT EXISTS %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); + } for(i=0; i<TEST_NODE_NUMBER; i++) { reply=swarmkv_command(db[i], "GET %s", key); |
