diff options
| author | Zheng Chao <[email protected]> | 2023-07-11 14:12:59 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-07-11 14:12:59 +0800 |
| commit | 27da04165b6e36d78dd2a4dfd2bb4962af919d6b (patch) | |
| tree | 46b4c3da7206a6a5a79b3ca71fd18c7dd303684d | |
| parent | ed1292eeda196dfacf2b62c69ce5af19c2690101 (diff) | |
WIP
| -rw-r--r-- | CMakeLists.txt | 2 | ||||
| -rw-r--r-- | docs/design.md | 2 | ||||
| -rw-r--r-- | examples/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | src/future_promise.c | 2 | ||||
| -rw-r--r-- | src/inc_internal/future_promise.h | 4 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_cmd_spec.h | 11 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_common.h | 4 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_keyspace.h | 1 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_mesh.h | 3 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_rpc.h | 4 | ||||
| -rw-r--r-- | src/inc_internal/swarmkv_store.h | 8 | ||||
| -rw-r--r-- | src/swarmkv.c | 193 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 115 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 2 | ||||
| -rw-r--r-- | src/swarmkv_rpc.c | 10 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 341 | ||||
| -rw-r--r-- | src/t_hash.c | 46 | ||||
| -rw-r--r-- | src/t_set.c | 16 | ||||
| -rw-r--r-- | src/t_string.c | 6 | ||||
| -rw-r--r-- | src/t_token_bucket.c | 14 |
20 files changed, 423 insertions, 365 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e6f7fc..14074a3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,7 +10,7 @@ include(Version) set(CMAKE_C_FLAGS "-std=gnu99 -Wall") set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -Wall) -set(SWARMKV_DEPEND_DYN_LIB pthread uuid) +set(SWARMKV_DEPEND_DYN_LIB pthread uuid m) include_directories(${PROJECT_SOURCE_DIR}/inc/) include_directories(/opt/MESA/include/) diff --git a/docs/design.md b/docs/design.md index e191425..4f85273 100644 --- a/docs/design.md +++ b/docs/design.md @@ -211,7 +211,7 @@ Reply message A swarmkv instance has one key space thread and several (configurable) worker threads. Swarmkv uses [libevent](https://github.com/libevent/libevent) as a peer-to-peer communication infrastructure. -The keyspace thread communicates with Consul. +The keyspace thread communicates with Consul. Some of the operations are blocking style, so swarmkv uses a seperated keyspace thread. - Watch global slot table changes. - Watch leadership changes, run for leader if allowed. diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 865dcd1..417f4cf 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,5 +1,5 @@ include_directories(${PROJECT_SOURCE_DIR}include/) add_executable(simple_example simple_example.c) add_executable(async_example async_example.c) -target_link_libraries(simple_example swarmkv-static pthread uuid) -target_link_libraries(async_example swarmkv-static pthread uuid)
\ No newline at end of file +target_link_libraries(simple_example swarmkv-static ${SWARMKV_DEPEND_DYN_LIB}) +target_link_libraries(async_example swarmkv-static ${SWARMKV_DEPEND_DYN_LIB})
\ No newline at end of file diff --git a/src/future_promise.c b/src/future_promise.c index 0581ae3..bf27351 100644 --- a/src/future_promise.c +++ b/src/future_promise.c @@ -115,7 +115,7 @@ void promise_failed(struct promise * p, enum e_future_error error, const char * return; } -void promise_success(struct promise * p, const void * result) +void promise_success(struct promise * p, void * result) { if(!p->f.is_canceled) { diff --git a/src/inc_internal/future_promise.h b/src/inc_internal/future_promise.h index 0b70c37..a2c0d0d 100644 --- a/src/inc_internal/future_promise.h +++ b/src/inc_internal/future_promise.h @@ -13,7 +13,7 @@ enum e_future_error struct promise; struct future; -typedef void (future_success_cb)(const void *result, void * user); +typedef void (future_success_cb)(void *result, void * user); typedef void (future_failed_cb)(enum e_future_error err, const char * what, void * user); typedef void (promise_ctx_destroy_cb)(void* ctx); @@ -31,7 +31,7 @@ void future_destroy(struct future * f); */ struct promise * future_to_promise(struct future * f); void promise_failed(struct promise * p, enum e_future_error error, const char * what); -void promise_success(struct promise * p, const void * result); +void promise_success(struct promise * p, void * result); void promise_finish(struct promise * p); void promise_allow_many_successes(struct promise *p); diff --git a/src/inc_internal/swarmkv_cmd_spec.h b/src/inc_internal/swarmkv_cmd_spec.h index c8ae08a..6a823a8 100644 --- a/src/inc_internal/swarmkv_cmd_spec.h +++ b/src/inc_internal/swarmkv_cmd_spec.h @@ -34,11 +34,10 @@ typedef int swarmkv_module_gettid_func_t(struct swarmkv_module *mod, sds key); struct swarmkv_module { char name[SWARMKV_SYMBOL_MAX]; - swarmkv_module_gettid_func_t *gettid; void *mod_ctx; }; -enum cmd_keyroute_failover +enum key_not_found_reply { REPLY_NA, /*Not Applicable*/ REPLY_INT_0, @@ -51,9 +50,9 @@ enum cmd_keyroute_failover typedef enum cmd_exec_result command_proc_func(struct swarmkv_module *module, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); -#define KEY_OFFSET_NONE -1 -#define KEY_OFFSET_ANY -2 - +#define KEY_OFFSET_NONE -1 +#define KEY_OFFSET_TID -2 +#define KEY_OFFSET_SLOTID -3 struct swarmkv_cmd_spec { const char *name; @@ -61,7 +60,7 @@ struct swarmkv_cmd_spec int arity; /* Number of arguments, it is possible to use -N to say >= N */ int key_offset; /* The argument that's a key (-1 = no keys) */ enum cmd_key_flag flag; /* Command flag, see CMD_*. */ - enum cmd_keyroute_failover failover; + enum key_not_found_reply nokey_reply; int auto_route; /* The node that executes the command must be explicitly specifeid. */ command_proc_func *proc; diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h index 5465690..3e9e11a 100644 --- a/src/inc_internal/swarmkv_common.h +++ b/src/inc_internal/swarmkv_common.h @@ -124,4 +124,6 @@ void node_to_sockaddr(const node_t *node, struct sockaddr *sockaddr); int http_blocking_request(enum evhttp_cmd_type type, const char* host, unsigned int port, const char* url, sds req_body, sds *resp_body); typedef void exec_cmd_func(struct swarmkv *db, const node_t *accessing_node, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller); -struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv);
\ No newline at end of file +struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv); + +int __gettid(struct swarmkv *db);
\ No newline at end of file diff --git a/src/inc_internal/swarmkv_keyspace.h b/src/inc_internal/swarmkv_keyspace.h index ef0c6b2..3d644fe 100644 --- a/src/inc_internal/swarmkv_keyspace.h +++ b/src/inc_internal/swarmkv_keyspace.h @@ -10,6 +10,7 @@ struct swarmkv_module *swarmkv_keyspace_new(struct swarmkv_options *opts, const char *db_name, struct log_handle *logger, char **err); void swarmkv_keyspace_free(struct swarmkv_module* mod_keyspace); +void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id); struct keyspace_info { unsigned int health_check_port; diff --git a/src/inc_internal/swarmkv_mesh.h b/src/inc_internal/swarmkv_mesh.h index 975bbff..4a6f04c 100644 --- a/src/inc_internal/swarmkv_mesh.h +++ b/src/inc_internal/swarmkv_mesh.h @@ -4,4 +4,5 @@ struct swarmkv_mesh; int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest_thread_id, struct swarmkv_msg *msg); void swarmkv_mesh_set_on_msg_recv_cb(struct swarmkv_mesh *mesh, on_msg_callback_t *cb_func, void *cb_arg); -struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger);
\ No newline at end of file +struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger); +void swarmkv_mesh_free(struct swarmkv_mesh *mesh);
\ No newline at end of file diff --git a/src/inc_internal/swarmkv_rpc.h b/src/inc_internal/swarmkv_rpc.h index 3ee47b6..de3ca13 100644 --- a/src/inc_internal/swarmkv_rpc.h +++ b/src/inc_internal/swarmkv_rpc.h @@ -4,8 +4,8 @@ #include "future_promise.h" struct swarmkv_rpc_mgr; -struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(); +struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_worker_threads); void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr); //Return a sequence number, which can be used to complete the request long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f); -void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, const void *response); +void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *response); diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h index 8338623..241168d 100644 --- a/src/inc_internal/swarmkv_store.h +++ b/src/inc_internal/swarmkv_store.h @@ -46,9 +46,8 @@ struct sobj struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts, exec_cmd_func *send_cmd, void *handle_send_cmd); void swarmkv_store_free(struct swarmkv_module* mod_store); -void swarmkv_store_periodic(struct swarmkv_module *module); +void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id); void swarmkv_store_set_monitor_handle(struct swarmkv_module *mod_store, struct swarmkv_module *mod_monitor); -void swarmkv_store_add_key(struct swarmkv_module *mod_store, const sds key, node_t *replica_nodes, size_t n_replica_node); struct store_info { long long keys; @@ -62,13 +61,14 @@ void swarmkv_store_info(struct swarmkv_module *module, struct store_info *info); void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj); int sobj_get_random_replica(struct sobj *obj, node_t *out); +enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply); void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid); struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key); -enum cmd_exec_result crdt_pull_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); -enum cmd_exec_result crdt_push_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); +enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); +enum cmd_exec_result crdt_merge_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_join_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply); diff --git a/src/swarmkv.c b/src/swarmkv.c index e8001e9..f552a29 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -69,7 +69,7 @@ struct swarmkv struct event_base **evbases; node_t self; - struct swarmkv_rpc_mgr *async; + struct swarmkv_rpc_mgr *rpc_mgr; struct swarmkv_mesh *mesh; struct swarmkv_net *net; @@ -119,7 +119,7 @@ struct local_caller_ctx void * cb_arg; struct future *my_future; }; -static void local_caller_on_success(const void *result, void *user) +static void local_caller_on_success(void *result, void *user) { struct local_caller_ctx *ctx=(struct local_caller_ctx*)user; const struct swarmkv_reply *reply=(const struct swarmkv_reply *)result; @@ -165,7 +165,7 @@ void remote_calller_ctx_free(struct remote_caller_ctx *ctx) free(ctx); return; } -static void remote_caller_on_success(const void *result, void *user) +static void remote_caller_on_success(void *result, void *user) { struct remote_caller_ctx *ctx=(struct remote_caller_ctx*)user; const struct swarmkv_reply *reply=(const struct swarmkv_reply*) result; @@ -560,10 +560,10 @@ void swarmkv_threads_run(struct swarmkv *db) return; } -static struct swarmkv_reply *keyroute_fail_reply(enum cmd_keyroute_failover failover_flag) +static struct swarmkv_reply *key_not_found_reply(enum key_not_found_reply not_found_flag) { struct swarmkv_reply *reply=NULL; - switch(failover_flag) + switch(not_found_flag) { case REPLY_INT_0: reply=swarmkv_reply_new_integer(0); @@ -663,37 +663,42 @@ int is_sufficient_arg_num(const struct swarmkv_cmd_spec *spec, const struct swar struct cmd_ctx { struct swarmkv *db; - struct swarmkv_cmd_spec *spec; struct swarmkv_cmd *cmd; int redirect_cnt; - int thread_id; - long long sequence; - struct future *future_of_caller; struct future *future_of_mine; + struct future *future_of_caller; }; -struct cmd_ctx *cmd_ctx_new(struct swarmkv *db, const struct swarmkv_cmd *cmd, struct swarmkv_cmd_spec *spec, long long sequence) +struct cmd_ctx *cmd_ctx_new(struct swarmkv *db, const struct swarmkv_cmd *cmd, struct future *f) { struct cmd_ctx *ctx=ALLOC(struct cmd_ctx, 1); - ctx->db=db; - ctx->spec=spec; ctx->cmd=swarmkv_cmd_dup(cmd); ctx->redirect_cnt=0; - ctx->sequence=sequence; + ctx->future_of_caller=f; return ctx; } void cmd_ctx_free(struct cmd_ctx *ctx) { swarmkv_cmd_free(ctx->cmd); - if(ctx->future_of_mine) future_destroy(ctx->future_of_mine); + future_destroy(ctx->future_of_mine); free(ctx); return; } -static void peer_exec_on_success(const void *result, void *user) +static void generic_on_fail(enum e_future_error err, const char * what, void * user) +{ + struct cmd_ctx *ctx=(struct cmd_ctx*)user; + if(ctx->future_of_caller) + { + struct promise *p=future_to_promise(ctx->future_of_caller); + promise_failed(p, FUTURE_ERROR_EXCEPTION, what); + } + cmd_ctx_free(ctx); +} +static void peer_exec_on_success(void *result, void *user) { struct cmd_ctx *ctx = (struct cmd_ctx*)user; - const struct swarmkv_reply *reply = (const struct swarmkv_reply*) result; + struct swarmkv_reply *reply = (struct swarmkv_reply*) result; if(reply->type==SWARMKV_REPLY_NODE && 0==strncasecmp(reply->str, "-ASK", 4)) { @@ -706,9 +711,9 @@ static void peer_exec_on_success(const void *result, void *user) } else { - struct promise *p=future_to_promise(ctx->future_of_caller); char err_msg[256]; snprintf(err_msg, sizeof(err_msg), error_too_many_redirects, reply->str); + struct promise *p=future_to_promise(ctx->future_of_caller); promise_failed(p, FUTURE_ERROR_EXCEPTION, err_msg); cmd_ctx_free(ctx); } @@ -716,68 +721,94 @@ static void peer_exec_on_success(const void *result, void *user) else { struct promise *p=future_to_promise(ctx->future_of_caller); - promise_success(p, reply); + promise_success(p, (void*) reply); cmd_ctx_free(ctx); } } - -static void key_route_on_success(const void *result, void *user) +struct swarmkv_cmd *make_crdt_add_cmd(enum cmd_key_flag flag, const sds key, node_t replica[], size_t n_replica) +{ + struct swarmkv_cmd *crdt_add_cmd=NULL; + assert(flag==CMD_KEY_OW); + crdt_add_cmd=swarmkv_cmd_new(3+n_replica); + crdt_add_cmd->argv[0]=sdsnew("crdt"); + crdt_add_cmd->argv[1]=sdsnew("add"); + crdt_add_cmd->argv[2]=sdsdup(key); + for(size_t i=0; i<n_replica; i++) + { + crdt_add_cmd->argv[3+i]=node_addr2sds(replica+i); + } + return crdt_add_cmd; +} +static void crdt_add_on_success(void *result, void *user) +{ + struct cmd_ctx *ctx = (struct cmd_ctx*)user; + if(ctx->future_of_caller) + { + __exec_cmd(ctx->db, NULL, NULL, ctx->cmd, ctx->future_of_caller); + } + cmd_ctx_free(ctx); +} +static void key_route_on_success(void *result, void *user) { struct cmd_ctx *ctx = (struct cmd_ctx*)user; const struct swarmkv_reply *reply = (const struct swarmkv_reply*) result; struct swarmkv_reply *user_reply_for_keyspace_not_found=NULL; size_t n_replica_node=0; node_t *replica_nodes=NULL; + struct swarmkv_cmd_spec *spec=get_spec_by_argv(ctx->db, ctx->cmd->argc, ctx->cmd->argv); node_list_new_from_reply(&replica_nodes, &n_replica_node, reply); if(n_replica_node==0) { - user_reply_for_keyspace_not_found=keyroute_fail_reply(ctx->spec->failover); - swarmkv_rpc_complete(ctx->db->async, ctx->thread_id, ctx->sequence, user_reply_for_keyspace_not_found); + user_reply_for_keyspace_not_found=key_not_found_reply(spec->nokey_reply); + struct promise *p=future_to_promise(ctx->future_of_caller); + promise_success(p, user_reply_for_keyspace_not_found); swarmkv_reply_free(user_reply_for_keyspace_not_found); user_reply_for_keyspace_not_found=NULL; } else - { - const sds key=ctx->cmd->argv[ctx->spec->key_offset]; + { + const sds key=ctx->cmd->argv[spec->key_offset]; int self_is_a_replica=node_list_exists(replica_nodes, n_replica_node, &ctx->db->self); if(self_is_a_replica) { n_replica_node-=node_list_remove(replica_nodes, n_replica_node, &ctx->db->self); - swarmkv_store_add_key(ctx->db->mod_store, key, replica_nodes, n_replica_node); +// swarmkv_store_add_key(ctx->db->mod_store, key, replica_nodes, n_replica_node); } - if(self_is_a_replica && n_replica_node==0) + if(n_replica_node>0) { - __exec_cmd(ctx->db, NULL, NULL, ctx->cmd, ctx->future_of_caller); + __exec_cmd(ctx->db, NULL, replica_nodes+0, ctx->cmd, ctx->future_of_caller); } - else + if(self_is_a_replica) { - __exec_cmd(ctx->db, NULL, replica_nodes+0, ctx->cmd, ctx->future_of_caller); - + struct cmd_ctx *crdt_add_ctx=NULL; + struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(spec->flag, key, replica_nodes, n_replica_node); + crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller); + crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx); + __exec_cmd(ctx->db, NULL, NULL, crdt_add_cmd, crdt_add_ctx->future_of_mine); + } - free(replica_nodes); + free(replica_nodes); } cmd_ctx_free(ctx); } -static void generic_on_fail(enum e_future_error err, const char * what, void * user) -{ - struct cmd_ctx *ctx=(struct cmd_ctx*)user; - struct promise *p=future_to_promise(ctx->future_of_caller); - promise_failed(p, err, what); - ctx->future_of_caller=NULL; - cmd_ctx_free(ctx); -} -static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *cmd) +static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *cmd, int nr_worker_threads) { - if(spec->key_offset<0) - { - return -1; - } - else + int tid=0; + switch(spec->key_offset) { - return spec->module->gettid(spec->module, cmd->argv[spec->key_offset]); + case KEY_OFFSET_TID: + tid=atoi(cmd->argv[2]); + break; + case KEY_OFFSET_SLOTID: + tid=atoi(cmd->argv[2])%nr_worker_threads; + break; + default: + tid=key_hash_slot(cmd->argv[spec->key_offset], sdslen(cmd->argv[spec->key_offset]))%nr_worker_threads; + break; } + return tid; } void on_msg_callback(struct swarmkv_msg *msg, void *arg) { @@ -795,7 +826,7 @@ void on_msg_callback(struct swarmkv_msg *msg, void *arg) } else { - swarmkv_rpc_complete(db->async, thread_id, msg->sequence, msg->reply); + swarmkv_rpc_complete(db->rpc_mgr, thread_id, msg->sequence, msg->reply); } } } @@ -832,12 +863,13 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * swarmkv_reply_free(reply); return; } - int target_thread_id=spec_gettid(spec, cmd); - long long sequence=swarmkv_rpc_launch(db->async, current_thread_id, future_of_caller); + int target_thread_id=spec_gettid(spec, cmd, db->opts->nr_worker_threads); + struct swarmkv_msg *msg=NULL; if(target_node && node_compare(&db->self, target_node)) { //cmd is executed in target node's on_msg_callback + long long sequence=swarmkv_rpc_launch(db->rpc_mgr, current_thread_id, future_of_caller); msg=swarmkv_msg_new_by_cmd(cmd, target_node, current_thread_id, sequence); swarmkv_net_send(db->net, target_node, msg); return; @@ -845,6 +877,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * if(current_thread_id != target_thread_id) { //cmd is executed in target thread's on_msg_callback + long long sequence=swarmkv_rpc_launch(db->rpc_mgr, current_thread_id, future_of_caller); msg=swarmkv_msg_new_by_cmd(cmd, &db->self, current_thread_id, sequence); swarmkv_mesh_send(db->mesh, current_thread_id, target_thread_id, msg); return; @@ -861,20 +894,22 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * if(accessing_node)//Remote call, non-recursive exec { - swarmkv_rpc_complete(db->async, current_thread_id, sequence, reply); + struct promise *p=future_to_promise(future_of_caller); + promise_success(p, reply); swarmkv_reply_free(reply); } switch(exec_ret) { case FINISHED: { - swarmkv_rpc_complete(db->async, current_thread_id, sequence, reply); + struct promise *p=future_to_promise(future_of_caller); + promise_success(p, reply); swarmkv_reply_free(reply); break; } case REDIRECT: { - struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, spec, sequence); + struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller); node_init_from_reply(&peer, reply); ctx->future_of_mine=future_create("peer_exec", peer_exec_on_success, generic_on_fail, ctx); __exec_cmd(db, NULL, &peer, cmd, ctx->future_of_mine); @@ -883,7 +918,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * case NEED_KEY_ROUTE: { struct swarmkv_cmd *keyspace_cmd=make_keyroute_cmd(spec->flag, cmd->argv[spec->key_offset], db->opts->dryrun? NULL:(&db->self)); - struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, spec, sequence); + struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller); ctx->future_of_mine=future_create("key_route", key_route_on_success, generic_on_fail, ctx); __exec_cmd(db, NULL, NULL, keyspace_cmd, ctx->future_of_mine); swarmkv_cmd_free(keyspace_cmd); @@ -900,7 +935,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t * return; } void command_register(struct swarmkv_cmd_spec **table, const char *name, const char *hint, - int arity, int key_offset, enum cmd_key_flag flag, enum cmd_keyroute_failover failover, int auto_route, + int arity, int key_offset, enum cmd_key_flag flag, enum key_not_found_reply failover, int auto_route, command_proc_func *proc, struct swarmkv_module *module) { struct swarmkv_cmd_spec *spec=NULL; @@ -911,7 +946,7 @@ void command_register(struct swarmkv_cmd_spec **table, const char *name, const c spec->arity=arity; spec->key_offset=key_offset; spec->flag=flag; - spec->failover=failover; + spec->nokey_reply=failover; spec->proc=proc; spec->auto_route=auto_route; @@ -953,7 +988,7 @@ void command_spec_init(struct swarmkv *db) 1, 1, CMD_KEY_RW, REPLY_INT_0, AUTO_ROUTE, persist_command, db->mod_keyspace); command_register(&(db->command_table), "TYPE", "key", - 1, 1, CMD_KEY_RW, REPLY_STR_NONE, AUTO_ROUTE, + 1, 1, CMD_KEY_RO, REPLY_STR_NONE, AUTO_ROUTE, type_command, db->mod_store); command_register(&(db->command_table), "KEYSLOT", "key", 1, 1, CMD_KEY_RO, REPLY_ERROR, AUTO_ROUTE, @@ -1052,24 +1087,21 @@ void command_spec_init(struct swarmkv *db) 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, latency_command, db->mod_monitor); - - - /* low-level state-based CRDT synchronization commands*/ - command_register(&(db->command_table), "CRDT PULL", "key", + command_register(&(db->command_table), "CRDT GET", "key", 1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, - crdt_pull_command, db->mod_store); - command_register(&(db->command_table), "CRDT PUSH", "key blob [key blob ...]", + crdt_get_command, db->mod_store); + command_register(&(db->command_table), "CRDT MERGE", "key blob [key blob ...]", 2, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, - crdt_push_command, db->mod_store); + crdt_merge_command, db->mod_store); command_register(&(db->command_table), "CRDT JOIN", "key IP:port", 2, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, crdt_join_command, db->mod_store); command_register(&(db->command_table), "CRDT DEL", "key", 1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, crdt_del_command, db->mod_store); - command_register(&(db->command_table), "CRDT KEYS", "pattern", - 1, -2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, + command_register(&(db->command_table), "CRDT KEYS", "tid pattern", + 1, KEY_OFFSET_TID, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, crdt_keys_command, db->mod_store); command_register(&(db->command_table), "CRDT EXISTS", "key", 1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, @@ -1078,7 +1110,7 @@ void command_spec_init(struct swarmkv *db) 1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, crdt_rlist_command, db->mod_store); command_register(&(db->command_table), "CRDT INFO", "key", - 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, NOT_AUTO_ROUTE, + 1, 1, CMD_KEY_NA, REPLY_EMPTY_ARRAY, NOT_AUTO_ROUTE, crdt_info_command, db->mod_store); /* low-level keyspace operation commands */ @@ -1092,28 +1124,28 @@ void command_spec_init(struct swarmkv *db) 1, 2, CMD_KEY_OW, REPLY_NA, AUTO_ROUTE, keyspace_xradd_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE KEYS", "tid pattern",//worker-thread-id - 1, KEY_OFFSET_NONE, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE, + 1, KEY_OFFSET_TID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE, keyspace_keys_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE RDEL", "key IP:port", - 1, 2, CMD_KEY_RO, REPLY_NA, AUTO_ROUTE, + 1, 2, CMD_KEY_RW, REPLY_NA, AUTO_ROUTE, keyspace_rdel_command, db->mod_keyspace); /* low-level keyspace reorgnization commands */ - command_register(&(db->command_table), "KEYSPACE SETSLOT", "<slot> IMPORTING|MIGRATING|NODE|STABLE IP:port", - 2, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, + command_register(&(db->command_table), "KEYSPACE SETSLOT", "slot IMPORTING|MIGRATING|NODE|STABLE IP:port", + 2, KEY_OFFSET_SLOTID, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, keyspace_setslot_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE GETKEYSINSLOT", "slot", - 1, 2, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE, + 1, KEY_OFFSET_SLOTID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE, keyspace_getkeysinslot_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE ADDKEYSTOSLOT", "slot blob", - 2, 2, CMD_KEY_OW, REPLY_NA, NOT_AUTO_ROUTE, + 2, KEY_OFFSET_SLOTID, CMD_KEY_OW, REPLY_NA, NOT_AUTO_ROUTE, keyspace_addkeystoslot_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE DELSLOTKEYS", "slot", - 1, 2, CMD_KEY_RM, REPLY_NA, NOT_AUTO_ROUTE, + 1, KEY_OFFSET_SLOTID, CMD_KEY_RM, REPLY_NA, NOT_AUTO_ROUTE, keyspace_delslotkeys_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE COUNTKEYSINSLOT", "slot", - 1, 2, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE, + 1, KEY_OFFSET_SLOTID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE, keyspace_countkeysinslot_command, db->mod_keyspace); /* cluster commands are defined in swarmkv-cli.c */ @@ -1221,9 +1253,11 @@ void swarmkv_dispatch(struct swarmkv *db) event_base_loop(db->evbases[tid], EVLOOP_ONCE|EVLOOP_NONBLOCK); return; } -void wrap_store_periodic(evutil_socket_t fd, short what, void * arg) +void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg) { - swarmkv_store_periodic((struct swarmkv_module *)arg); + struct swarmkv_thread_ctx *thread=(struct swarmkv_thread_ctx *)arg; + swarmkv_store_periodic(thread->db->mod_store, thread->thread_id); + swarmkv_keyspace_periodic(thread->db->mod_keyspace, thread->thread_id); } struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, char **err) { @@ -1266,7 +1300,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, { db->evbases[i]=event_base_new(); } - + db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->evbases, db->opts->nr_worker_threads); db->mesh=swarmkv_mesh_new(db->evbases, db->opts->nr_worker_threads + db->opts->nr_caller_threads, db->logger); db->mod_monitor=swarmkv_monitor_new(db->opts); @@ -1297,7 +1331,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, struct timeval sync_interval = {db->opts->sync_interval_us/(1000*1000), db->opts->sync_interval_us%(1000*1000)}; for(int i=0; i<db->opts->nr_worker_threads; i++) { - db->worker_threads[i].store_periodic_ev=event_new(db->evbases[i], -1, EV_PERSIST, wrap_store_periodic, db->mod_store); + db->worker_threads[i].store_periodic_ev=event_new(db->evbases[i], -1, EV_PERSIST, __swarmkv_periodic, db->worker_threads+i); evtimer_add(db->worker_threads[i].store_periodic_ev, &sync_interval); } @@ -1352,7 +1386,8 @@ void swarmkv_close(struct swarmkv * db) db->mod_keyspace=NULL; swarmkv_monitor_free(db->mod_monitor); db->mod_monitor=NULL; - + swarmkv_mesh_free(db->mesh); + swarmkv_rpc_mgr_free(db->rpc_mgr); swarmkv_net_free(db->net); db->net=NULL; diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index c8eedfd..e00a973 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -142,7 +142,7 @@ void crdt_del_ctx_free(struct crdt_del_ctx *ctx) free(ctx); } -static void crdt_del_on_succ(void* result, void* user) +static void crdt_del_on_succ(void *result, void *user) { struct crdt_del_ctx *ctx = (struct crdt_del_ctx*)user; @@ -176,7 +176,7 @@ void key_entry_deletion_notification(struct key_route_entry *key_entry, exec_cmd node_copy(&ctx->peer, &replica->node); ctx->key=sdsdup(key_entry->key); ctx->f=future_create("key_del", crdt_del_on_succ, crdt_del_on_fail, ctx); - exec_cmd(exec_cmd_handle, crdt_del_cmd, NULL, &replica->node, ctx->f); + exec_cmd(exec_cmd_handle, NULL, &replica->node, crdt_del_cmd, ctx->f); } swarmkv_cmd_free(crdt_del_cmd); return; @@ -192,7 +192,7 @@ struct slot_runtime struct timeouts *expires; int I_am_owner; - pthread_mutex_t mutex; + pthread_mutex_t sanity_lock; }; void http_connection_close_callback(struct evhttp_connection *conn, void *arg) { @@ -210,7 +210,7 @@ struct swarmkv_keyspace int readable_tid; node_t self; uuid_t uuid; - pthread_rwlock_t rwlock; + pthread_rwlock_t rwlock; struct slot_runtime slot_rts[KEYSPACE_SLOT_NUM]; char consul_agent_host[MAX_IPV4_ADDR_LEN]; @@ -491,32 +491,11 @@ int consul_service_register(struct swarmkv_keyspace* ks) cJSON_Delete(service); return ret; } -static void __ks_dummy_event_handler(evutil_socket_t fd, short what, void * arg) -{ - struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg; - keyspace_active_expire_cycle(ks); - return; -} - -void *swarmkv_keyspace_thread(void *arg) -{ - struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg; - char thread_name[16]; - snprintf(thread_name, sizeof(thread_name), "swarmkv-ks"); - prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); - struct event * ev = event_new(ks->evbase, -1, EV_PERSIST, __ks_dummy_event_handler, ks); - struct timeval timer_delay = {2, 0}; - evtimer_add(ev, &timer_delay); - event_base_dispatch(ks->evbase); - event_del(ev); - event_free(ev); - return NULL; -} void consul_session_create_on_success(void *result, void *arg) { struct evhttp_request *req=(struct evhttp_request *)result; - struct swarmkv_keyspace* ks = (struct swarmkv_keyspace *)arg; + struct swarmkv_keyspace *ks = (struct swarmkv_keyspace *)arg; int resp_code=0; cJSON* session_id=NULL, *session_create_response=NULL; @@ -662,7 +641,7 @@ void consul_session_check_async(struct swarmkv_keyspace* ks) return; } -void acquire_session_lock_on_success(void* result, void *arg) +void acquire_session_lock_on_success(void *result, void *arg) { struct evhttp_request *req=(struct evhttp_request *)result; struct swarmkv_keyspace* ks=(struct swarmkv_keyspace*)arg; @@ -702,7 +681,7 @@ void acquire_session_lock_on_fail(enum e_future_error err, const char * what, vo consul_watch_leadership_changes_async(ks); return; } -void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks) +void consul_acquire_session_lock_async(struct swarmkv_keyspace *ks) { sds req_body=node_print_json(&ks->self, ks->uuid); @@ -727,7 +706,7 @@ void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks) sdsfree(req_body); } -void consul_run_for_leader_async(struct swarmkv_keyspace* ks) +void consul_run_for_leader_async(struct swarmkv_keyspace *ks) { if(0==strlen(ks->consul_session_id)) { @@ -853,7 +832,7 @@ void watch_slots_changes_on_success(void *result, void *arg) slot_rt=ks->slot_rts+i; slot=&(slot_rt->slot); owner=&(slot->owner); - pthread_mutex_lock(&slot_rt->mutex); + pthread_mutex_lock(&slot_rt->sanity_lock); if(0!=node_compare(owner, &(new_slots[i].owner))) { node_copy(owner, &(new_slots[i].owner)); @@ -873,7 +852,7 @@ void watch_slots_changes_on_success(void *result, void *arg) { slot_rt->I_am_owner=0; } - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); } log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots update finished."); cJSON_Delete(metadata_array); @@ -1051,7 +1030,7 @@ void remove_failed_nodes_from_key_route_table(struct swarmkv_keyspace *ks, node_ { slot_rt=ks->slot_rts+slot_id; if(!slot_rt->I_am_owner) continue; - pthread_mutex_lock(&slot_rt->mutex); + pthread_mutex_lock(&slot_rt->sanity_lock); HASH_ITER(hh, slot_rt->keyroute_table, key_entry, tmp_entry) { int is_modified=0; @@ -1067,7 +1046,7 @@ void remove_failed_nodes_from_key_route_table(struct swarmkv_keyspace *ks, node_ } if(is_modified) n_modified_key++; } - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); } HASH_ITER(hh, hash_health_node, node, tmp_node) { @@ -1141,7 +1120,7 @@ int keyslots_init(struct swarmkv_keyspace *ks) slot->slot_id=i; node_copy(&slot->owner, &ks->self); slot_rt->expires=timeouts_open(0, &error); - pthread_mutex_init(&slot_rt->mutex, NULL); + pthread_mutex_init(&slot_rt->sanity_lock, NULL); } char url[SWARMKV_URL_MAX]=""; @@ -1185,7 +1164,7 @@ void keyspace_lock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag, int slot_id=key_hash_slot(key, sdslen(key)); struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace); struct slot_runtime *slot_rt=ks->slot_rts+slot_id; - pthread_mutex_lock(&slot_rt->mutex); + pthread_mutex_lock(&slot_rt->sanity_lock); return; } void keyspace_unlock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag, const sds key) @@ -1194,17 +1173,24 @@ void keyspace_unlock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag int slot_id=key_hash_slot(key, sdslen(key)); struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace); struct slot_runtime *slot_rt=ks->slot_rts+slot_id; - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); return; } +void *swarmkv_keyspace_thread(void *arg) +{ + struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg; + char thread_name[16]; + snprintf(thread_name, sizeof(thread_name), "swarmkv-ks"); + prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); + event_base_dispatch(ks->evbase); + return NULL; +} struct swarmkv_module *swarmkv_keyspace_new(struct swarmkv_options *opts, const char *db_name, struct log_handle *logger, char **err) { struct swarmkv_keyspace *ks=ALLOC(struct swarmkv_keyspace, 1); strncpy(ks->module.name, "keyspace", sizeof(ks->module.name)); ks->module.mod_ctx=ks; - ks->module.lock=keyspace_lock; - ks->module.unlock=keyspace_unlock; strncpy(ks->db_name, db_name, sizeof(ks->db_name)); node_init(&ks->self, opts->cluster_announce_ip, opts->cluster_announce_port); uuid_copy(ks->uuid, opts->bin_uuid); @@ -1291,7 +1277,7 @@ void swarmkv_keyspace_free(struct swarmkv_module* module) HASH_DELETE(hh, slot_rt->keyroute_table, key_entry); key_entry_free(key_entry); } - pthread_mutex_destroy(&slot_rt->mutex); + pthread_mutex_destroy(&slot_rt->sanity_lock); } consul_client_free(ks->consul_client); ks->consul_client=NULL; @@ -1314,10 +1300,10 @@ void swarmkv_keyspace_info(struct swarmkv_module *mod_keyspace, struct keyspace_ slot_rt=ks->slot_rts+i; if(!slot_rt->I_am_owner) continue; info->slots++; - pthread_mutex_lock(&slot_rt->mutex); + pthread_mutex_lock(&slot_rt->sanity_lock); info->keys+=HASH_COUNT(slot_rt->keyroute_table); info->expires+=timeouts_count(slot_rt->expires); - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); } } enum cmd_exec_result keyslot_command(struct swarmkv_module *mod_keyspace, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) @@ -1388,13 +1374,13 @@ enum cmd_exec_result keyspace_setslot_command(struct swarmkv_module *mod_keyspac } slot_rt=&(ks->slot_rts[slot_id]); - pthread_mutex_lock(&slot_rt->mutex); + pthread_mutex_lock(&slot_rt->sanity_lock); slot_rt->state=state; if(state!=STATE_STABLE) { node_copy(&slot_rt->rebalancing_peer, &peer_node); } - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); *reply=swarmkv_reply_new_status("OK"); return FINISHED; @@ -1547,9 +1533,9 @@ enum cmd_exec_result keyspace_getkeysinslot_command(struct swarmkv_module *mod_k slot_rt=ks->slot_rts+slot_id; char *blob=NULL; size_t blob_sz; - pthread_mutex_lock(&slot_rt->mutex); + pthread_mutex_lock(&slot_rt->sanity_lock); key_slot_serialize(slot_rt, &blob, &blob_sz); - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); *reply=swarmkv_reply_new_string(blob, blob_sz); free(blob); blob=NULL; @@ -1576,9 +1562,9 @@ enum cmd_exec_result keyspace_addkeystoslot_command(struct swarmkv_module *mod_k slot_rt=ks->slot_rts+slot_id; - pthread_mutex_lock(&slot_rt->mutex); + pthread_mutex_lock(&slot_rt->sanity_lock); new_added_entry_num=key_slot_merge(slot_rt, blob, sdslen(blob)); - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); *reply=swarmkv_reply_new_integer(new_added_entry_num); return FINISHED; @@ -1601,7 +1587,7 @@ enum cmd_exec_result keyspace_delslotkeys_command(struct swarmkv_module *mod_key } slot_rt=ks->slot_rts+slot_id; - pthread_mutex_lock(&slot_rt->mutex); + pthread_mutex_lock(&slot_rt->sanity_lock); HASH_ITER(hh, slot_rt->keyroute_table, key_entry, tmp) { HASH_DELETE(hh, slot_rt->keyroute_table, key_entry); @@ -1612,7 +1598,7 @@ enum cmd_exec_result keyspace_delslotkeys_command(struct swarmkv_module *mod_key key_entry_free(key_entry); delete_num++; } - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); *reply=swarmkv_reply_new_integer(delete_num); return FINISHED; } @@ -1631,9 +1617,9 @@ enum cmd_exec_result keyspace_countkeysinslot_command(struct swarmkv_module *mod return FINISHED; } slot_rt=ks->slot_rts+slot_id; - pthread_mutex_lock(&slot_rt->mutex); + pthread_mutex_lock(&slot_rt->sanity_lock); key_num=HASH_COUNT(slot_rt->keyroute_table); - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); *reply=swarmkv_reply_new_integer(key_num); return FINISHED; } @@ -1754,10 +1740,10 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace, for(i=0; i<KEYSPACE_SLOT_NUM; i++) { slot_rt=ks->slot_rts+i; - pthread_mutex_lock(&slot_rt->mutex); + pthread_mutex_lock(&slot_rt->sanity_lock); if(!slot_rt->I_am_owner && slot_rt->state!=STATE_IMPORTING) { - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); continue; } @@ -1770,7 +1756,7 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace, utarray_push_back(matched_replies, &r); } } - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); } n_matched=utarray_len(matched_replies); if(n_matched>0) @@ -1986,8 +1972,12 @@ enum cmd_exec_result persist_command(struct swarmkv_module *mod_keyspace, const } #define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 1000 -void keyspace_active_expire_cycle(struct swarmkv_keyspace *ks) -{ +void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id) +{ + struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace); + int real_tid=__gettid(ks->exec_cmd_handle); + assert(real_tid==thread_id); + struct timespec now; int i=0, j=0; struct slot_runtime *slot_rt=NULL; @@ -1998,9 +1988,17 @@ void keyspace_active_expire_cycle(struct swarmkv_keyspace *ks) clock_gettime(CLOCK_MONOTONIC_COARSE, &start); for(i=0; i<KEYSPACE_SLOT_NUM; i++) { + if(i%ks->opts->nr_worker_threads!=thread_id) continue; slot_rt=ks->slot_rts+i; if(!slot_rt->I_am_owner) continue; - pthread_mutex_lock(&slot_rt->mutex); + if(pthread_mutex_trylock(&slot_rt->sanity_lock)) + { + pthread_mutex_unlock(&slot_rt->sanity_lock); + } + else + { + assert(0); + } clock_gettime(CLOCK_REALTIME, &now); timeouts_update(slot_rt->expires, now.tv_sec); @@ -2016,7 +2014,7 @@ void keyspace_active_expire_cycle(struct swarmkv_keyspace *ks) j++; has_key_expired=1; } - pthread_mutex_unlock(&slot_rt->mutex); + pthread_mutex_unlock(&slot_rt->sanity_lock); } clock_gettime(CLOCK_MONOTONIC_COARSE, &end); if(has_key_expired) @@ -2025,4 +2023,3 @@ void keyspace_active_expire_cycle(struct swarmkv_keyspace *ks) } } - diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index d7a12c6..58822ca 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -596,7 +596,7 @@ void swarmkv_net_set_on_msg_callback(struct swarmkv_net *net, on_msg_callback_t net->on_msg_cb_arg=cb_arg; return; } -extern int __gettid(struct swarmkv *db); + void swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv_msg *msg) { int tid=__gettid((struct swarmkv *)(net->on_msg_cb_arg)); diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c index 24847db..f94cd10 100644 --- a/src/swarmkv_rpc.c +++ b/src/swarmkv_rpc.c @@ -36,9 +36,15 @@ struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, mgr->seq_generator=0; mgr->nr_worker_threads=nr_worker_threads; mgr->evbases=evbases; + mgr->rpc_table=ALLOC(struct swarmkv_rpc *, nr_worker_threads); mgr->timeout_us=opts->cluster_timeout_us; return mgr; } +void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr) +{ + free(mgr->rpc_table); + free(mgr); +} void swarmkv_rpc_free(struct swarmkv_rpc *rpc) { event_del(rpc->timeout_ev); @@ -54,7 +60,7 @@ static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg) promise_failed(p, FUTURE_ERROR_TIMEOUT, "rpc timed out"); } -long long swarmkv_rpc_lunch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f) +long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f) { struct swarmkv_rpc *rpc=ALLOC(struct swarmkv_rpc, 1); rpc->sequence=++mgr->seq_generator; @@ -67,7 +73,7 @@ long long swarmkv_rpc_lunch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct f HASH_ADD(hh, mgr->rpc_table[thread_id], sequence, sizeof(rpc->sequence), rpc); return rpc->sequence; } -void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, const void *reply) +void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *reply) { struct swarmkv_rpc *rpc=NULL; HASH_FIND(hh, mgr->rpc_table[thread_id], &sequence, sizeof(sequence), rpc); diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 0c98162..18b0257 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -136,12 +136,14 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] = } }; #define MODULE_SWAMRKV_STORE module_name_str("swarmkv.store") -#define STORE_SHARD_NUMBER 8 + struct swarmkv_store_thread { struct scontainer *obj_table; struct scontainer *sync_queue; - pthread_rwlock_t rwlock; + pthread_mutex_t sanity_lock; + long long keys_to_sync; + long long n_keys; }; struct swarmkv_store { @@ -149,10 +151,7 @@ struct swarmkv_store node_t self; size_t nr_worker_threads; struct swarmkv_store_thread *threads; - struct scontainer *obj_table[STORE_SHARD_NUMBER]; - struct scontainer *sync_queue[STORE_SHARD_NUMBER]; -// pthread_mutex_t mutex[STORE_SHARD_NUMBER]; - pthread_rwlock_t rwlock[STORE_SHARD_NUMBER]; + exec_cmd_func *exec_cmd; struct swarmkv *exec_cmd_handle; uuid_t my_uuid; @@ -173,40 +172,14 @@ struct swarmkv_store *module2store(struct swarmkv_module *module) return store; } -int get_shard_id(sds key) +int __store_gettid(sds key, int nr_worker_threads) { - int shard_idx=key_hash_slot(key, sdslen(key))%STORE_SHARD_NUMBER; + int shard_idx=key_hash_slot(key, sdslen(key))%nr_worker_threads; return shard_idx; } -int store_gettid(struct swarmkv_module *mod, sds key) -{ - struct swarmkv_store *store=module2store(mod); - if(!key) return -1; - int tid=key_hash_slot(key, sdslen(key))%store->nr_worker_threads; - return tid; -} -void store_lock(struct swarmkv_module *mod, enum cmd_key_flag flag, sds key) +int store_gettid(struct swarmkv_module * mod_store, sds key) { - struct swarmkv_store *store=module2store(mod); - if(!key) return; - int shard_idx=get_shard_id(key); - if(flag==CMD_KEY_RO) - { - pthread_rwlock_rdlock(&(store->rwlock[shard_idx])); - } - else - { - pthread_rwlock_wrlock(&(store->rwlock[shard_idx])); - } - return; -} -void store_unlock(struct swarmkv_module *mod, enum cmd_key_flag flag, sds key) -{ - struct swarmkv_store *store=module2store(mod); - if(!key) return; - int shard_idx=get_shard_id(key); - pthread_rwlock_unlock(&(store->rwlock[shard_idx])); - + return __store_gettid(key, module2store(mod_store)->nr_worker_threads); } struct scontainer { @@ -215,19 +188,19 @@ struct scontainer char is_pending; //waiting for reply 0f CRDT PULL. char is_in_table; char is_in_sync_q; - int shard_idx; + int tid; UT_array *replica_node_list; //used by value owner for cache synchronization UT_hash_handle hh; struct scontainer *prev; //hook to sync queue struct scontainer *next; //hook to sync queue }; -static struct scontainer *scontainer_new(enum sobj_type type, const sds key) +static struct scontainer *scontainer_new(enum sobj_type type, const sds key, int tid) { struct scontainer *ctr=ALLOC(struct scontainer, 1); ctr->obj.type=type; - ctr->obj.key=sdsdup(key); - ctr->shard_idx=get_shard_id(key); + ctr->obj.key=sdsdup(key); + ctr->tid=tid; gettimeofday(&ctr->op_timestamp, NULL); return ctr; } @@ -299,44 +272,46 @@ static void scontainer_remove_replica_node(struct scontainer *ctr, const node_t } void store_add_scontainer(struct swarmkv_store *store, struct scontainer *ctr) { - scontainer_join(&(store->obj_table[ctr->shard_idx]), ctr); + scontainer_join(&(store->threads[ctr->tid].obj_table), ctr); return; } void store_remove_scontainer(struct swarmkv_store *store, struct scontainer *ctr) { - HASH_DELETE(hh, store->obj_table[ctr->shard_idx], ctr); + HASH_DELETE(hh, store->threads[ctr->tid].obj_table, ctr); ctr->is_in_table=0; if(ctr->is_in_sync_q) { - DL_DELETE(store->sync_queue[ctr->shard_idx], ctr); + DL_DELETE(store->threads[ctr->tid].sync_queue, ctr); ctr->is_in_sync_q=0; } } typedef void sobj_callback_func_t(struct sobj * obj, void *cb_arg); -void store_iterate_sobj(struct swarmkv_store *store, sobj_callback_func_t *cb, void *cb_arg) +void store_iterate_sobj(struct swarmkv_store *store, int tid, sobj_callback_func_t *cb, void *cb_arg) { struct scontainer *ctr=NULL, *tmp=NULL; - - - for(size_t i=0; i<STORE_SHARD_NUMBER; i++) - { - pthread_rwlock_rdlock(&(store->rwlock[i])); - HASH_ITER(hh, store->obj_table[i], ctr, tmp) - { - if(!ctr->is_pending) - { - cb(&ctr->obj, cb_arg); - } + HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp) + { + if(!ctr->is_pending) + { + cb(&ctr->obj, cb_arg); } - pthread_rwlock_unlock(&(store->rwlock[i])); - } - + } } struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) { struct scontainer *ctr=NULL; - int shard_idx=get_shard_id(key); - ctr=scontainer_find(&(store->obj_table[shard_idx]), key); + int tid=__store_gettid(key, store->nr_worker_threads); + int real_tid=__gettid(store->exec_cmd_handle); + assert(tid==real_tid); + ctr=scontainer_find(&(store->threads[tid].obj_table), key); + if(0==pthread_mutex_trylock(&store->threads[tid].sanity_lock)) + { + pthread_mutex_unlock(&store->threads[tid].sanity_lock); + } + else + { + assert(0); + } if(ctr) { return ctr; @@ -344,7 +319,7 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) else { return NULL; - } + } } struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key) { @@ -374,7 +349,7 @@ void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj) gettimeofday(&ctr->op_timestamp, NULL); if(!ctr->is_in_sync_q && ctr->replica_node_list) { - DL_APPEND(store->sync_queue[ctr->shard_idx], ctr); + DL_APPEND(store->threads[ctr->tid].sync_queue, ctr); ctr->is_in_sync_q=1; } return; @@ -387,6 +362,19 @@ int sobj_get_random_replica(struct sobj *obj, node_t *out) node_copy(out, replica); return 1; } +enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply) +{ + assert(obj->type==OBJ_TYPE_UNDEFINED); + node_t replica; + int ret=0; + ret=sobj_get_random_replica(obj, &replica); + if(ret) + { + *reply=swarmkv_reply_new_node(&replica, 1); + return REDIRECT; + } + return NEED_KEY_ROUTE; +} void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz) { char *value_blob=NULL; @@ -455,8 +443,8 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t enum CRDT_OP { - CRDT_PULL, - CRDT_PUSH, + CRDT_GET, + CRDT_MERGE, CRDT_JOIN }; struct crdt_generic_ctx @@ -475,18 +463,16 @@ void crdt_generic_ctx_free(struct crdt_generic_ctx *ctx) free(ctx); } -static void crdt_generic_on_succ(void* result, void* user) +static void crdt_generic_on_succ(void *result, void *user) { struct crdt_generic_ctx *ctx=(struct crdt_generic_ctx *)user; const struct swarmkv_reply *reply=(const struct swarmkv_reply*) result; struct scontainer *ctr=NULL; uuid_t uuid; - struct swarmkv_module *mod_store=&(ctx->store->module); store_get_uuid(&(ctx->store->module), uuid); switch(ctx->op) { - case CRDT_PULL: - store_lock(mod_store, CMD_KEY_OW, ctx->key); + case CRDT_GET: ctr=store_lookup_scontainer(ctx->store, ctx->key); if(ctr && reply->type==SWARMKV_REPLY_STRING) { @@ -498,9 +484,8 @@ static void crdt_generic_on_succ(void* result, void* user) { atomic_inc(&ctx->store->sync_err); } - store_unlock(&ctx->store->module, CMD_KEY_OW, ctx->key); break; - case CRDT_PUSH: + case CRDT_MERGE: if(reply->type==SWARMKV_REPLY_INTEGER && reply->integer>0) { atomic_add(&ctx->store->sync_ok, reply->integer); @@ -528,24 +513,23 @@ static void crdt_generic_on_succ(void* result, void* user) crdt_generic_ctx_free(ctx); return; } -static void store_remove_failed_peer(struct swarmkv_store *store, int shard, const node_t *peer) +static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer) { struct scontainer *ctr=NULL, *tmp=NULL; - pthread_rwlock_wrlock(&store->rwlock[shard]); - HASH_ITER(hh, store->obj_table[shard], ctr, tmp) + HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp) { scontainer_remove_replica_node(ctr, peer); } - pthread_rwlock_unlock(&store->rwlock[shard]); + return; } static void crdt_generic_on_fail(enum e_future_error err, const char * what, void * user) { struct crdt_generic_ctx *ctx=(struct crdt_generic_ctx *)user; atomic_inc(&ctx->store->sync_err); + int tid=__gettid(ctx->store->exec_cmd_handle); if(err==FUTURE_ERROR_TIMEOUT) { - int shard_idx=get_shard_id(ctx->key); - store_remove_failed_peer(ctx->store, shard_idx, &ctx->peer); + store_remove_failed_peer(ctx->store, tid, &ctx->peer); } crdt_generic_ctx_free(ctx); return; @@ -564,55 +548,55 @@ void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struc store->exec_cmd(store->exec_cmd_handle, NULL, peer, cmd, ctx->f); return; } - -void swarmkv_store_periodic(struct swarmkv_module * mod_store) +#define MAX_SYNC_PER_PERIOD 100000 +void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) { struct swarmkv_store *store=module2store(mod_store); struct scontainer *ctr=NULL, *tmp=NULL; struct timespec start, end; - int has_key_synced=0; - + int n_synced=0; + int real_tid=__gettid(store->exec_cmd_handle); + assert(real_tid==thread_id); clock_gettime(CLOCK_MONOTONIC, &start); //Ease the lock contention among work threads - int shard=random()%STORE_SHARD_NUMBER; - for(int i=0; i<STORE_SHARD_NUMBER; i++) - { - shard=(shard+i)%STORE_SHARD_NUMBER; - struct sync_master *sync_master=sync_master_new(); - pthread_rwlock_wrlock(&store->rwlock[shard]); - DL_FOREACH_SAFE(store->sync_queue[shard], ctr, tmp) - { - char *blob=NULL; - size_t blob_sz=0; - scontainer_serialize(ctr, &blob, &blob_sz); - sync_master_add_obj(sync_master, ctr->obj.key, blob, blob_sz, - utarray_front(ctr->replica_node_list), utarray_len(ctr->replica_node_list)); - DL_DELETE(store->sync_queue[shard], ctr); - ctr->is_in_sync_q=0; - store->synced++; - has_key_synced=1; - } - pthread_rwlock_unlock(&store->rwlock[shard]); - node_t peer; - struct swarmkv_cmd *cmd=NULL; - int ret=1; - while(1) - { - ret=sync_master_get_cmd(sync_master, &peer, &cmd); - if(!ret) break; - crdt_generic_call(store, CRDT_PUSH, cmd, &peer); - swarmkv_cmd_free(cmd); - cmd=NULL; - } - sync_master_free(sync_master); + struct swarmkv_store_thread *thr=&store->threads[real_tid]; + + struct sync_master *sync_master=sync_master_new(); + DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp) + { + char *blob=NULL; + size_t blob_sz=0; + scontainer_serialize(ctr, &blob, &blob_sz); + sync_master_add_obj(sync_master, ctr->obj.key, blob, blob_sz, + utarray_front(ctr->replica_node_list), utarray_len(ctr->replica_node_list)); + DL_DELETE(thr->sync_queue, ctr); + ctr->is_in_sync_q=0; + store->synced++; + n_synced++; + if(n_synced>=MAX_SYNC_PER_PERIOD) break; } - + node_t peer; + struct swarmkv_cmd *cmd=NULL; + int ret=1; + while(1) + { + ret=sync_master_get_cmd(sync_master, &peer, &cmd); + if(!ret) break; + crdt_generic_call(store, CRDT_MERGE, cmd, &peer); + swarmkv_cmd_free(cmd); + cmd=NULL; + } + sync_master_free(sync_master); + thr->n_keys=HASH_COUNT(thr->obj_table); + DL_COUNT(thr->sync_queue, ctr, thr->keys_to_sync); clock_gettime(CLOCK_MONOTONIC, &end); - if(has_key_synced) + + if(n_synced) { swarmkv_monitor_record_event(store->mod_monitor, "crdt-sync-cycle", timespec_diff_usec(&start, &end)); } } + struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts, exec_cmd_func *send_cmd, void *handle_send_cmd) { struct swarmkv_store *store=ALLOC(struct swarmkv_store, 1); @@ -620,11 +604,6 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts, exe store->module.mod_ctx=store; store->nr_worker_threads=opts->nr_worker_threads; store->threads=ALLOC(struct swarmkv_store_thread, opts->nr_worker_threads); - for(size_t i=0; i<opts->nr_worker_threads; i++) - { - pthread_rwlock_init(&(store->threads[i].rwlock), NULL); - - } uuid_copy(store->my_uuid, opts->bin_uuid); node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port); @@ -643,11 +622,10 @@ void swarmkv_store_free(struct swarmkv_module *mod_store) HASH_ITER(hh, store->threads[i].obj_table, ctr, tmp) { - assert(ctr->shard_idx==i); + assert(ctr->tid==i); scontainer_remove(&(store->threads[i].obj_table), ctr); scontainer_free(ctr); - } - pthread_rwlock_destroy(&(store->threads[i].rwlock)); + } } free(store); return; @@ -660,37 +638,67 @@ void swarmkv_store_set_monitor_handle(struct swarmkv_module *mod_store, struct s void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *info) { struct swarmkv_store *store=module2store(mod_store); - struct scontainer *ctr=NULL; - info->shards=STORE_SHARD_NUMBER; + info->shards=store->nr_worker_threads; info->keys=0; info->keys_to_sync=0; - for(size_t i=0; i<STORE_SHARD_NUMBER; i++) + struct swarmkv_store_thread *thread=NULL; + for(size_t i=0; i<store->nr_worker_threads; i++) { - pthread_rwlock_rdlock(&store->rwlock[i]); - DL_COUNT(store->sync_queue[i], ctr, info->keys_to_sync); - info->keys+=HASH_COUNT(store->obj_table[i]); - pthread_rwlock_unlock(&store->rwlock[i]); + thread = store->threads+i; + info->keys_to_sync += __sync_add_and_fetch(&thread->keys_to_sync, 0); + info->keys_to_sync += __sync_add_and_fetch(&thread->n_keys, 0); } info->sync_ok=__sync_add_and_fetch(&store->sync_ok, 0); info->sync_err=__sync_add_and_fetch(&store->sync_err, 0); info->synced=store->synced; } -void swarmkv_store_add_key(struct swarmkv_module *mod_store, const sds key, node_t *replica_nodes, size_t n_replica_node) + +UT_icd ut_array_matched_reply = {sizeof(struct swarmkv_reply *), NULL, NULL, NULL}; + +struct pattern_match_arg +{ + sds pattern; + UT_array *matched_replies; +}; +enum cmd_exec_result type_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) +{ +/*TYPE key*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + *reply=swarmkv_reply_new_string_fmt(sobj_specs[obj->type].type_name); + return FINISHED; +} +enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) { +/*CRDT ADD key IP:port [IP:port ...]*/ struct scontainer *ctr=NULL; struct swarmkv_store *store=module2store(mod_store); size_t max_pull_node_num=4; - struct swarmkv_cmd *crdt_pull_cmd=NULL; - crdt_pull_cmd=swarmkv_cmd_new(3); - crdt_pull_cmd->argv[0]=sdsnew("crdt"); - crdt_pull_cmd->argv[1]=sdsnew("pull"); - crdt_pull_cmd->argv[2]=sdsdup(key); - store_lock(mod_store, CMD_KEY_OW, key); + sds key=cmd->argv[2]; + int tid=store_gettid(mod_store, key); + int real_tid=__gettid(store->exec_cmd_handle); + assert(tid==real_tid); + struct swarmkv_cmd *crdt_get_cmd=NULL; + crdt_get_cmd=swarmkv_cmd_new(3); + crdt_get_cmd->argv[0]=sdsnew("crdt"); + crdt_get_cmd->argv[1]=sdsnew("get"); + crdt_get_cmd->argv[2]=sdsdup(key); + size_t n_replica_node=cmd->argc-3; + node_t *replica_nodes=ALLOC(node_t, n_replica_node); + for(size_t i=0; i<n_replica_node; i++) + { + node_init_from_sds(&replica_nodes[i], cmd->argv[3+i]); + } ctr=store_lookup_scontainer(store, key); if(!ctr) { - ctr=scontainer_new(OBJ_TYPE_UNDEFINED, key); + ctr=scontainer_new(OBJ_TYPE_UNDEFINED, key, tid); if(n_replica_node>0)//need crdt pull from replicas { ctr->is_pending=1; @@ -705,53 +713,33 @@ void swarmkv_store_add_key(struct swarmkv_module *mod_store, const sds key, node { scontainer_add_replica_node(ctr, replica_nodes+i); } - store_unlock(mod_store, CMD_KEY_OW, key); for(size_t i=0; i<n_replica_node; i++) { if(i<max_pull_node_num) { - crdt_generic_call(store, CRDT_PULL, crdt_pull_cmd, replica_nodes+i); + crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i); } else { struct swarmkv_cmd *crdt_join_cmd=NULL; crdt_join_cmd=swarmkv_cmd_new(4); crdt_join_cmd->argv[0]=sdsnew("crdt"); - crdt_join_cmd->argv[1]=sdsnew("join"); + crdt_join_cmd->argv[1]=sdsnew("merge"); crdt_join_cmd->argv[2]=sdsdup(ctr->obj.key); crdt_join_cmd->argv[3]=node_addr2sds(replica_nodes+i); crdt_generic_call(store, CRDT_JOIN, crdt_join_cmd, replica_nodes+i); swarmkv_cmd_free(crdt_join_cmd); } } - swarmkv_cmd_free(crdt_pull_cmd); - crdt_pull_cmd=NULL; - return; -} -UT_icd ut_array_matched_reply = {sizeof(struct swarmkv_reply *), NULL, NULL, NULL}; - -struct pattern_match_arg -{ - sds pattern; - UT_array *matched_replies; -}; -enum cmd_exec_result type_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) -{ -/*TYPE key*/ - struct sobj *obj=NULL; - const sds key=cmd->argv[1]; - obj=store_lookup(mod_store, key); - if(!obj) - { - return NEED_KEY_ROUTE; - } - *reply=swarmkv_reply_new_string_fmt(sobj_specs[obj->type].type_name); + swarmkv_cmd_free(crdt_get_cmd); + crdt_get_cmd=NULL; + free(replica_nodes); return FINISHED; } -enum cmd_exec_result crdt_pull_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) +enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) { -/* CRDT PULL key */ +/* CRDT GET key */ struct swarmkv_store *store=module2store(mod_store); struct scontainer *ctr=NULL; const sds key=cmd->argv[2]; @@ -781,17 +769,17 @@ enum cmd_exec_result crdt_pull_command(struct swarmkv_module *mod_store, const s return FINISHED; } -enum cmd_exec_result crdt_push_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) +enum cmd_exec_result crdt_merge_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) { -/* CRDT PUSH key blob [key blob ...]*/ +/* CRDT MERGE key blob [key blob ...]*/ struct sobj *obj=NULL; sds key=NULL, blob=NULL; assert(accessing_node!=NULL);//should never invoked by local long long synced=0; uuid_t uuid; - int __attribute__((__unused__))first_key_shard_idx=0; - int __attribute__((__unused__))shard_idx=0; - first_key_shard_idx=get_shard_id(cmd->argv[2]); + int __attribute__((__unused__))first_key_tid=0; + int __attribute__((__unused__))tid=0; + first_key_tid=store_gettid(mod_store, cmd->argv[2]); store_get_uuid(mod_store, uuid); for(size_t i=0; i<cmd->argc-2; i+=2) @@ -804,11 +792,10 @@ enum cmd_exec_result crdt_push_command(struct swarmkv_module *mod_store, const s sobj_merge_blob(obj, blob, sdslen(blob), uuid); synced++; } - shard_idx=get_shard_id(key); - assert(shard_idx==first_key_shard_idx); + tid=store_gettid(mod_store, key); + assert(tid==first_key_tid); } *reply=swarmkv_reply_new_integer(synced); - obj=store_lookup(mod_store, key); return FINISHED; } @@ -872,13 +859,19 @@ static void pattern_match_function(struct sobj * obj, void *cb_arg) enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) { -//CRDT KEYS IP Port pattern +//CRDT KEYS tid pattern int i=0, n_matched=0; struct pattern_match_arg cb_arg; - cb_arg.pattern=cmd->argv[2]; + int thread_id=atoll(cmd->argv[3]); + cb_arg.pattern=cmd->argv[4]; utarray_new(cb_arg.matched_replies, &ut_array_matched_reply); struct swarmkv_store *store=module2store(mod_store); - store_iterate_sobj(store, pattern_match_function, &cb_arg); + if(thread_id>store->nr_worker_threads) + { + *reply=swarmkv_reply_new_error("Invalid thread id"); + return FINISHED; + } + store_iterate_sobj(store, thread_id, pattern_match_function, &cb_arg); n_matched=utarray_len(cb_arg.matched_replies); if(n_matched>0) { diff --git a/src/t_hash.c b/src/t_hash.c index c600712..79464df 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -89,27 +89,29 @@ enum cmd_exec_result hdel_command(struct swarmkv_module *mod_store, const struct { return NEED_KEY_ROUTE; } + if(obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + if(obj->type!=OBJ_TYPE_HASH) + { + *reply=swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } size_t n_to_del=cmd->argc-2; size_t i=0; int ret=0, n_deleted=0; sds field; - - if(obj->type==OBJ_TYPE_HASH) + for(i=0; i<n_to_del; i++) { - for(i=0; i<n_to_del; i++) - { - field=cmd->argv[2+i]; - ret=OR_map_remove(obj->hash, field, sdslen(field)); - if(ret>0) n_deleted++; - } - *reply=swarmkv_reply_new_integer(n_deleted); - sobj_need_sync(mod_store, obj); - } - else - { - *reply=swarmkv_reply_new_error(error_wrong_type); + field=cmd->argv[2+i]; + ret=OR_map_remove(obj->hash, field, sdslen(field)); + if(ret>0) n_deleted++; } + *reply=swarmkv_reply_new_integer(n_deleted); + sobj_need_sync(mod_store, obj); + return FINISHED; } enum cmd_exec_result hmget_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply) @@ -123,6 +125,10 @@ enum cmd_exec_result hmget_command(struct swarmkv_module *mod_store, const struc { return NEED_KEY_ROUTE; } + if(obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } if(obj->type!=OBJ_TYPE_HASH) { *reply=swarmkv_reply_new_error(error_wrong_type); @@ -177,6 +183,10 @@ enum cmd_exec_result hgetall_command(struct swarmkv_module *mod_store, const str { return NEED_KEY_ROUTE; } + if(obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } if(obj->type!=OBJ_TYPE_HASH) { *reply=swarmkv_reply_new_error(error_wrong_type); @@ -255,6 +265,10 @@ enum cmd_exec_result hkeys_command(struct swarmkv_module *mod_store, const struc { return NEED_KEY_ROUTE; } + if(obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } if(obj->type!=OBJ_TYPE_HASH) { *reply=swarmkv_reply_new_error(error_wrong_type); @@ -284,6 +298,10 @@ enum cmd_exec_result hlen_command(struct swarmkv_module *mod_store, const struct { return NEED_KEY_ROUTE; } + if(obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } if(obj->type!=OBJ_TYPE_HASH) { *reply=swarmkv_reply_new_error(error_wrong_type); diff --git a/src/t_set.c b/src/t_set.c index 5259f45..bfdeb45 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -56,6 +56,10 @@ enum cmd_exec_result srem_command(struct swarmkv_module *mod_store, const struct { return NEED_KEY_ROUTE; } + if(obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } if(obj->type!=OBJ_TYPE_SET) { *reply=swarmkv_reply_new_error(error_wrong_type); @@ -85,6 +89,10 @@ enum cmd_exec_result smembers_command(struct swarmkv_module *mod_store, const st { return NEED_KEY_ROUTE; } + if(obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } if(obj->type!=OBJ_TYPE_SET) { *reply=swarmkv_reply_new_error(error_wrong_type); @@ -116,6 +124,10 @@ enum cmd_exec_result sismember_command(struct swarmkv_module *mod_store, const s { return NEED_KEY_ROUTE; } + if(obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } if(obj->type!=OBJ_TYPE_SET) { *reply=swarmkv_reply_new_error(error_wrong_type); @@ -136,6 +148,10 @@ enum cmd_exec_result scard_command(struct swarmkv_module *mod_store, const struc { return NEED_KEY_ROUTE; } + if(obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } if(obj->type!=OBJ_TYPE_SET) { *reply=swarmkv_reply_new_error(error_wrong_type); diff --git a/src/t_string.c b/src/t_string.c index d122510..3a5c463 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -16,7 +16,10 @@ enum cmd_exec_result get_command(struct swarmkv_module *mod_store, const struct { return NEED_KEY_ROUTE; } - + if(obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } if(obj->type==OBJ_TYPE_STRING) { LWW_register_get0(obj->string, &string_val, &string_sz); @@ -49,7 +52,6 @@ enum cmd_exec_result set_command(struct swarmkv_module *mod_store, const struct { return NEED_KEY_ROUTE; } - if(obj->type==OBJ_TYPE_UNDEFINED) { uuid_t uuid; diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c index f710639..bcf8f78 100644 --- a/src/t_token_bucket.c +++ b/src/t_token_bucket.c @@ -8,19 +8,7 @@ #include <assert.h> #include <stdbool.h> //Unlike string, set and hash, XTCONSUME and XTINFO can only operate on an initialized token bucket. -enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply) -{ - assert(obj->type==OBJ_TYPE_UNDEFINED); - node_t replica; - int ret=0; - ret=sobj_get_random_replica(obj, &replica); - if(ret) - { - *reply=swarmkv_reply_new_node(&replica, 1); - return REDIRECT; - } - return NEED_KEY_ROUTE; -} + static int get_consume_type(sds s, enum tb_consume_type *consume_type) { if(0==strncasecmp(s, "NORMAL", sdslen(s))) |
