summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt2
-rw-r--r--docs/design.md2
-rw-r--r--examples/CMakeLists.txt4
-rw-r--r--src/future_promise.c2
-rw-r--r--src/inc_internal/future_promise.h4
-rw-r--r--src/inc_internal/swarmkv_cmd_spec.h11
-rw-r--r--src/inc_internal/swarmkv_common.h4
-rw-r--r--src/inc_internal/swarmkv_keyspace.h1
-rw-r--r--src/inc_internal/swarmkv_mesh.h3
-rw-r--r--src/inc_internal/swarmkv_rpc.h4
-rw-r--r--src/inc_internal/swarmkv_store.h8
-rw-r--r--src/swarmkv.c193
-rw-r--r--src/swarmkv_keyspace.c115
-rw-r--r--src/swarmkv_net.c2
-rw-r--r--src/swarmkv_rpc.c10
-rw-r--r--src/swarmkv_store.c341
-rw-r--r--src/t_hash.c46
-rw-r--r--src/t_set.c16
-rw-r--r--src/t_string.c6
-rw-r--r--src/t_token_bucket.c14
20 files changed, 423 insertions, 365 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5e6f7fc..14074a3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -10,7 +10,7 @@ include(Version)
set(CMAKE_C_FLAGS "-std=gnu99 -Wall")
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -Wall)
-set(SWARMKV_DEPEND_DYN_LIB pthread uuid)
+set(SWARMKV_DEPEND_DYN_LIB pthread uuid m)
include_directories(${PROJECT_SOURCE_DIR}/inc/)
include_directories(/opt/MESA/include/)
diff --git a/docs/design.md b/docs/design.md
index e191425..4f85273 100644
--- a/docs/design.md
+++ b/docs/design.md
@@ -211,7 +211,7 @@ Reply message
A swarmkv instance has one key space thread and several (configurable) worker threads. Swarmkv uses [libevent](https://github.com/libevent/libevent) as a peer-to-peer communication infrastructure.
-The keyspace thread communicates with Consul.
+The keyspace thread communicates with Consul. Some of the operations are blocking style, so swarmkv uses a seperated keyspace thread.
- Watch global slot table changes.
- Watch leadership changes, run for leader if allowed.
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 865dcd1..417f4cf 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -1,5 +1,5 @@
include_directories(${PROJECT_SOURCE_DIR}include/)
add_executable(simple_example simple_example.c)
add_executable(async_example async_example.c)
-target_link_libraries(simple_example swarmkv-static pthread uuid)
-target_link_libraries(async_example swarmkv-static pthread uuid) \ No newline at end of file
+target_link_libraries(simple_example swarmkv-static ${SWARMKV_DEPEND_DYN_LIB})
+target_link_libraries(async_example swarmkv-static ${SWARMKV_DEPEND_DYN_LIB}) \ No newline at end of file
diff --git a/src/future_promise.c b/src/future_promise.c
index 0581ae3..bf27351 100644
--- a/src/future_promise.c
+++ b/src/future_promise.c
@@ -115,7 +115,7 @@ void promise_failed(struct promise * p, enum e_future_error error, const char *
return;
}
-void promise_success(struct promise * p, const void * result)
+void promise_success(struct promise * p, void * result)
{
if(!p->f.is_canceled)
{
diff --git a/src/inc_internal/future_promise.h b/src/inc_internal/future_promise.h
index 0b70c37..a2c0d0d 100644
--- a/src/inc_internal/future_promise.h
+++ b/src/inc_internal/future_promise.h
@@ -13,7 +13,7 @@ enum e_future_error
struct promise;
struct future;
-typedef void (future_success_cb)(const void *result, void * user);
+typedef void (future_success_cb)(void *result, void * user);
typedef void (future_failed_cb)(enum e_future_error err, const char * what, void * user);
typedef void (promise_ctx_destroy_cb)(void* ctx);
@@ -31,7 +31,7 @@ void future_destroy(struct future * f);
*/
struct promise * future_to_promise(struct future * f);
void promise_failed(struct promise * p, enum e_future_error error, const char * what);
-void promise_success(struct promise * p, const void * result);
+void promise_success(struct promise * p, void * result);
void promise_finish(struct promise * p);
void promise_allow_many_successes(struct promise *p);
diff --git a/src/inc_internal/swarmkv_cmd_spec.h b/src/inc_internal/swarmkv_cmd_spec.h
index c8ae08a..6a823a8 100644
--- a/src/inc_internal/swarmkv_cmd_spec.h
+++ b/src/inc_internal/swarmkv_cmd_spec.h
@@ -34,11 +34,10 @@ typedef int swarmkv_module_gettid_func_t(struct swarmkv_module *mod, sds key);
struct swarmkv_module
{
char name[SWARMKV_SYMBOL_MAX];
- swarmkv_module_gettid_func_t *gettid;
void *mod_ctx;
};
-enum cmd_keyroute_failover
+enum key_not_found_reply
{
REPLY_NA, /*Not Applicable*/
REPLY_INT_0,
@@ -51,9 +50,9 @@ enum cmd_keyroute_failover
typedef enum cmd_exec_result command_proc_func(struct swarmkv_module *module, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
-#define KEY_OFFSET_NONE -1
-#define KEY_OFFSET_ANY -2
-
+#define KEY_OFFSET_NONE -1
+#define KEY_OFFSET_TID -2
+#define KEY_OFFSET_SLOTID -3
struct swarmkv_cmd_spec
{
const char *name;
@@ -61,7 +60,7 @@ struct swarmkv_cmd_spec
int arity; /* Number of arguments, it is possible to use -N to say >= N */
int key_offset; /* The argument that's a key (-1 = no keys) */
enum cmd_key_flag flag; /* Command flag, see CMD_*. */
- enum cmd_keyroute_failover failover;
+ enum key_not_found_reply nokey_reply;
int auto_route; /* The node that executes the command must be explicitly specifeid. */
command_proc_func *proc;
diff --git a/src/inc_internal/swarmkv_common.h b/src/inc_internal/swarmkv_common.h
index 5465690..3e9e11a 100644
--- a/src/inc_internal/swarmkv_common.h
+++ b/src/inc_internal/swarmkv_common.h
@@ -124,4 +124,6 @@ void node_to_sockaddr(const node_t *node, struct sockaddr *sockaddr);
int http_blocking_request(enum evhttp_cmd_type type, const char* host, unsigned int port, const char* url, sds req_body, sds *resp_body);
typedef void exec_cmd_func(struct swarmkv *db, const node_t *accessing_node, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller);
-struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv); \ No newline at end of file
+struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv);
+
+int __gettid(struct swarmkv *db); \ No newline at end of file
diff --git a/src/inc_internal/swarmkv_keyspace.h b/src/inc_internal/swarmkv_keyspace.h
index ef0c6b2..3d644fe 100644
--- a/src/inc_internal/swarmkv_keyspace.h
+++ b/src/inc_internal/swarmkv_keyspace.h
@@ -10,6 +10,7 @@
struct swarmkv_module *swarmkv_keyspace_new(struct swarmkv_options *opts, const char *db_name, struct log_handle *logger, char **err);
void swarmkv_keyspace_free(struct swarmkv_module* mod_keyspace);
+void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id);
struct keyspace_info
{
unsigned int health_check_port;
diff --git a/src/inc_internal/swarmkv_mesh.h b/src/inc_internal/swarmkv_mesh.h
index 975bbff..4a6f04c 100644
--- a/src/inc_internal/swarmkv_mesh.h
+++ b/src/inc_internal/swarmkv_mesh.h
@@ -4,4 +4,5 @@
struct swarmkv_mesh;
int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest_thread_id, struct swarmkv_msg *msg);
void swarmkv_mesh_set_on_msg_recv_cb(struct swarmkv_mesh *mesh, on_msg_callback_t *cb_func, void *cb_arg);
-struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger); \ No newline at end of file
+struct swarmkv_mesh *swarmkv_mesh_new(struct event_base *evbase[], int nthreads, struct log_handle *logger);
+void swarmkv_mesh_free(struct swarmkv_mesh *mesh); \ No newline at end of file
diff --git a/src/inc_internal/swarmkv_rpc.h b/src/inc_internal/swarmkv_rpc.h
index 3ee47b6..de3ca13 100644
--- a/src/inc_internal/swarmkv_rpc.h
+++ b/src/inc_internal/swarmkv_rpc.h
@@ -4,8 +4,8 @@
#include "future_promise.h"
struct swarmkv_rpc_mgr;
-struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new();
+struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts, struct event_base *evbases[], int nr_worker_threads);
void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr);
//Return a sequence number, which can be used to complete the request
long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f);
-void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, const void *response);
+void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *response);
diff --git a/src/inc_internal/swarmkv_store.h b/src/inc_internal/swarmkv_store.h
index 8338623..241168d 100644
--- a/src/inc_internal/swarmkv_store.h
+++ b/src/inc_internal/swarmkv_store.h
@@ -46,9 +46,8 @@ struct sobj
struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts, exec_cmd_func *send_cmd, void *handle_send_cmd);
void swarmkv_store_free(struct swarmkv_module* mod_store);
-void swarmkv_store_periodic(struct swarmkv_module *module);
+void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id);
void swarmkv_store_set_monitor_handle(struct swarmkv_module *mod_store, struct swarmkv_module *mod_monitor);
-void swarmkv_store_add_key(struct swarmkv_module *mod_store, const sds key, node_t *replica_nodes, size_t n_replica_node);
struct store_info
{
long long keys;
@@ -62,13 +61,14 @@ void swarmkv_store_info(struct swarmkv_module *module, struct store_info *info);
void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj);
int sobj_get_random_replica(struct sobj *obj, node_t *out);
+enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply);
void store_get_uuid(struct swarmkv_module* mod_store, uuid_t uuid);
struct sobj *store_lookup(struct swarmkv_module* mod_store, sds key);
-enum cmd_exec_result crdt_pull_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
-enum cmd_exec_result crdt_push_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
+enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
+enum cmd_exec_result crdt_merge_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_del_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_join_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply);
diff --git a/src/swarmkv.c b/src/swarmkv.c
index e8001e9..f552a29 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -69,7 +69,7 @@ struct swarmkv
struct event_base **evbases;
node_t self;
- struct swarmkv_rpc_mgr *async;
+ struct swarmkv_rpc_mgr *rpc_mgr;
struct swarmkv_mesh *mesh;
struct swarmkv_net *net;
@@ -119,7 +119,7 @@ struct local_caller_ctx
void * cb_arg;
struct future *my_future;
};
-static void local_caller_on_success(const void *result, void *user)
+static void local_caller_on_success(void *result, void *user)
{
struct local_caller_ctx *ctx=(struct local_caller_ctx*)user;
const struct swarmkv_reply *reply=(const struct swarmkv_reply *)result;
@@ -165,7 +165,7 @@ void remote_calller_ctx_free(struct remote_caller_ctx *ctx)
free(ctx);
return;
}
-static void remote_caller_on_success(const void *result, void *user)
+static void remote_caller_on_success(void *result, void *user)
{
struct remote_caller_ctx *ctx=(struct remote_caller_ctx*)user;
const struct swarmkv_reply *reply=(const struct swarmkv_reply*) result;
@@ -560,10 +560,10 @@ void swarmkv_threads_run(struct swarmkv *db)
return;
}
-static struct swarmkv_reply *keyroute_fail_reply(enum cmd_keyroute_failover failover_flag)
+static struct swarmkv_reply *key_not_found_reply(enum key_not_found_reply not_found_flag)
{
struct swarmkv_reply *reply=NULL;
- switch(failover_flag)
+ switch(not_found_flag)
{
case REPLY_INT_0:
reply=swarmkv_reply_new_integer(0);
@@ -663,37 +663,42 @@ int is_sufficient_arg_num(const struct swarmkv_cmd_spec *spec, const struct swar
struct cmd_ctx
{
struct swarmkv *db;
- struct swarmkv_cmd_spec *spec;
struct swarmkv_cmd *cmd;
int redirect_cnt;
- int thread_id;
- long long sequence;
- struct future *future_of_caller;
struct future *future_of_mine;
+ struct future *future_of_caller;
};
-struct cmd_ctx *cmd_ctx_new(struct swarmkv *db, const struct swarmkv_cmd *cmd, struct swarmkv_cmd_spec *spec, long long sequence)
+struct cmd_ctx *cmd_ctx_new(struct swarmkv *db, const struct swarmkv_cmd *cmd, struct future *f)
{
struct cmd_ctx *ctx=ALLOC(struct cmd_ctx, 1);
-
ctx->db=db;
- ctx->spec=spec;
ctx->cmd=swarmkv_cmd_dup(cmd);
ctx->redirect_cnt=0;
- ctx->sequence=sequence;
+ ctx->future_of_caller=f;
return ctx;
}
void cmd_ctx_free(struct cmd_ctx *ctx)
{
swarmkv_cmd_free(ctx->cmd);
- if(ctx->future_of_mine) future_destroy(ctx->future_of_mine);
+ future_destroy(ctx->future_of_mine);
free(ctx);
return;
}
-static void peer_exec_on_success(const void *result, void *user)
+static void generic_on_fail(enum e_future_error err, const char * what, void * user)
+{
+ struct cmd_ctx *ctx=(struct cmd_ctx*)user;
+ if(ctx->future_of_caller)
+ {
+ struct promise *p=future_to_promise(ctx->future_of_caller);
+ promise_failed(p, FUTURE_ERROR_EXCEPTION, what);
+ }
+ cmd_ctx_free(ctx);
+}
+static void peer_exec_on_success(void *result, void *user)
{
struct cmd_ctx *ctx = (struct cmd_ctx*)user;
- const struct swarmkv_reply *reply = (const struct swarmkv_reply*) result;
+ struct swarmkv_reply *reply = (struct swarmkv_reply*) result;
if(reply->type==SWARMKV_REPLY_NODE && 0==strncasecmp(reply->str, "-ASK", 4))
{
@@ -706,9 +711,9 @@ static void peer_exec_on_success(const void *result, void *user)
}
else
{
- struct promise *p=future_to_promise(ctx->future_of_caller);
char err_msg[256];
snprintf(err_msg, sizeof(err_msg), error_too_many_redirects, reply->str);
+ struct promise *p=future_to_promise(ctx->future_of_caller);
promise_failed(p, FUTURE_ERROR_EXCEPTION, err_msg);
cmd_ctx_free(ctx);
}
@@ -716,68 +721,94 @@ static void peer_exec_on_success(const void *result, void *user)
else
{
struct promise *p=future_to_promise(ctx->future_of_caller);
- promise_success(p, reply);
+ promise_success(p, (void*) reply);
cmd_ctx_free(ctx);
}
}
-
-static void key_route_on_success(const void *result, void *user)
+struct swarmkv_cmd *make_crdt_add_cmd(enum cmd_key_flag flag, const sds key, node_t replica[], size_t n_replica)
+{
+ struct swarmkv_cmd *crdt_add_cmd=NULL;
+ assert(flag==CMD_KEY_OW);
+ crdt_add_cmd=swarmkv_cmd_new(3+n_replica);
+ crdt_add_cmd->argv[0]=sdsnew("crdt");
+ crdt_add_cmd->argv[1]=sdsnew("add");
+ crdt_add_cmd->argv[2]=sdsdup(key);
+ for(size_t i=0; i<n_replica; i++)
+ {
+ crdt_add_cmd->argv[3+i]=node_addr2sds(replica+i);
+ }
+ return crdt_add_cmd;
+}
+static void crdt_add_on_success(void *result, void *user)
+{
+ struct cmd_ctx *ctx = (struct cmd_ctx*)user;
+ if(ctx->future_of_caller)
+ {
+ __exec_cmd(ctx->db, NULL, NULL, ctx->cmd, ctx->future_of_caller);
+ }
+ cmd_ctx_free(ctx);
+}
+static void key_route_on_success(void *result, void *user)
{
struct cmd_ctx *ctx = (struct cmd_ctx*)user;
const struct swarmkv_reply *reply = (const struct swarmkv_reply*) result;
struct swarmkv_reply *user_reply_for_keyspace_not_found=NULL;
size_t n_replica_node=0;
node_t *replica_nodes=NULL;
+ struct swarmkv_cmd_spec *spec=get_spec_by_argv(ctx->db, ctx->cmd->argc, ctx->cmd->argv);
node_list_new_from_reply(&replica_nodes, &n_replica_node, reply);
if(n_replica_node==0)
{
- user_reply_for_keyspace_not_found=keyroute_fail_reply(ctx->spec->failover);
- swarmkv_rpc_complete(ctx->db->async, ctx->thread_id, ctx->sequence, user_reply_for_keyspace_not_found);
+ user_reply_for_keyspace_not_found=key_not_found_reply(spec->nokey_reply);
+ struct promise *p=future_to_promise(ctx->future_of_caller);
+ promise_success(p, user_reply_for_keyspace_not_found);
swarmkv_reply_free(user_reply_for_keyspace_not_found);
user_reply_for_keyspace_not_found=NULL;
}
else
- {
- const sds key=ctx->cmd->argv[ctx->spec->key_offset];
+ {
+ const sds key=ctx->cmd->argv[spec->key_offset];
int self_is_a_replica=node_list_exists(replica_nodes, n_replica_node, &ctx->db->self);
if(self_is_a_replica)
{
n_replica_node-=node_list_remove(replica_nodes, n_replica_node, &ctx->db->self);
- swarmkv_store_add_key(ctx->db->mod_store, key, replica_nodes, n_replica_node);
+// swarmkv_store_add_key(ctx->db->mod_store, key, replica_nodes, n_replica_node);
}
- if(self_is_a_replica && n_replica_node==0)
+ if(n_replica_node>0)
{
- __exec_cmd(ctx->db, NULL, NULL, ctx->cmd, ctx->future_of_caller);
+ __exec_cmd(ctx->db, NULL, replica_nodes+0, ctx->cmd, ctx->future_of_caller);
}
- else
+ if(self_is_a_replica)
{
- __exec_cmd(ctx->db, NULL, replica_nodes+0, ctx->cmd, ctx->future_of_caller);
-
+ struct cmd_ctx *crdt_add_ctx=NULL;
+ struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(spec->flag, key, replica_nodes, n_replica_node);
+ crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller);
+ crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx);
+ __exec_cmd(ctx->db, NULL, NULL, crdt_add_cmd, crdt_add_ctx->future_of_mine);
+
}
- free(replica_nodes);
+ free(replica_nodes);
}
cmd_ctx_free(ctx);
}
-static void generic_on_fail(enum e_future_error err, const char * what, void * user)
-{
- struct cmd_ctx *ctx=(struct cmd_ctx*)user;
- struct promise *p=future_to_promise(ctx->future_of_caller);
- promise_failed(p, err, what);
- ctx->future_of_caller=NULL;
- cmd_ctx_free(ctx);
-}
-static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *cmd)
+static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *cmd, int nr_worker_threads)
{
- if(spec->key_offset<0)
- {
- return -1;
- }
- else
+ int tid=0;
+ switch(spec->key_offset)
{
- return spec->module->gettid(spec->module, cmd->argv[spec->key_offset]);
+ case KEY_OFFSET_TID:
+ tid=atoi(cmd->argv[2]);
+ break;
+ case KEY_OFFSET_SLOTID:
+ tid=atoi(cmd->argv[2])%nr_worker_threads;
+ break;
+ default:
+ tid=key_hash_slot(cmd->argv[spec->key_offset], sdslen(cmd->argv[spec->key_offset]))%nr_worker_threads;
+ break;
}
+ return tid;
}
void on_msg_callback(struct swarmkv_msg *msg, void *arg)
{
@@ -795,7 +826,7 @@ void on_msg_callback(struct swarmkv_msg *msg, void *arg)
}
else
{
- swarmkv_rpc_complete(db->async, thread_id, msg->sequence, msg->reply);
+ swarmkv_rpc_complete(db->rpc_mgr, thread_id, msg->sequence, msg->reply);
}
}
}
@@ -832,12 +863,13 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
swarmkv_reply_free(reply);
return;
}
- int target_thread_id=spec_gettid(spec, cmd);
- long long sequence=swarmkv_rpc_launch(db->async, current_thread_id, future_of_caller);
+ int target_thread_id=spec_gettid(spec, cmd, db->opts->nr_worker_threads);
+
struct swarmkv_msg *msg=NULL;
if(target_node && node_compare(&db->self, target_node))
{
//cmd is executed in target node's on_msg_callback
+ long long sequence=swarmkv_rpc_launch(db->rpc_mgr, current_thread_id, future_of_caller);
msg=swarmkv_msg_new_by_cmd(cmd, target_node, current_thread_id, sequence);
swarmkv_net_send(db->net, target_node, msg);
return;
@@ -845,6 +877,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
if(current_thread_id != target_thread_id)
{
//cmd is executed in target thread's on_msg_callback
+ long long sequence=swarmkv_rpc_launch(db->rpc_mgr, current_thread_id, future_of_caller);
msg=swarmkv_msg_new_by_cmd(cmd, &db->self, current_thread_id, sequence);
swarmkv_mesh_send(db->mesh, current_thread_id, target_thread_id, msg);
return;
@@ -861,20 +894,22 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
if(accessing_node)//Remote call, non-recursive exec
{
- swarmkv_rpc_complete(db->async, current_thread_id, sequence, reply);
+ struct promise *p=future_to_promise(future_of_caller);
+ promise_success(p, reply);
swarmkv_reply_free(reply);
}
switch(exec_ret)
{
case FINISHED:
{
- swarmkv_rpc_complete(db->async, current_thread_id, sequence, reply);
+ struct promise *p=future_to_promise(future_of_caller);
+ promise_success(p, reply);
swarmkv_reply_free(reply);
break;
}
case REDIRECT:
{
- struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, spec, sequence);
+ struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller);
node_init_from_reply(&peer, reply);
ctx->future_of_mine=future_create("peer_exec", peer_exec_on_success, generic_on_fail, ctx);
__exec_cmd(db, NULL, &peer, cmd, ctx->future_of_mine);
@@ -883,7 +918,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
case NEED_KEY_ROUTE:
{
struct swarmkv_cmd *keyspace_cmd=make_keyroute_cmd(spec->flag, cmd->argv[spec->key_offset], db->opts->dryrun? NULL:(&db->self));
- struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, spec, sequence);
+ struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller);
ctx->future_of_mine=future_create("key_route", key_route_on_success, generic_on_fail, ctx);
__exec_cmd(db, NULL, NULL, keyspace_cmd, ctx->future_of_mine);
swarmkv_cmd_free(keyspace_cmd);
@@ -900,7 +935,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *accessing_node, const node_t *
return;
}
void command_register(struct swarmkv_cmd_spec **table, const char *name, const char *hint,
- int arity, int key_offset, enum cmd_key_flag flag, enum cmd_keyroute_failover failover, int auto_route,
+ int arity, int key_offset, enum cmd_key_flag flag, enum key_not_found_reply failover, int auto_route,
command_proc_func *proc, struct swarmkv_module *module)
{
struct swarmkv_cmd_spec *spec=NULL;
@@ -911,7 +946,7 @@ void command_register(struct swarmkv_cmd_spec **table, const char *name, const c
spec->arity=arity;
spec->key_offset=key_offset;
spec->flag=flag;
- spec->failover=failover;
+ spec->nokey_reply=failover;
spec->proc=proc;
spec->auto_route=auto_route;
@@ -953,7 +988,7 @@ void command_spec_init(struct swarmkv *db)
1, 1, CMD_KEY_RW, REPLY_INT_0, AUTO_ROUTE,
persist_command, db->mod_keyspace);
command_register(&(db->command_table), "TYPE", "key",
- 1, 1, CMD_KEY_RW, REPLY_STR_NONE, AUTO_ROUTE,
+ 1, 1, CMD_KEY_RO, REPLY_STR_NONE, AUTO_ROUTE,
type_command, db->mod_store);
command_register(&(db->command_table), "KEYSLOT", "key",
1, 1, CMD_KEY_RO, REPLY_ERROR, AUTO_ROUTE,
@@ -1052,24 +1087,21 @@ void command_spec_init(struct swarmkv *db)
1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
latency_command, db->mod_monitor);
-
-
-
/* low-level state-based CRDT synchronization commands*/
- command_register(&(db->command_table), "CRDT PULL", "key",
+ command_register(&(db->command_table), "CRDT GET", "key",
1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
- crdt_pull_command, db->mod_store);
- command_register(&(db->command_table), "CRDT PUSH", "key blob [key blob ...]",
+ crdt_get_command, db->mod_store);
+ command_register(&(db->command_table), "CRDT MERGE", "key blob [key blob ...]",
2, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
- crdt_push_command, db->mod_store);
+ crdt_merge_command, db->mod_store);
command_register(&(db->command_table), "CRDT JOIN", "key IP:port",
2, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
crdt_join_command, db->mod_store);
command_register(&(db->command_table), "CRDT DEL", "key",
1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
crdt_del_command, db->mod_store);
- command_register(&(db->command_table), "CRDT KEYS", "pattern",
- 1, -2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
+ command_register(&(db->command_table), "CRDT KEYS", "tid pattern",
+ 1, KEY_OFFSET_TID, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
crdt_keys_command, db->mod_store);
command_register(&(db->command_table), "CRDT EXISTS", "key",
1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
@@ -1078,7 +1110,7 @@ void command_spec_init(struct swarmkv *db)
1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
crdt_rlist_command, db->mod_store);
command_register(&(db->command_table), "CRDT INFO", "key",
- 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, NOT_AUTO_ROUTE,
+ 1, 1, CMD_KEY_NA, REPLY_EMPTY_ARRAY, NOT_AUTO_ROUTE,
crdt_info_command, db->mod_store);
/* low-level keyspace operation commands */
@@ -1092,28 +1124,28 @@ void command_spec_init(struct swarmkv *db)
1, 2, CMD_KEY_OW, REPLY_NA, AUTO_ROUTE,
keyspace_xradd_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE KEYS", "tid pattern",//worker-thread-id
- 1, KEY_OFFSET_NONE, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE,
+ 1, KEY_OFFSET_TID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE,
keyspace_keys_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE RDEL", "key IP:port",
- 1, 2, CMD_KEY_RO, REPLY_NA, AUTO_ROUTE,
+ 1, 2, CMD_KEY_RW, REPLY_NA, AUTO_ROUTE,
keyspace_rdel_command, db->mod_keyspace);
/* low-level keyspace reorgnization commands */
- command_register(&(db->command_table), "KEYSPACE SETSLOT", "<slot> IMPORTING|MIGRATING|NODE|STABLE IP:port",
- 2, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
+ command_register(&(db->command_table), "KEYSPACE SETSLOT", "slot IMPORTING|MIGRATING|NODE|STABLE IP:port",
+ 2, KEY_OFFSET_SLOTID, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
keyspace_setslot_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE GETKEYSINSLOT", "slot",
- 1, 2, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE,
+ 1, KEY_OFFSET_SLOTID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE,
keyspace_getkeysinslot_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE ADDKEYSTOSLOT", "slot blob",
- 2, 2, CMD_KEY_OW, REPLY_NA, NOT_AUTO_ROUTE,
+ 2, KEY_OFFSET_SLOTID, CMD_KEY_OW, REPLY_NA, NOT_AUTO_ROUTE,
keyspace_addkeystoslot_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE DELSLOTKEYS", "slot",
- 1, 2, CMD_KEY_RM, REPLY_NA, NOT_AUTO_ROUTE,
+ 1, KEY_OFFSET_SLOTID, CMD_KEY_RM, REPLY_NA, NOT_AUTO_ROUTE,
keyspace_delslotkeys_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE COUNTKEYSINSLOT", "slot",
- 1, 2, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE,
+ 1, KEY_OFFSET_SLOTID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE,
keyspace_countkeysinslot_command, db->mod_keyspace);
/* cluster commands are defined in swarmkv-cli.c */
@@ -1221,9 +1253,11 @@ void swarmkv_dispatch(struct swarmkv *db)
event_base_loop(db->evbases[tid], EVLOOP_ONCE|EVLOOP_NONBLOCK);
return;
}
-void wrap_store_periodic(evutil_socket_t fd, short what, void * arg)
+void __swarmkv_periodic(evutil_socket_t fd, short what, void * arg)
{
- swarmkv_store_periodic((struct swarmkv_module *)arg);
+ struct swarmkv_thread_ctx *thread=(struct swarmkv_thread_ctx *)arg;
+ swarmkv_store_periodic(thread->db->mod_store, thread->thread_id);
+ swarmkv_keyspace_periodic(thread->db->mod_keyspace, thread->thread_id);
}
struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, char **err)
{
@@ -1266,7 +1300,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
{
db->evbases[i]=event_base_new();
}
-
+ db->rpc_mgr=swarmkv_rpc_mgr_new(db->opts, db->evbases, db->opts->nr_worker_threads);
db->mesh=swarmkv_mesh_new(db->evbases, db->opts->nr_worker_threads + db->opts->nr_caller_threads, db->logger);
db->mod_monitor=swarmkv_monitor_new(db->opts);
@@ -1297,7 +1331,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
struct timeval sync_interval = {db->opts->sync_interval_us/(1000*1000), db->opts->sync_interval_us%(1000*1000)};
for(int i=0; i<db->opts->nr_worker_threads; i++)
{
- db->worker_threads[i].store_periodic_ev=event_new(db->evbases[i], -1, EV_PERSIST, wrap_store_periodic, db->mod_store);
+ db->worker_threads[i].store_periodic_ev=event_new(db->evbases[i], -1, EV_PERSIST, __swarmkv_periodic, db->worker_threads+i);
evtimer_add(db->worker_threads[i].store_periodic_ev, &sync_interval);
}
@@ -1352,7 +1386,8 @@ void swarmkv_close(struct swarmkv * db)
db->mod_keyspace=NULL;
swarmkv_monitor_free(db->mod_monitor);
db->mod_monitor=NULL;
-
+ swarmkv_mesh_free(db->mesh);
+ swarmkv_rpc_mgr_free(db->rpc_mgr);
swarmkv_net_free(db->net);
db->net=NULL;
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index c8eedfd..e00a973 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -142,7 +142,7 @@ void crdt_del_ctx_free(struct crdt_del_ctx *ctx)
free(ctx);
}
-static void crdt_del_on_succ(void* result, void* user)
+static void crdt_del_on_succ(void *result, void *user)
{
struct crdt_del_ctx *ctx = (struct crdt_del_ctx*)user;
@@ -176,7 +176,7 @@ void key_entry_deletion_notification(struct key_route_entry *key_entry, exec_cmd
node_copy(&ctx->peer, &replica->node);
ctx->key=sdsdup(key_entry->key);
ctx->f=future_create("key_del", crdt_del_on_succ, crdt_del_on_fail, ctx);
- exec_cmd(exec_cmd_handle, crdt_del_cmd, NULL, &replica->node, ctx->f);
+ exec_cmd(exec_cmd_handle, NULL, &replica->node, crdt_del_cmd, ctx->f);
}
swarmkv_cmd_free(crdt_del_cmd);
return;
@@ -192,7 +192,7 @@ struct slot_runtime
struct timeouts *expires;
int I_am_owner;
- pthread_mutex_t mutex;
+ pthread_mutex_t sanity_lock;
};
void http_connection_close_callback(struct evhttp_connection *conn, void *arg)
{
@@ -210,7 +210,7 @@ struct swarmkv_keyspace
int readable_tid;
node_t self;
uuid_t uuid;
- pthread_rwlock_t rwlock;
+ pthread_rwlock_t rwlock;
struct slot_runtime slot_rts[KEYSPACE_SLOT_NUM];
char consul_agent_host[MAX_IPV4_ADDR_LEN];
@@ -491,32 +491,11 @@ int consul_service_register(struct swarmkv_keyspace* ks)
cJSON_Delete(service);
return ret;
}
-static void __ks_dummy_event_handler(evutil_socket_t fd, short what, void * arg)
-{
- struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg;
- keyspace_active_expire_cycle(ks);
- return;
-}
-
-void *swarmkv_keyspace_thread(void *arg)
-{
- struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg;
- char thread_name[16];
- snprintf(thread_name, sizeof(thread_name), "swarmkv-ks");
- prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
- struct event * ev = event_new(ks->evbase, -1, EV_PERSIST, __ks_dummy_event_handler, ks);
- struct timeval timer_delay = {2, 0};
- evtimer_add(ev, &timer_delay);
- event_base_dispatch(ks->evbase);
- event_del(ev);
- event_free(ev);
- return NULL;
-}
void consul_session_create_on_success(void *result, void *arg)
{
struct evhttp_request *req=(struct evhttp_request *)result;
- struct swarmkv_keyspace* ks = (struct swarmkv_keyspace *)arg;
+ struct swarmkv_keyspace *ks = (struct swarmkv_keyspace *)arg;
int resp_code=0;
cJSON* session_id=NULL, *session_create_response=NULL;
@@ -662,7 +641,7 @@ void consul_session_check_async(struct swarmkv_keyspace* ks)
return;
}
-void acquire_session_lock_on_success(void* result, void *arg)
+void acquire_session_lock_on_success(void *result, void *arg)
{
struct evhttp_request *req=(struct evhttp_request *)result;
struct swarmkv_keyspace* ks=(struct swarmkv_keyspace*)arg;
@@ -702,7 +681,7 @@ void acquire_session_lock_on_fail(enum e_future_error err, const char * what, vo
consul_watch_leadership_changes_async(ks);
return;
}
-void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks)
+void consul_acquire_session_lock_async(struct swarmkv_keyspace *ks)
{
sds req_body=node_print_json(&ks->self, ks->uuid);
@@ -727,7 +706,7 @@ void consul_acquire_session_lock_async(struct swarmkv_keyspace* ks)
sdsfree(req_body);
}
-void consul_run_for_leader_async(struct swarmkv_keyspace* ks)
+void consul_run_for_leader_async(struct swarmkv_keyspace *ks)
{
if(0==strlen(ks->consul_session_id))
{
@@ -853,7 +832,7 @@ void watch_slots_changes_on_success(void *result, void *arg)
slot_rt=ks->slot_rts+i;
slot=&(slot_rt->slot);
owner=&(slot->owner);
- pthread_mutex_lock(&slot_rt->mutex);
+ pthread_mutex_lock(&slot_rt->sanity_lock);
if(0!=node_compare(owner, &(new_slots[i].owner)))
{
node_copy(owner, &(new_slots[i].owner));
@@ -873,7 +852,7 @@ void watch_slots_changes_on_success(void *result, void *arg)
{
slot_rt->I_am_owner=0;
}
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
}
log_info(ks->logger, MODULE_SWARMKV_KEYSPACE, "key slots update finished.");
cJSON_Delete(metadata_array);
@@ -1051,7 +1030,7 @@ void remove_failed_nodes_from_key_route_table(struct swarmkv_keyspace *ks, node_
{
slot_rt=ks->slot_rts+slot_id;
if(!slot_rt->I_am_owner) continue;
- pthread_mutex_lock(&slot_rt->mutex);
+ pthread_mutex_lock(&slot_rt->sanity_lock);
HASH_ITER(hh, slot_rt->keyroute_table, key_entry, tmp_entry)
{
int is_modified=0;
@@ -1067,7 +1046,7 @@ void remove_failed_nodes_from_key_route_table(struct swarmkv_keyspace *ks, node_
}
if(is_modified) n_modified_key++;
}
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
}
HASH_ITER(hh, hash_health_node, node, tmp_node)
{
@@ -1141,7 +1120,7 @@ int keyslots_init(struct swarmkv_keyspace *ks)
slot->slot_id=i;
node_copy(&slot->owner, &ks->self);
slot_rt->expires=timeouts_open(0, &error);
- pthread_mutex_init(&slot_rt->mutex, NULL);
+ pthread_mutex_init(&slot_rt->sanity_lock, NULL);
}
char url[SWARMKV_URL_MAX]="";
@@ -1185,7 +1164,7 @@ void keyspace_lock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag,
int slot_id=key_hash_slot(key, sdslen(key));
struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace);
struct slot_runtime *slot_rt=ks->slot_rts+slot_id;
- pthread_mutex_lock(&slot_rt->mutex);
+ pthread_mutex_lock(&slot_rt->sanity_lock);
return;
}
void keyspace_unlock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag, const sds key)
@@ -1194,17 +1173,24 @@ void keyspace_unlock(struct swarmkv_module *mod_keyspace, enum cmd_key_flag flag
int slot_id=key_hash_slot(key, sdslen(key));
struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace);
struct slot_runtime *slot_rt=ks->slot_rts+slot_id;
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
return;
}
+void *swarmkv_keyspace_thread(void *arg)
+{
+ struct swarmkv_keyspace *ks=(struct swarmkv_keyspace *)arg;
+ char thread_name[16];
+ snprintf(thread_name, sizeof(thread_name), "swarmkv-ks");
+ prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
+ event_base_dispatch(ks->evbase);
+ return NULL;
+}
struct swarmkv_module *swarmkv_keyspace_new(struct swarmkv_options *opts, const char *db_name, struct log_handle *logger, char **err)
{
struct swarmkv_keyspace *ks=ALLOC(struct swarmkv_keyspace, 1);
strncpy(ks->module.name, "keyspace", sizeof(ks->module.name));
ks->module.mod_ctx=ks;
- ks->module.lock=keyspace_lock;
- ks->module.unlock=keyspace_unlock;
strncpy(ks->db_name, db_name, sizeof(ks->db_name));
node_init(&ks->self, opts->cluster_announce_ip, opts->cluster_announce_port);
uuid_copy(ks->uuid, opts->bin_uuid);
@@ -1291,7 +1277,7 @@ void swarmkv_keyspace_free(struct swarmkv_module* module)
HASH_DELETE(hh, slot_rt->keyroute_table, key_entry);
key_entry_free(key_entry);
}
- pthread_mutex_destroy(&slot_rt->mutex);
+ pthread_mutex_destroy(&slot_rt->sanity_lock);
}
consul_client_free(ks->consul_client);
ks->consul_client=NULL;
@@ -1314,10 +1300,10 @@ void swarmkv_keyspace_info(struct swarmkv_module *mod_keyspace, struct keyspace_
slot_rt=ks->slot_rts+i;
if(!slot_rt->I_am_owner) continue;
info->slots++;
- pthread_mutex_lock(&slot_rt->mutex);
+ pthread_mutex_lock(&slot_rt->sanity_lock);
info->keys+=HASH_COUNT(slot_rt->keyroute_table);
info->expires+=timeouts_count(slot_rt->expires);
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
}
}
enum cmd_exec_result keyslot_command(struct swarmkv_module *mod_keyspace, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
@@ -1388,13 +1374,13 @@ enum cmd_exec_result keyspace_setslot_command(struct swarmkv_module *mod_keyspac
}
slot_rt=&(ks->slot_rts[slot_id]);
- pthread_mutex_lock(&slot_rt->mutex);
+ pthread_mutex_lock(&slot_rt->sanity_lock);
slot_rt->state=state;
if(state!=STATE_STABLE)
{
node_copy(&slot_rt->rebalancing_peer, &peer_node);
}
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
*reply=swarmkv_reply_new_status("OK");
return FINISHED;
@@ -1547,9 +1533,9 @@ enum cmd_exec_result keyspace_getkeysinslot_command(struct swarmkv_module *mod_k
slot_rt=ks->slot_rts+slot_id;
char *blob=NULL;
size_t blob_sz;
- pthread_mutex_lock(&slot_rt->mutex);
+ pthread_mutex_lock(&slot_rt->sanity_lock);
key_slot_serialize(slot_rt, &blob, &blob_sz);
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
*reply=swarmkv_reply_new_string(blob, blob_sz);
free(blob);
blob=NULL;
@@ -1576,9 +1562,9 @@ enum cmd_exec_result keyspace_addkeystoslot_command(struct swarmkv_module *mod_k
slot_rt=ks->slot_rts+slot_id;
- pthread_mutex_lock(&slot_rt->mutex);
+ pthread_mutex_lock(&slot_rt->sanity_lock);
new_added_entry_num=key_slot_merge(slot_rt, blob, sdslen(blob));
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
*reply=swarmkv_reply_new_integer(new_added_entry_num);
return FINISHED;
@@ -1601,7 +1587,7 @@ enum cmd_exec_result keyspace_delslotkeys_command(struct swarmkv_module *mod_key
}
slot_rt=ks->slot_rts+slot_id;
- pthread_mutex_lock(&slot_rt->mutex);
+ pthread_mutex_lock(&slot_rt->sanity_lock);
HASH_ITER(hh, slot_rt->keyroute_table, key_entry, tmp)
{
HASH_DELETE(hh, slot_rt->keyroute_table, key_entry);
@@ -1612,7 +1598,7 @@ enum cmd_exec_result keyspace_delslotkeys_command(struct swarmkv_module *mod_key
key_entry_free(key_entry);
delete_num++;
}
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
*reply=swarmkv_reply_new_integer(delete_num);
return FINISHED;
}
@@ -1631,9 +1617,9 @@ enum cmd_exec_result keyspace_countkeysinslot_command(struct swarmkv_module *mod
return FINISHED;
}
slot_rt=ks->slot_rts+slot_id;
- pthread_mutex_lock(&slot_rt->mutex);
+ pthread_mutex_lock(&slot_rt->sanity_lock);
key_num=HASH_COUNT(slot_rt->keyroute_table);
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
*reply=swarmkv_reply_new_integer(key_num);
return FINISHED;
}
@@ -1754,10 +1740,10 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace,
for(i=0; i<KEYSPACE_SLOT_NUM; i++)
{
slot_rt=ks->slot_rts+i;
- pthread_mutex_lock(&slot_rt->mutex);
+ pthread_mutex_lock(&slot_rt->sanity_lock);
if(!slot_rt->I_am_owner && slot_rt->state!=STATE_IMPORTING)
{
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
continue;
}
@@ -1770,7 +1756,7 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace,
utarray_push_back(matched_replies, &r);
}
}
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
}
n_matched=utarray_len(matched_replies);
if(n_matched>0)
@@ -1986,8 +1972,12 @@ enum cmd_exec_result persist_command(struct swarmkv_module *mod_keyspace, const
}
#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 1000
-void keyspace_active_expire_cycle(struct swarmkv_keyspace *ks)
-{
+void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id)
+{
+ struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace);
+ int real_tid=__gettid(ks->exec_cmd_handle);
+ assert(real_tid==thread_id);
+
struct timespec now;
int i=0, j=0;
struct slot_runtime *slot_rt=NULL;
@@ -1998,9 +1988,17 @@ void keyspace_active_expire_cycle(struct swarmkv_keyspace *ks)
clock_gettime(CLOCK_MONOTONIC_COARSE, &start);
for(i=0; i<KEYSPACE_SLOT_NUM; i++)
{
+ if(i%ks->opts->nr_worker_threads!=thread_id) continue;
slot_rt=ks->slot_rts+i;
if(!slot_rt->I_am_owner) continue;
- pthread_mutex_lock(&slot_rt->mutex);
+ if(pthread_mutex_trylock(&slot_rt->sanity_lock))
+ {
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
+ }
+ else
+ {
+ assert(0);
+ }
clock_gettime(CLOCK_REALTIME, &now);
timeouts_update(slot_rt->expires, now.tv_sec);
@@ -2016,7 +2014,7 @@ void keyspace_active_expire_cycle(struct swarmkv_keyspace *ks)
j++;
has_key_expired=1;
}
- pthread_mutex_unlock(&slot_rt->mutex);
+ pthread_mutex_unlock(&slot_rt->sanity_lock);
}
clock_gettime(CLOCK_MONOTONIC_COARSE, &end);
if(has_key_expired)
@@ -2025,4 +2023,3 @@ void keyspace_active_expire_cycle(struct swarmkv_keyspace *ks)
}
}
-
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index d7a12c6..58822ca 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -596,7 +596,7 @@ void swarmkv_net_set_on_msg_callback(struct swarmkv_net *net, on_msg_callback_t
net->on_msg_cb_arg=cb_arg;
return;
}
-extern int __gettid(struct swarmkv *db);
+
void swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv_msg *msg)
{
int tid=__gettid((struct swarmkv *)(net->on_msg_cb_arg));
diff --git a/src/swarmkv_rpc.c b/src/swarmkv_rpc.c
index 24847db..f94cd10 100644
--- a/src/swarmkv_rpc.c
+++ b/src/swarmkv_rpc.c
@@ -36,9 +36,15 @@ struct swarmkv_rpc_mgr *swarmkv_rpc_mgr_new(const struct swarmkv_options *opts,
mgr->seq_generator=0;
mgr->nr_worker_threads=nr_worker_threads;
mgr->evbases=evbases;
+ mgr->rpc_table=ALLOC(struct swarmkv_rpc *, nr_worker_threads);
mgr->timeout_us=opts->cluster_timeout_us;
return mgr;
}
+void swarmkv_rpc_mgr_free(struct swarmkv_rpc_mgr *mgr)
+{
+ free(mgr->rpc_table);
+ free(mgr);
+}
void swarmkv_rpc_free(struct swarmkv_rpc *rpc)
{
event_del(rpc->timeout_ev);
@@ -54,7 +60,7 @@ static void rpc_timeout_callback(evutil_socket_t fd, short events, void *arg)
promise_failed(p, FUTURE_ERROR_TIMEOUT, "rpc timed out");
}
-long long swarmkv_rpc_lunch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f)
+long long swarmkv_rpc_launch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct future *f)
{
struct swarmkv_rpc *rpc=ALLOC(struct swarmkv_rpc, 1);
rpc->sequence=++mgr->seq_generator;
@@ -67,7 +73,7 @@ long long swarmkv_rpc_lunch(struct swarmkv_rpc_mgr *mgr, int thread_id, struct f
HASH_ADD(hh, mgr->rpc_table[thread_id], sequence, sizeof(rpc->sequence), rpc);
return rpc->sequence;
}
-void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, const void *reply)
+void swarmkv_rpc_complete(struct swarmkv_rpc_mgr *mgr, int thread_id, long long sequence, void *reply)
{
struct swarmkv_rpc *rpc=NULL;
HASH_FIND(hh, mgr->rpc_table[thread_id], &sequence, sizeof(sequence), rpc);
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 0c98162..18b0257 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -136,12 +136,14 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] =
}
};
#define MODULE_SWAMRKV_STORE module_name_str("swarmkv.store")
-#define STORE_SHARD_NUMBER 8
+
struct swarmkv_store_thread
{
struct scontainer *obj_table;
struct scontainer *sync_queue;
- pthread_rwlock_t rwlock;
+ pthread_mutex_t sanity_lock;
+ long long keys_to_sync;
+ long long n_keys;
};
struct swarmkv_store
{
@@ -149,10 +151,7 @@ struct swarmkv_store
node_t self;
size_t nr_worker_threads;
struct swarmkv_store_thread *threads;
- struct scontainer *obj_table[STORE_SHARD_NUMBER];
- struct scontainer *sync_queue[STORE_SHARD_NUMBER];
-// pthread_mutex_t mutex[STORE_SHARD_NUMBER];
- pthread_rwlock_t rwlock[STORE_SHARD_NUMBER];
+
exec_cmd_func *exec_cmd;
struct swarmkv *exec_cmd_handle;
uuid_t my_uuid;
@@ -173,40 +172,14 @@ struct swarmkv_store *module2store(struct swarmkv_module *module)
return store;
}
-int get_shard_id(sds key)
+int __store_gettid(sds key, int nr_worker_threads)
{
- int shard_idx=key_hash_slot(key, sdslen(key))%STORE_SHARD_NUMBER;
+ int shard_idx=key_hash_slot(key, sdslen(key))%nr_worker_threads;
return shard_idx;
}
-int store_gettid(struct swarmkv_module *mod, sds key)
-{
- struct swarmkv_store *store=module2store(mod);
- if(!key) return -1;
- int tid=key_hash_slot(key, sdslen(key))%store->nr_worker_threads;
- return tid;
-}
-void store_lock(struct swarmkv_module *mod, enum cmd_key_flag flag, sds key)
+int store_gettid(struct swarmkv_module * mod_store, sds key)
{
- struct swarmkv_store *store=module2store(mod);
- if(!key) return;
- int shard_idx=get_shard_id(key);
- if(flag==CMD_KEY_RO)
- {
- pthread_rwlock_rdlock(&(store->rwlock[shard_idx]));
- }
- else
- {
- pthread_rwlock_wrlock(&(store->rwlock[shard_idx]));
- }
- return;
-}
-void store_unlock(struct swarmkv_module *mod, enum cmd_key_flag flag, sds key)
-{
- struct swarmkv_store *store=module2store(mod);
- if(!key) return;
- int shard_idx=get_shard_id(key);
- pthread_rwlock_unlock(&(store->rwlock[shard_idx]));
-
+ return __store_gettid(key, module2store(mod_store)->nr_worker_threads);
}
struct scontainer
{
@@ -215,19 +188,19 @@ struct scontainer
char is_pending; //waiting for reply 0f CRDT PULL.
char is_in_table;
char is_in_sync_q;
- int shard_idx;
+ int tid;
UT_array *replica_node_list; //used by value owner for cache synchronization
UT_hash_handle hh;
struct scontainer *prev; //hook to sync queue
struct scontainer *next; //hook to sync queue
};
-static struct scontainer *scontainer_new(enum sobj_type type, const sds key)
+static struct scontainer *scontainer_new(enum sobj_type type, const sds key, int tid)
{
struct scontainer *ctr=ALLOC(struct scontainer, 1);
ctr->obj.type=type;
- ctr->obj.key=sdsdup(key);
- ctr->shard_idx=get_shard_id(key);
+ ctr->obj.key=sdsdup(key);
+ ctr->tid=tid;
gettimeofday(&ctr->op_timestamp, NULL);
return ctr;
}
@@ -299,44 +272,46 @@ static void scontainer_remove_replica_node(struct scontainer *ctr, const node_t
}
void store_add_scontainer(struct swarmkv_store *store, struct scontainer *ctr)
{
- scontainer_join(&(store->obj_table[ctr->shard_idx]), ctr);
+ scontainer_join(&(store->threads[ctr->tid].obj_table), ctr);
return;
}
void store_remove_scontainer(struct swarmkv_store *store, struct scontainer *ctr)
{
- HASH_DELETE(hh, store->obj_table[ctr->shard_idx], ctr);
+ HASH_DELETE(hh, store->threads[ctr->tid].obj_table, ctr);
ctr->is_in_table=0;
if(ctr->is_in_sync_q)
{
- DL_DELETE(store->sync_queue[ctr->shard_idx], ctr);
+ DL_DELETE(store->threads[ctr->tid].sync_queue, ctr);
ctr->is_in_sync_q=0;
}
}
typedef void sobj_callback_func_t(struct sobj * obj, void *cb_arg);
-void store_iterate_sobj(struct swarmkv_store *store, sobj_callback_func_t *cb, void *cb_arg)
+void store_iterate_sobj(struct swarmkv_store *store, int tid, sobj_callback_func_t *cb, void *cb_arg)
{
struct scontainer *ctr=NULL, *tmp=NULL;
-
-
- for(size_t i=0; i<STORE_SHARD_NUMBER; i++)
- {
- pthread_rwlock_rdlock(&(store->rwlock[i]));
- HASH_ITER(hh, store->obj_table[i], ctr, tmp)
- {
- if(!ctr->is_pending)
- {
- cb(&ctr->obj, cb_arg);
- }
+ HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp)
+ {
+ if(!ctr->is_pending)
+ {
+ cb(&ctr->obj, cb_arg);
}
- pthread_rwlock_unlock(&(store->rwlock[i]));
- }
-
+ }
}
struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key)
{
struct scontainer *ctr=NULL;
- int shard_idx=get_shard_id(key);
- ctr=scontainer_find(&(store->obj_table[shard_idx]), key);
+ int tid=__store_gettid(key, store->nr_worker_threads);
+ int real_tid=__gettid(store->exec_cmd_handle);
+ assert(tid==real_tid);
+ ctr=scontainer_find(&(store->threads[tid].obj_table), key);
+ if(0==pthread_mutex_trylock(&store->threads[tid].sanity_lock))
+ {
+ pthread_mutex_unlock(&store->threads[tid].sanity_lock);
+ }
+ else
+ {
+ assert(0);
+ }
if(ctr)
{
return ctr;
@@ -344,7 +319,7 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key)
else
{
return NULL;
- }
+ }
}
struct sobj *store_lookup(struct swarmkv_module *mod_store, sds key)
{
@@ -374,7 +349,7 @@ void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj)
gettimeofday(&ctr->op_timestamp, NULL);
if(!ctr->is_in_sync_q && ctr->replica_node_list)
{
- DL_APPEND(store->sync_queue[ctr->shard_idx], ctr);
+ DL_APPEND(store->threads[ctr->tid].sync_queue, ctr);
ctr->is_in_sync_q=1;
}
return;
@@ -387,6 +362,19 @@ int sobj_get_random_replica(struct sobj *obj, node_t *out)
node_copy(out, replica);
return 1;
}
+enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply)
+{
+ assert(obj->type==OBJ_TYPE_UNDEFINED);
+ node_t replica;
+ int ret=0;
+ ret=sobj_get_random_replica(obj, &replica);
+ if(ret)
+ {
+ *reply=swarmkv_reply_new_node(&replica, 1);
+ return REDIRECT;
+ }
+ return NEED_KEY_ROUTE;
+}
void scontainer_serialize(struct scontainer *ctr, char **blob, size_t *blob_sz)
{
char *value_blob=NULL;
@@ -455,8 +443,8 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t
enum CRDT_OP
{
- CRDT_PULL,
- CRDT_PUSH,
+ CRDT_GET,
+ CRDT_MERGE,
CRDT_JOIN
};
struct crdt_generic_ctx
@@ -475,18 +463,16 @@ void crdt_generic_ctx_free(struct crdt_generic_ctx *ctx)
free(ctx);
}
-static void crdt_generic_on_succ(void* result, void* user)
+static void crdt_generic_on_succ(void *result, void *user)
{
struct crdt_generic_ctx *ctx=(struct crdt_generic_ctx *)user;
const struct swarmkv_reply *reply=(const struct swarmkv_reply*) result;
struct scontainer *ctr=NULL;
uuid_t uuid;
- struct swarmkv_module *mod_store=&(ctx->store->module);
store_get_uuid(&(ctx->store->module), uuid);
switch(ctx->op)
{
- case CRDT_PULL:
- store_lock(mod_store, CMD_KEY_OW, ctx->key);
+ case CRDT_GET:
ctr=store_lookup_scontainer(ctx->store, ctx->key);
if(ctr && reply->type==SWARMKV_REPLY_STRING)
{
@@ -498,9 +484,8 @@ static void crdt_generic_on_succ(void* result, void* user)
{
atomic_inc(&ctx->store->sync_err);
}
- store_unlock(&ctx->store->module, CMD_KEY_OW, ctx->key);
break;
- case CRDT_PUSH:
+ case CRDT_MERGE:
if(reply->type==SWARMKV_REPLY_INTEGER && reply->integer>0)
{
atomic_add(&ctx->store->sync_ok, reply->integer);
@@ -528,24 +513,23 @@ static void crdt_generic_on_succ(void* result, void* user)
crdt_generic_ctx_free(ctx);
return;
}
-static void store_remove_failed_peer(struct swarmkv_store *store, int shard, const node_t *peer)
+static void store_remove_failed_peer(struct swarmkv_store *store, int tid, const node_t *peer)
{
struct scontainer *ctr=NULL, *tmp=NULL;
- pthread_rwlock_wrlock(&store->rwlock[shard]);
- HASH_ITER(hh, store->obj_table[shard], ctr, tmp)
+ HASH_ITER(hh, store->threads[tid].obj_table, ctr, tmp)
{
scontainer_remove_replica_node(ctr, peer);
}
- pthread_rwlock_unlock(&store->rwlock[shard]);
+ return;
}
static void crdt_generic_on_fail(enum e_future_error err, const char * what, void * user)
{
struct crdt_generic_ctx *ctx=(struct crdt_generic_ctx *)user;
atomic_inc(&ctx->store->sync_err);
+ int tid=__gettid(ctx->store->exec_cmd_handle);
if(err==FUTURE_ERROR_TIMEOUT)
{
- int shard_idx=get_shard_id(ctx->key);
- store_remove_failed_peer(ctx->store, shard_idx, &ctx->peer);
+ store_remove_failed_peer(ctx->store, tid, &ctx->peer);
}
crdt_generic_ctx_free(ctx);
return;
@@ -564,55 +548,55 @@ void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struc
store->exec_cmd(store->exec_cmd_handle, NULL, peer, cmd, ctx->f);
return;
}
-
-void swarmkv_store_periodic(struct swarmkv_module * mod_store)
+#define MAX_SYNC_PER_PERIOD 100000
+void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
{
struct swarmkv_store *store=module2store(mod_store);
struct scontainer *ctr=NULL, *tmp=NULL;
struct timespec start, end;
- int has_key_synced=0;
-
+ int n_synced=0;
+ int real_tid=__gettid(store->exec_cmd_handle);
+ assert(real_tid==thread_id);
clock_gettime(CLOCK_MONOTONIC, &start);
//Ease the lock contention among work threads
- int shard=random()%STORE_SHARD_NUMBER;
- for(int i=0; i<STORE_SHARD_NUMBER; i++)
- {
- shard=(shard+i)%STORE_SHARD_NUMBER;
- struct sync_master *sync_master=sync_master_new();
- pthread_rwlock_wrlock(&store->rwlock[shard]);
- DL_FOREACH_SAFE(store->sync_queue[shard], ctr, tmp)
- {
- char *blob=NULL;
- size_t blob_sz=0;
- scontainer_serialize(ctr, &blob, &blob_sz);
- sync_master_add_obj(sync_master, ctr->obj.key, blob, blob_sz,
- utarray_front(ctr->replica_node_list), utarray_len(ctr->replica_node_list));
- DL_DELETE(store->sync_queue[shard], ctr);
- ctr->is_in_sync_q=0;
- store->synced++;
- has_key_synced=1;
- }
- pthread_rwlock_unlock(&store->rwlock[shard]);
- node_t peer;
- struct swarmkv_cmd *cmd=NULL;
- int ret=1;
- while(1)
- {
- ret=sync_master_get_cmd(sync_master, &peer, &cmd);
- if(!ret) break;
- crdt_generic_call(store, CRDT_PUSH, cmd, &peer);
- swarmkv_cmd_free(cmd);
- cmd=NULL;
- }
- sync_master_free(sync_master);
+ struct swarmkv_store_thread *thr=&store->threads[real_tid];
+
+ struct sync_master *sync_master=sync_master_new();
+ DL_FOREACH_SAFE(thr->sync_queue, ctr, tmp)
+ {
+ char *blob=NULL;
+ size_t blob_sz=0;
+ scontainer_serialize(ctr, &blob, &blob_sz);
+ sync_master_add_obj(sync_master, ctr->obj.key, blob, blob_sz,
+ utarray_front(ctr->replica_node_list), utarray_len(ctr->replica_node_list));
+ DL_DELETE(thr->sync_queue, ctr);
+ ctr->is_in_sync_q=0;
+ store->synced++;
+ n_synced++;
+ if(n_synced>=MAX_SYNC_PER_PERIOD) break;
}
-
+ node_t peer;
+ struct swarmkv_cmd *cmd=NULL;
+ int ret=1;
+ while(1)
+ {
+ ret=sync_master_get_cmd(sync_master, &peer, &cmd);
+ if(!ret) break;
+ crdt_generic_call(store, CRDT_MERGE, cmd, &peer);
+ swarmkv_cmd_free(cmd);
+ cmd=NULL;
+ }
+ sync_master_free(sync_master);
+ thr->n_keys=HASH_COUNT(thr->obj_table);
+ DL_COUNT(thr->sync_queue, ctr, thr->keys_to_sync);
clock_gettime(CLOCK_MONOTONIC, &end);
- if(has_key_synced)
+
+ if(n_synced)
{
swarmkv_monitor_record_event(store->mod_monitor, "crdt-sync-cycle", timespec_diff_usec(&start, &end));
}
}
+
struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts, exec_cmd_func *send_cmd, void *handle_send_cmd)
{
struct swarmkv_store *store=ALLOC(struct swarmkv_store, 1);
@@ -620,11 +604,6 @@ struct swarmkv_module *swarmkv_store_new(const struct swarmkv_options *opts, exe
store->module.mod_ctx=store;
store->nr_worker_threads=opts->nr_worker_threads;
store->threads=ALLOC(struct swarmkv_store_thread, opts->nr_worker_threads);
- for(size_t i=0; i<opts->nr_worker_threads; i++)
- {
- pthread_rwlock_init(&(store->threads[i].rwlock), NULL);
-
- }
uuid_copy(store->my_uuid, opts->bin_uuid);
node_init(&store->self, opts->cluster_announce_ip, opts->cluster_announce_port);
@@ -643,11 +622,10 @@ void swarmkv_store_free(struct swarmkv_module *mod_store)
HASH_ITER(hh, store->threads[i].obj_table, ctr, tmp)
{
- assert(ctr->shard_idx==i);
+ assert(ctr->tid==i);
scontainer_remove(&(store->threads[i].obj_table), ctr);
scontainer_free(ctr);
- }
- pthread_rwlock_destroy(&(store->threads[i].rwlock));
+ }
}
free(store);
return;
@@ -660,37 +638,67 @@ void swarmkv_store_set_monitor_handle(struct swarmkv_module *mod_store, struct s
void swarmkv_store_info(struct swarmkv_module *mod_store, struct store_info *info)
{
struct swarmkv_store *store=module2store(mod_store);
- struct scontainer *ctr=NULL;
- info->shards=STORE_SHARD_NUMBER;
+ info->shards=store->nr_worker_threads;
info->keys=0;
info->keys_to_sync=0;
- for(size_t i=0; i<STORE_SHARD_NUMBER; i++)
+ struct swarmkv_store_thread *thread=NULL;
+ for(size_t i=0; i<store->nr_worker_threads; i++)
{
- pthread_rwlock_rdlock(&store->rwlock[i]);
- DL_COUNT(store->sync_queue[i], ctr, info->keys_to_sync);
- info->keys+=HASH_COUNT(store->obj_table[i]);
- pthread_rwlock_unlock(&store->rwlock[i]);
+ thread = store->threads+i;
+ info->keys_to_sync += __sync_add_and_fetch(&thread->keys_to_sync, 0);
+ info->keys_to_sync += __sync_add_and_fetch(&thread->n_keys, 0);
}
info->sync_ok=__sync_add_and_fetch(&store->sync_ok, 0);
info->sync_err=__sync_add_and_fetch(&store->sync_err, 0);
info->synced=store->synced;
}
-void swarmkv_store_add_key(struct swarmkv_module *mod_store, const sds key, node_t *replica_nodes, size_t n_replica_node)
+
+UT_icd ut_array_matched_reply = {sizeof(struct swarmkv_reply *), NULL, NULL, NULL};
+
+struct pattern_match_arg
+{
+ sds pattern;
+ UT_array *matched_replies;
+};
+enum cmd_exec_result type_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
+{
+/*TYPE key*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ *reply=swarmkv_reply_new_string_fmt(sobj_specs[obj->type].type_name);
+ return FINISHED;
+}
+enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
{
+/*CRDT ADD key IP:port [IP:port ...]*/
struct scontainer *ctr=NULL;
struct swarmkv_store *store=module2store(mod_store);
size_t max_pull_node_num=4;
- struct swarmkv_cmd *crdt_pull_cmd=NULL;
- crdt_pull_cmd=swarmkv_cmd_new(3);
- crdt_pull_cmd->argv[0]=sdsnew("crdt");
- crdt_pull_cmd->argv[1]=sdsnew("pull");
- crdt_pull_cmd->argv[2]=sdsdup(key);
- store_lock(mod_store, CMD_KEY_OW, key);
+ sds key=cmd->argv[2];
+ int tid=store_gettid(mod_store, key);
+ int real_tid=__gettid(store->exec_cmd_handle);
+ assert(tid==real_tid);
+ struct swarmkv_cmd *crdt_get_cmd=NULL;
+ crdt_get_cmd=swarmkv_cmd_new(3);
+ crdt_get_cmd->argv[0]=sdsnew("crdt");
+ crdt_get_cmd->argv[1]=sdsnew("get");
+ crdt_get_cmd->argv[2]=sdsdup(key);
+ size_t n_replica_node=cmd->argc-3;
+ node_t *replica_nodes=ALLOC(node_t, n_replica_node);
+ for(size_t i=0; i<n_replica_node; i++)
+ {
+ node_init_from_sds(&replica_nodes[i], cmd->argv[3+i]);
+ }
ctr=store_lookup_scontainer(store, key);
if(!ctr)
{
- ctr=scontainer_new(OBJ_TYPE_UNDEFINED, key);
+ ctr=scontainer_new(OBJ_TYPE_UNDEFINED, key, tid);
if(n_replica_node>0)//need crdt pull from replicas
{
ctr->is_pending=1;
@@ -705,53 +713,33 @@ void swarmkv_store_add_key(struct swarmkv_module *mod_store, const sds key, node
{
scontainer_add_replica_node(ctr, replica_nodes+i);
}
- store_unlock(mod_store, CMD_KEY_OW, key);
for(size_t i=0; i<n_replica_node; i++)
{
if(i<max_pull_node_num)
{
- crdt_generic_call(store, CRDT_PULL, crdt_pull_cmd, replica_nodes+i);
+ crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i);
}
else
{
struct swarmkv_cmd *crdt_join_cmd=NULL;
crdt_join_cmd=swarmkv_cmd_new(4);
crdt_join_cmd->argv[0]=sdsnew("crdt");
- crdt_join_cmd->argv[1]=sdsnew("join");
+ crdt_join_cmd->argv[1]=sdsnew("merge");
crdt_join_cmd->argv[2]=sdsdup(ctr->obj.key);
crdt_join_cmd->argv[3]=node_addr2sds(replica_nodes+i);
crdt_generic_call(store, CRDT_JOIN, crdt_join_cmd, replica_nodes+i);
swarmkv_cmd_free(crdt_join_cmd);
}
}
- swarmkv_cmd_free(crdt_pull_cmd);
- crdt_pull_cmd=NULL;
- return;
-}
-UT_icd ut_array_matched_reply = {sizeof(struct swarmkv_reply *), NULL, NULL, NULL};
-
-struct pattern_match_arg
-{
- sds pattern;
- UT_array *matched_replies;
-};
-enum cmd_exec_result type_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
-{
-/*TYPE key*/
- struct sobj *obj=NULL;
- const sds key=cmd->argv[1];
- obj=store_lookup(mod_store, key);
- if(!obj)
- {
- return NEED_KEY_ROUTE;
- }
- *reply=swarmkv_reply_new_string_fmt(sobj_specs[obj->type].type_name);
+ swarmkv_cmd_free(crdt_get_cmd);
+ crdt_get_cmd=NULL;
+ free(replica_nodes);
return FINISHED;
}
-enum cmd_exec_result crdt_pull_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
+enum cmd_exec_result crdt_get_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
{
-/* CRDT PULL key */
+/* CRDT GET key */
struct swarmkv_store *store=module2store(mod_store);
struct scontainer *ctr=NULL;
const sds key=cmd->argv[2];
@@ -781,17 +769,17 @@ enum cmd_exec_result crdt_pull_command(struct swarmkv_module *mod_store, const s
return FINISHED;
}
-enum cmd_exec_result crdt_push_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
+enum cmd_exec_result crdt_merge_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
{
-/* CRDT PUSH key blob [key blob ...]*/
+/* CRDT MERGE key blob [key blob ...]*/
struct sobj *obj=NULL;
sds key=NULL, blob=NULL;
assert(accessing_node!=NULL);//should never invoked by local
long long synced=0;
uuid_t uuid;
- int __attribute__((__unused__))first_key_shard_idx=0;
- int __attribute__((__unused__))shard_idx=0;
- first_key_shard_idx=get_shard_id(cmd->argv[2]);
+ int __attribute__((__unused__))first_key_tid=0;
+ int __attribute__((__unused__))tid=0;
+ first_key_tid=store_gettid(mod_store, cmd->argv[2]);
store_get_uuid(mod_store, uuid);
for(size_t i=0; i<cmd->argc-2; i+=2)
@@ -804,11 +792,10 @@ enum cmd_exec_result crdt_push_command(struct swarmkv_module *mod_store, const s
sobj_merge_blob(obj, blob, sdslen(blob), uuid);
synced++;
}
- shard_idx=get_shard_id(key);
- assert(shard_idx==first_key_shard_idx);
+ tid=store_gettid(mod_store, key);
+ assert(tid==first_key_tid);
}
*reply=swarmkv_reply_new_integer(synced);
- obj=store_lookup(mod_store, key);
return FINISHED;
}
@@ -872,13 +859,19 @@ static void pattern_match_function(struct sobj * obj, void *cb_arg)
enum cmd_exec_result crdt_keys_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
{
-//CRDT KEYS IP Port pattern
+//CRDT KEYS tid pattern
int i=0, n_matched=0;
struct pattern_match_arg cb_arg;
- cb_arg.pattern=cmd->argv[2];
+ int thread_id=atoll(cmd->argv[3]);
+ cb_arg.pattern=cmd->argv[4];
utarray_new(cb_arg.matched_replies, &ut_array_matched_reply);
struct swarmkv_store *store=module2store(mod_store);
- store_iterate_sobj(store, pattern_match_function, &cb_arg);
+ if(thread_id>store->nr_worker_threads)
+ {
+ *reply=swarmkv_reply_new_error("Invalid thread id");
+ return FINISHED;
+ }
+ store_iterate_sobj(store, thread_id, pattern_match_function, &cb_arg);
n_matched=utarray_len(cb_arg.matched_replies);
if(n_matched>0)
{
diff --git a/src/t_hash.c b/src/t_hash.c
index c600712..79464df 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -89,27 +89,29 @@ enum cmd_exec_result hdel_command(struct swarmkv_module *mod_store, const struct
{
return NEED_KEY_ROUTE;
}
+ if(obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ if(obj->type!=OBJ_TYPE_HASH)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
size_t n_to_del=cmd->argc-2;
size_t i=0;
int ret=0, n_deleted=0;
sds field;
-
- if(obj->type==OBJ_TYPE_HASH)
+ for(i=0; i<n_to_del; i++)
{
- for(i=0; i<n_to_del; i++)
- {
- field=cmd->argv[2+i];
- ret=OR_map_remove(obj->hash, field, sdslen(field));
- if(ret>0) n_deleted++;
- }
- *reply=swarmkv_reply_new_integer(n_deleted);
- sobj_need_sync(mod_store, obj);
- }
- else
- {
- *reply=swarmkv_reply_new_error(error_wrong_type);
+ field=cmd->argv[2+i];
+ ret=OR_map_remove(obj->hash, field, sdslen(field));
+ if(ret>0) n_deleted++;
}
+ *reply=swarmkv_reply_new_integer(n_deleted);
+ sobj_need_sync(mod_store, obj);
+
return FINISHED;
}
enum cmd_exec_result hmget_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, const node_t *accessing_node, struct swarmkv_reply **reply)
@@ -123,6 +125,10 @@ enum cmd_exec_result hmget_command(struct swarmkv_module *mod_store, const struc
{
return NEED_KEY_ROUTE;
}
+ if(obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
if(obj->type!=OBJ_TYPE_HASH)
{
*reply=swarmkv_reply_new_error(error_wrong_type);
@@ -177,6 +183,10 @@ enum cmd_exec_result hgetall_command(struct swarmkv_module *mod_store, const str
{
return NEED_KEY_ROUTE;
}
+ if(obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
if(obj->type!=OBJ_TYPE_HASH)
{
*reply=swarmkv_reply_new_error(error_wrong_type);
@@ -255,6 +265,10 @@ enum cmd_exec_result hkeys_command(struct swarmkv_module *mod_store, const struc
{
return NEED_KEY_ROUTE;
}
+ if(obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
if(obj->type!=OBJ_TYPE_HASH)
{
*reply=swarmkv_reply_new_error(error_wrong_type);
@@ -284,6 +298,10 @@ enum cmd_exec_result hlen_command(struct swarmkv_module *mod_store, const struct
{
return NEED_KEY_ROUTE;
}
+ if(obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
if(obj->type!=OBJ_TYPE_HASH)
{
*reply=swarmkv_reply_new_error(error_wrong_type);
diff --git a/src/t_set.c b/src/t_set.c
index 5259f45..bfdeb45 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -56,6 +56,10 @@ enum cmd_exec_result srem_command(struct swarmkv_module *mod_store, const struct
{
return NEED_KEY_ROUTE;
}
+ if(obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
if(obj->type!=OBJ_TYPE_SET)
{
*reply=swarmkv_reply_new_error(error_wrong_type);
@@ -85,6 +89,10 @@ enum cmd_exec_result smembers_command(struct swarmkv_module *mod_store, const st
{
return NEED_KEY_ROUTE;
}
+ if(obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
if(obj->type!=OBJ_TYPE_SET)
{
*reply=swarmkv_reply_new_error(error_wrong_type);
@@ -116,6 +124,10 @@ enum cmd_exec_result sismember_command(struct swarmkv_module *mod_store, const s
{
return NEED_KEY_ROUTE;
}
+ if(obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
if(obj->type!=OBJ_TYPE_SET)
{
*reply=swarmkv_reply_new_error(error_wrong_type);
@@ -136,6 +148,10 @@ enum cmd_exec_result scard_command(struct swarmkv_module *mod_store, const struc
{
return NEED_KEY_ROUTE;
}
+ if(obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
if(obj->type!=OBJ_TYPE_SET)
{
*reply=swarmkv_reply_new_error(error_wrong_type);
diff --git a/src/t_string.c b/src/t_string.c
index d122510..3a5c463 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -16,7 +16,10 @@ enum cmd_exec_result get_command(struct swarmkv_module *mod_store, const struct
{
return NEED_KEY_ROUTE;
}
-
+ if(obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
if(obj->type==OBJ_TYPE_STRING)
{
LWW_register_get0(obj->string, &string_val, &string_sz);
@@ -49,7 +52,6 @@ enum cmd_exec_result set_command(struct swarmkv_module *mod_store, const struct
{
return NEED_KEY_ROUTE;
}
-
if(obj->type==OBJ_TYPE_UNDEFINED)
{
uuid_t uuid;
diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c
index f710639..bcf8f78 100644
--- a/src/t_token_bucket.c
+++ b/src/t_token_bucket.c
@@ -8,19 +8,7 @@
#include <assert.h>
#include <stdbool.h>
//Unlike string, set and hash, XTCONSUME and XTINFO can only operate on an initialized token bucket.
-enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply)
-{
- assert(obj->type==OBJ_TYPE_UNDEFINED);
- node_t replica;
- int ret=0;
- ret=sobj_get_random_replica(obj, &replica);
- if(ret)
- {
- *reply=swarmkv_reply_new_node(&replica, 1);
- return REDIRECT;
- }
- return NEED_KEY_ROUTE;
-}
+
static int get_consume_type(sds s, enum tb_consume_type *consume_type)
{
if(0==strncasecmp(s, "NORMAL", sdslen(s)))