summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt4
-rw-r--r--src/swarmkv.c96
-rw-r--r--src/swarmkv_api.c613
-rw-r--r--src/swarmkv_common.c24
-rw-r--r--src/swarmkv_common.h18
-rw-r--r--src/swarmkv_error.h1
-rw-r--r--src/swarmkv_keyspace.c41
-rw-r--r--src/swarmkv_mesh.c4
-rw-r--r--src/swarmkv_message.c26
-rw-r--r--src/swarmkv_monitor.c214
-rw-r--r--src/swarmkv_monitor.h7
-rw-r--r--src/swarmkv_net.c2
-rw-r--r--src/swarmkv_store.c115
-rw-r--r--src/swarmkv_sync.c68
-rw-r--r--src/swarmkv_sync.h7
-rw-r--r--src/swarmkv_utils.c1
-rw-r--r--src/swarmkv_utils.h2
-rw-r--r--src/t_cms.c2
18 files changed, 778 insertions, 467 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 6c8b3e5..83d5aa5 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,5 +1,5 @@
set(SWARMKV_MAJOR_VERSION 4)
-set(SWARMKV_MINOR_VERSION 2)
+set(SWARMKV_MINOR_VERSION 3)
set(SWARMKV_PATCH_VERSION 0)
set(SWARMKV_VERSION ${SWARMKV_MAJOR_VERSION}.${SWARMKV_MINOR_VERSION}.${SWARMKV_PATCH_VERSION})
@@ -25,7 +25,7 @@ add_definitions(-D_GNU_SOURCE)
add_definitions(-fPIC)
set(SWARMKV_SRC swarmkv.c swarmkv_api.c
swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c
- swarmkv_store.c swarmkv_sync.c swarmkv_keyspace.c swarmkv_monitor.c
+ swarmkv_sync.c swarmkv_store.c swarmkv_keyspace.c swarmkv_monitor.c
t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c
swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c)
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 382f56e..8cf8d91 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -108,7 +108,7 @@ struct swarmkv_options *swarmkv_get0_options(struct swarmkv *db)
return db->opts;
}
-int __gettid(struct swarmkv *db)
+int swarmkv_gettid(const struct swarmkv *db)
{
// int __sys_tid=syscall(SYS_gettid);
for(int i=0; i<db->opts->nr_worker_threads + db->opts->nr_caller_threads; i++)
@@ -187,7 +187,7 @@ static void remoter_caller_ctx_send_reply(struct remote_caller_ctx *ctx, const s
{
struct swarmkv *db = ctx->db;
struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->remote, ctx->remote_tid, &db->self, ctx->sequence);
- int cur_tid=__gettid(db);
+ int cur_tid=swarmkv_gettid(db);
const char *err_str=NULL;
if(0==node_compare(&db->self, &msg->caller))
{
@@ -471,7 +471,12 @@ enum cmd_exec_result __attribute__((optimize("O0"))) debug_command(struct swarmk
}
return FINISHED;
}
-
+enum cmd_exec_result print_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ printf("%s", cmd->argv[1]);
+ *reply=swarmkv_reply_new_status("OK");
+ return FINISHED;
+}
enum cmd_exec_result config_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
//Unused
@@ -544,7 +549,7 @@ void *swarmkv_worker_thread(void *arg)
{
struct swarmkv *db = (struct swarmkv *)arg;
swarmkv_register_thread(db);
- int tid=__gettid(db);
+ int tid=swarmkv_gettid(db);
struct swarmkv_thread *thr = db->threads+tid;
char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", thr->thread_id);
@@ -619,41 +624,41 @@ static struct swarmkv_reply *key_not_found_reply(enum key_not_found_reply not_fo
return reply;
}
-static struct swarmkv_cmd *make_keyroute_cmd(enum cmd_key_flag flag, const sds key, const node_t *obj_owner)
+static struct swarmkv_cmd *make_keyroute_cmd(enum cmd_key_flag flag, const sds key, int dry_run, const node_t *caller)
{
struct swarmkv_cmd *keyroute_cmd=NULL;
switch(flag)
{
case CMD_KEY_RO:
case CMD_KEY_RW:
- if(obj_owner)
+ if(!dry_run)
{
- keyroute_cmd=swarmkv_cmd_new(4);
+ keyroute_cmd=swarmkv_cmd_new(4, caller);
keyroute_cmd->argv[0]=sdsnew("keyspace");
keyroute_cmd->argv[1]=sdsnew("xradd");
keyroute_cmd->argv[2]=sdsdup(key);
- keyroute_cmd->argv[3]=node_addr2sds(obj_owner);
+ keyroute_cmd->argv[3]=node_addr2sds(caller);
}
else
{
- keyroute_cmd=swarmkv_cmd_new(3);
+ keyroute_cmd=swarmkv_cmd_new(3, caller);
keyroute_cmd->argv[0]=sdsnew("keyspace");
keyroute_cmd->argv[1]=sdsnew("rlist");
keyroute_cmd->argv[2]=sdsdup(key);
- }
+ }
break;
case CMD_KEY_OW:
- if(obj_owner)
+ if(!dry_run)
{
- keyroute_cmd=swarmkv_cmd_new(4);
+ keyroute_cmd=swarmkv_cmd_new(4, caller);
keyroute_cmd->argv[0]=sdsnew("keyspace");
keyroute_cmd->argv[1]=sdsnew("radd");
keyroute_cmd->argv[2]=sdsdup(key);
- keyroute_cmd->argv[3]=node_addr2sds(obj_owner);
+ keyroute_cmd->argv[3]=node_addr2sds(caller);
}
else
{
- keyroute_cmd=swarmkv_cmd_new(3);
+ keyroute_cmd=swarmkv_cmd_new(3, caller);
keyroute_cmd->argv[0]=sdsnew("keyspace");
keyroute_cmd->argv[1]=sdsnew("radd");
keyroute_cmd->argv[2]=sdsdup(key);
@@ -661,7 +666,7 @@ static struct swarmkv_cmd *make_keyroute_cmd(enum cmd_key_flag flag, const sds k
break;
case CMD_KEY_RM:
assert(0);
- keyroute_cmd=swarmkv_cmd_new(3);
+ keyroute_cmd=swarmkv_cmd_new(3, caller);
keyroute_cmd->argv[0]=sdsnew("keyspace");
keyroute_cmd->argv[1]=sdsnew("del");
keyroute_cmd->argv[2]=sdsdup(key);
@@ -754,10 +759,10 @@ static void peer_exec_on_success(void *result, void *user)
cmd_ctx_free(ctx);
}
}
-struct swarmkv_cmd *make_crdt_add_cmd(const sds key, node_t replica[], size_t n_replica)
+struct swarmkv_cmd *make_crdt_add_cmd(const sds key, node_t replica[], size_t n_replica, const node_t *caller)
{
struct swarmkv_cmd *crdt_add_cmd=NULL;
- crdt_add_cmd=swarmkv_cmd_new(3+n_replica);
+ crdt_add_cmd=swarmkv_cmd_new(3+n_replica, caller);
crdt_add_cmd->argv[0]=sdsnew("crdt");
crdt_add_cmd->argv[1]=sdsnew("add");
crdt_add_cmd->argv[2]=sdsdup(key);
@@ -807,12 +812,11 @@ static void key_route_on_success(void *result, void *user)
if(n_replica_node>0)
{
__exec_cmd(ctx->db, replica_nodes+0, ctx->cmd, ctx->future_of_caller);
- //__send_cmd(ctx->db, replica_nodes+0, ctx->cmd, ctx->future_of_caller);
}
if(self_is_a_replica)
{
struct cmd_ctx *crdt_add_ctx=NULL;
- struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(key, replica_nodes, n_replica_node);
+ struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(key, replica_nodes, n_replica_node, &ctx->db->self);
crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller);
crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx);
__exec_cmd(ctx->db, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine);
@@ -844,7 +848,7 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *
void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
{
struct swarmkv *db = (struct swarmkv *)arg;
- int cur_tid=__gettid(db);
+ int cur_tid=swarmkv_gettid(db);
if(msg->type==MSG_TYPE_CMD)
{
//command may be from other thread or other node.
@@ -878,27 +882,13 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
}
}
}
-static const char *exec_ret2string(enum cmd_exec_result ret)
-{
- switch(ret)
- {
- case NEED_KEY_ROUTE:
- return "NEED_KEY_ROUTE";
- case REDIRECT:
- return "REDIRECT";
- case FINISHED:
- return "FINISHED";
- default:
- assert(0);
- }
-}
#define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000
void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller)
{
struct swarmkv_cmd_spec *spec=NULL;
struct swarmkv_reply *reply=NULL;
struct promise *p=NULL;
- int cur_tid=__gettid(db);
+ int cur_tid=swarmkv_gettid(db);
spec=get_spec_by_argv(db, cmd->argc, cmd->argv);
if(!spec)
@@ -981,20 +971,8 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar
clock_gettime(CLOCK_MONOTONIC_COARSE, &start);
exec_ret=spec->proc(spec->module, cmd, &reply);
clock_gettime(CLOCK_MONOTONIC_COARSE, &end);
- swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end));
+ swarmkv_monitor_record_command(db->mod_monitor, spec, cmd, timespec_diff_usec(&start, &end), exec_ret);
- //if(strcasestr(spec->name, "CRDT"))
- if(0){
- struct timeval now;
- gettimeofday(&now, NULL);
- printf("%ld.%.6ld %s %d %s %s %s\n",
- now.tv_sec, now.tv_usec,
- db->self.addr,
- db->threads[cur_tid].recusion_depth,
- spec->name,
- spec->key_offset<0?"NULL":cmd->argv[spec->key_offset],
- exec_ret2string(exec_ret));
- }
switch(exec_ret)
{
case FINISHED:
@@ -1018,7 +996,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar
}
case NEED_KEY_ROUTE:
{
- struct swarmkv_cmd *keyspace_cmd=make_keyroute_cmd(spec->flag, cmd->argv[spec->key_offset], db->opts->dryrun? NULL:(&db->self));
+ struct swarmkv_cmd *keyspace_cmd=make_keyroute_cmd(spec->flag, cmd->argv[spec->key_offset], db->opts->dryrun, &db->self);
struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller);
ctx->future_of_mine=future_create("key_route", key_route_on_success, generic_on_fail, ctx);
__exec_cmd(db, NULL, keyspace_cmd, ctx->future_of_mine);
@@ -1226,13 +1204,19 @@ void command_spec_init(struct swarmkv *db)
command_register(&(db->command_table), "PING", "IP:port",
1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
ping_command, &db->module);
+ command_register(&(db->command_table), "PRINT", "text",
+ 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
+ print_command, &db->module);
command_register(&(db->command_table), "COMMAND LIST", "",
0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
command_list_command, &db->module);
command_register(&(db->command_table), "LATENCY", "<subcommand>",
1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
latency_command, db->mod_monitor);
-
+ command_register(&(db->command_table), "MONREG", "IP:port",
+ 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
+ monreg_command, db->mod_monitor);
+
/* low-level state-based CRDT synchronization commands*/
command_register(&(db->command_table), "CRDT ADD", "key [IP:port ...]",
1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
@@ -1403,7 +1387,7 @@ static void evloop_timeout_cb(evutil_socket_t fd, short event, void *arg)
}
void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv)
{
- int tid=__gettid(db);
+ int tid=swarmkv_gettid(db);
//must initiate from caller threads, and caller thread ID is larger than worker thread ID
assert(tid >= db->opts->nr_worker_threads);
struct swarmkv_thread *thr=db->threads+tid;
@@ -1425,14 +1409,14 @@ void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv)
}
void swarmkv_caller_loop_break(struct swarmkv *db)
{
- int tid=__gettid(db);
+ int tid=swarmkv_gettid(db);
//must initiate from caller threads, and caller thread ID is larger than worker thread ID
assert(tid >= db->opts->nr_worker_threads);
event_base_loopbreak(db->ref_evbases[tid]);
}
long long swarmkv_caller_get_pending_commands(struct swarmkv *db)
{
- int tid=__gettid(db);
+ int tid=swarmkv_gettid(db);
//must initiate from caller threads, and caller thread ID is larger than worker thread ID
assert(tid >= db->opts->nr_worker_threads);
return swarmkv_rpc_mgr_count(db->rpc_mgr, tid);
@@ -1493,7 +1477,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
}
event_config_free(ev_cfg);
- db->mod_monitor=swarmkv_monitor_new(db->opts);
+ db->mod_monitor=swarmkv_monitor_new(db->opts, db);
db->rpc_mgr=swarmkv_rpc_mgr_new(db->ref_evbases, opts->total_threads, opts->cluster_timeout_us);
db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger);
@@ -1623,6 +1607,10 @@ const char *swarmkv_self_address(const struct swarmkv *db)
{
return db->self.addr;
}
+const node_t *swarmkv_self_node(const struct swarmkv *db)
+{
+ return &db->self;
+}
void swarmkv_self_uuid(const struct swarmkv *db, char buff[37])
{
uuid_unparse(db->opts->bin_uuid, buff);
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index f408274..6d98f15 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -10,7 +10,7 @@
#define MODULE_SWAMRKV_API module_name_str("swarmkv.api")
void exec_for_local(struct swarmkv *db, const struct swarmkv_cmd *cmd, const node_t *target_node, swarmkv_on_reply_callback_t * cb, void * cb_arg);
-
+const node_t *swarmkv_self_node(const struct swarmkv *db);
struct swarmkv_readoptions
{
@@ -27,7 +27,7 @@ struct swarmkv_options* swarmkv_options_new(void)
opts->cluster_port=5210;
opts->health_check_port=0;
opts->loglevel=0;
- opts->cluster_timeout_us=500*1000;//Default 500ms
+ opts->cluster_timeout_us=500*1000;//Default 500ms
opts->sync_interval_us=10*1000; //Default 10ms
strcpy(opts->bind_address, "0.0.0.0");
strcpy(opts->cluster_announce_ip, "127.0.0.1");
@@ -159,146 +159,266 @@ int swarmkv_options_set_max_dispatch_interval(struct swarmkv_options *opts, cons
opts->eb_max_callbacks=max_callbacks;
return 0;
}
+void swarmkv_async_command_on_argv(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, int argc, const char *argv[], size_t *argv_len)
+{
+ struct swarmkv_cmd *cmd=NULL;
+ cmd=swarmkv_cmd_new(argc, swarmkv_self_node(db));
+ for(size_t i=0; i<argc; i++)
+ {
+ assert(argv[i]);
+ assert(argv_len[i]>0);
+ cmd->argv[i]=sdsnewlen(argv[i], argv_len[i]);
+ }
+ if(!target)
+ {
+ exec_for_local(db, cmd, NULL, cb, cb_arg);
+ }
+ else
+ {
+ node_t target_node;
+ memset(&target_node, 0, sizeof(node_t));
+ node_init_from_sds(&target_node, target);
+ exec_for_local(db, cmd, &target_node, cb, cb_arg);
+ }
+ swarmkv_cmd_free(cmd);
+}
+struct blocking_query_ctx
+{
+ struct swarmkv_reply *reply;
+ struct swarmkv *db;
+};
+void blocking_query_cb(const struct swarmkv_reply *reply, void * arg)
+{
+ struct blocking_query_ctx *ctx=(struct blocking_query_ctx*) arg;
+ ctx->reply=swarmkv_reply_dup(reply);
+ swarmkv_caller_loop_break(ctx->db);
+ return;
+}
+struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, const char *argv[], size_t *argv_len)
+{
+ struct blocking_query_ctx ctx;
+ memset(&ctx, 0, sizeof(ctx));
+ ctx.db=db;
+ ctx.reply=NULL;
+ swarmkv_async_command_on_argv(db, blocking_query_cb, &ctx, target, argc, argv, argv_len);
+ long long pending_cmds=0;
+ pending_cmds=swarmkv_caller_get_pending_commands(db);
+ if(ctx.reply==NULL)
+ {
+ assert(pending_cmds==1);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL);
+ }
+ assert(ctx.reply!=NULL);
+ struct swarmkv_reply *reply=NULL;
+ reply=ctx.reply;
+ ctx.reply=NULL;
+ return reply;
+}
+struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...)
+{
+ char *cmd_str=NULL;
+ va_list ap;
+ va_start(ap, format);
+ vasprintf(&cmd_str, format, ap);
+ va_end(ap);
+
+ int argc=0;
+ sds *argv=NULL;
+
+ argv=sdssplitargs(cmd_str, &argc);
+ size_t argv_len[argc];
+ for(int i=0; i<argc; i++)
+ {
+ argv_len[i]=sdslen(argv[i]);
+ }
+ struct swarmkv_reply *reply=NULL;
+ reply=swarmkv_command_on_argv(db, target, argc, (const char**) argv, argv_len);
+
+ if (argv)
+ {
+ sdsfreesplitres(argv, argc);
+ }
+ free(cmd_str);
+
+ return reply;
+}
+struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...)
+{
+ struct swarmkv_reply *reply=NULL;
+ char *cmd_str=NULL;
+ va_list ap;
+ va_start(ap, format);
+ vasprintf(&cmd_str, format, ap);
+ va_end(ap);
+ reply=swarmkv_command_on(db, NULL, cmd_str);
+ free(cmd_str);
+ return reply;
+}
+void swarmkv_async_command_on(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, const char *format, ...)
+{
+ int argc=0; sds *argv=NULL;
+ char *cmd_str=NULL;
+
+ va_list ap;
+ va_start(ap, format);
+ vasprintf(&cmd_str, format, ap);
+ va_end(ap);
+
+ argv=sdssplitargs(cmd_str, &argc);
+ size_t argv_len[argc];
+ for(size_t i=0; i<argc; i++)
+ {
+ argv_len[i]=sdslen(argv[i]);
+ }
+ swarmkv_async_command_on_argv(db, cb, cb_arg, target, argc, (const char**)argv, argv_len);
+
+ if (argv)
+ {
+ sdsfreesplitres(argv, argc);
+ }
+ free(cmd_str);
+}
+void swarmkv_async_command(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *format, ...)
+{
+ char *cmd_str=NULL;
+ va_list ap;
+ va_start(ap,format);
+ vasprintf(&cmd_str, format, ap);
+ va_end(ap);
+ swarmkv_async_command_on(db, cb, cb_arg, NULL, cmd_str);
+}
void swarmkv_set(struct swarmkv * db,
const char * key, size_t keylen, const char * value, size_t vallen, swarmkv_on_reply_callback_t * cb, void * cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(3);
- cmd->argv[0]=sdsnew("set");
- cmd->argv[1]=sdsnewlen(key, keylen);
- cmd->argv[2]=sdsnewlen(value, vallen);
-
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+ const char *argv[3];
+ size_t argv_len[3];
+ argv[0]="set";
+ argv_len[0]=4;
+ argv[1]=key;
+ argv_len[1]=keylen;
+ argv[2]=value;
+ argv_len[2]=vallen;
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len);
return;
-
}
void swarmkv_get(struct swarmkv * db,
const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2);
- cmd->argv[0]=sdsnew("get");
- cmd->argv[1]=sdsnewlen(key, keylen);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+ const char *argv[2];
+ size_t argv_len[2];
+ argv[0]="get";
+ argv_len[0]=4;
+ argv[1]=key;
+ argv_len[1]=keylen;
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len);
return;
}
void swarmkv_expire(struct swarmkv *db, const char *key, size_t keylen, int seconds, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(3);
- cmd->argv[0]=sdsnew("expire");
- cmd->argv[1]=sdsnewlen(key, keylen);
- cmd->argv[2]=sdsfromlonglong(seconds);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+ const char *argv[3];
+ size_t argv_len[3];
+ argv[0]="expire";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ char buffer[32];
+ sprintf(buffer, "%d", seconds);
+ argv[2]=buffer;
+ argv_len[2]=strlen(buffer);
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len);
return;
}
void swarmkv_ttl(struct swarmkv *db, const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2);
- cmd->argv[0]=sdsnew("ttl");
- cmd->argv[1]=sdsnewlen(key, keylen);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+ const char *argv[2];
+ size_t argv_len[2];
+ argv[0]="ttl";
+ argv_len[0]=4;
+ argv[1]=key;
+ argv_len[1]=keylen;
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len);
return;
}
void swarmkv_persist(struct swarmkv *db, const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2);
- cmd->argv[0]=sdsnew("persist");
- cmd->argv[1]=sdsnewlen(key, keylen);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+ const char *argv[2];
+ size_t argv_len[2];
+ argv[0]="persist";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len);
return;
}
void swarmkv_sadd(struct swarmkv *db, const char* key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2+n_members);
- cmd->argv[0]=sdsnew("sadd");
- cmd->argv[1]=sdsnewlen(key, keylen);
+ const char *argv[2+n_members];
+ size_t argv_len[2+n_members];
+ argv[0]="sadd";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
for(size_t i=0; i<n_members; i++)
{
- cmd->argv[2+i]=sdsnewlen(members[i], members_len[i]);
+ argv[2+i]=members[i];
+ argv_len[2+i]=members_len[i];
}
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
-
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_members, argv, argv_len);
+ return;
}
void swarmkv_srem(struct swarmkv *db, const char* key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2+n_members);
- cmd->argv[0]=sdsnew("srem");
- cmd->argv[1]=sdsnewlen(key, keylen);
+ const char *argv[2+n_members];
+ size_t argv_len[2+n_members];
+ argv[0]="srem";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
for(size_t i=0; i<n_members; i++)
{
- cmd->argv[2+i]=sdsnewlen(members[i], members_len[i]);
- }
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
-
-}
-void swarmkv_bfadd(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg)
-{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2+n_items);
- cmd->argv[0]=sdsnew("bfadd");
- cmd->argv[1]=sdsnewlen(key, keylen);
- for(size_t i=0; i<n_items; i++)
- {
- cmd->argv[2+i]=sdsnewlen(items[i], items_len[i]);
- }
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
-}
-void swarmkv_bfexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg)
-{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2+n_items);
- cmd->argv[0]=sdsnew("bfexists");
- cmd->argv[1]=sdsnewlen(key, keylen);
- for(size_t i=0; i<n_items; i++)
- {
- cmd->argv[2+i]=sdsnewlen(items[i], items_len[i]);
+ argv[2+i]=members[i];
+ argv_len[2+i]=members_len[i];
}
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_members, argv, argv_len);
+ return;
}
void swarmkv_sismember(struct swarmkv *db, const char* key, size_t keylen, const char *member, size_t member_len, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(3);
- cmd->argv[0]=sdsnew("sismember");
- cmd->argv[1]=sdsnewlen(key, keylen);
- cmd->argv[2]=sdsnewlen(member, member_len);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+ const char *argv[3];
+ size_t argv_len[3];
+ argv[0]="sismember";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ argv[2]=member;
+ argv_len[2]=member_len;
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len);
+ return;
}
void swarmkv_smembers(struct swarmkv *db, const char* key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2);
- cmd->argv[0]=sdsnew("smembers");
- cmd->argv[1]=sdsnewlen(key, keylen);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+ const char *argv[2];
+ size_t argv_len[2];
+ argv[0]="smembers";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len);
+ return;
}
void swarmkv_scard(struct swarmkv *db, const char* key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2);
- cmd->argv[0]=sdsnew("scard");
- cmd->argv[1]=sdsnewlen(key, keylen);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+ const char *argv[2];
+ size_t argv_len[2];
+ argv[0]="scard";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len);
+ return;
}
int swarmkv_hset_string(struct swarmkv *db,
@@ -307,218 +427,149 @@ int swarmkv_hset_string(struct swarmkv *db,
return 0;
}
void swarmkv_del(struct swarmkv * db, const char * key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg)
-{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2);
- cmd->argv[0]=sdsnew("del");
- cmd->argv[1]=sdsnewlen(key, keylen);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+{
+ const char *argv[2];
+ size_t argv_len[2];
+ argv[0]="del";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len);
return;
}
void swarmkv_incrby(struct swarmkv * db, const char * key, size_t keylen, long long increment, swarmkv_on_reply_callback_t *cb, void *cb_arg)
-{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(3);
- cmd->argv[0]=sdsnew("incrby");
- cmd->argv[1]=sdsnewlen(key, keylen);
- cmd->argv[2]=sdsfromlonglong(increment);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+{
+ const char *argv[3];
+ size_t argv_len[3];
+ argv[0]="incrby";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ char buffer[32];
+ sprintf(buffer, "%lld", increment);
+ argv[2]=buffer;
+ argv_len[2]=strlen(buffer);
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len);
return;
}
void swarmkv_tconsume(struct swarmkv * db, const char * key, size_t keylen, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg)
-{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(3);
- cmd->argv[0]=sdsnew("tconsume");
- cmd->argv[1]=sdsnewlen(key, keylen);
- cmd->argv[2]=sdsfromlonglong(tokens);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+{
+ const char *argv[3];
+ size_t argv_len[3];
+ argv[0]="tconsume";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ char buffer[32];
+ sprintf(buffer, "%lld", tokens);
+ argv[2]=buffer;
+ argv_len[2]=strlen(buffer);
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len);
return;
}
void swarmkv_ftconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long weight, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg)
-{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(5);
- cmd->argv[0]=sdsnew("ftconsume");
- cmd->argv[1]=sdsnewlen(key, keylen);
- cmd->argv[2]=sdsnewlen(member, member_len);
- cmd->argv[3]=sdsfromlonglong(weight);
- cmd->argv[4]=sdsfromlonglong(tokens);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
+{
+ const char *argv[5];
+ size_t argv_len[5];
+ argv[0]="ftconsume";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ argv[2]=member;
+ argv_len[2]=member_len;
+ char buffer1[32];
+ sprintf(buffer1, "%lld", weight);
+ argv[3]=buffer1;
+ argv_len[3]=strlen(buffer1);
+ char buffer2[32];
+ sprintf(buffer2, "%lld", tokens);
+ argv[4]=buffer2;
+ argv_len[4]=strlen(buffer2);
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 5, argv, argv_len);
return;
}
void swarmkv_btconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg)
-{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(4);
- cmd->argv[0]=sdsnew("btconsume");
- cmd->argv[1]=sdsnewlen(key, keylen);
- cmd->argv[2]=sdsnewlen(member, member_len);
- cmd->argv[3]=sdsfromlonglong(tokens);
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- swarmkv_cmd_free(cmd);
- return;
-}
-struct blocking_query_ctx
{
- struct swarmkv_reply *reply;
- struct swarmkv *db;
-};
-void blocking_query_cb(const struct swarmkv_reply *reply, void * arg)
-{
- struct blocking_query_ctx *ctx=(struct blocking_query_ctx*) arg;
- ctx->reply=swarmkv_reply_dup(reply);
- swarmkv_caller_loop_break(ctx->db);
+ const char *argv[4];
+ size_t argv_len[4];
+ argv[0]="btconsume";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ argv[2]=member;
+ argv_len[2]=member_len;
+ char buffer[32];
+ sprintf(buffer, "%lld", tokens);
+ argv[3]=buffer;
+ argv_len[3]=strlen(buffer);
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 4, argv, argv_len);
return;
}
-struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv)
-{
- struct swarmkv_cmd *cmd=NULL;
- struct swarmkv_reply *reply=NULL;
- struct blocking_query_ctx ctx;
- memset(&ctx, 0, sizeof(ctx));
- node_t target_node;
- memset(&target_node, 0, sizeof(node_t));
- ctx.db=db;
- ctx.reply=NULL;
- cmd=swarmkv_cmd_new(argc);
- for(int i=0; i<argc; i++)
- {
- cmd->argv[i]=sdsdup(argv[i]);
- }
-
- if(!target)
- {
- exec_for_local(db, cmd, NULL, blocking_query_cb, &ctx);
- }
- else
- {
- node_init_from_sds(&target_node, target);
- exec_for_local(db, cmd, &target_node, blocking_query_cb, &ctx);
- }
- long long pending_cmds=0;
- pending_cmds=swarmkv_caller_get_pending_commands(db);
- if(ctx.reply==NULL)
- {
- assert(pending_cmds==1);
- swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL);
- }
- assert(ctx.reply!=NULL);
- reply=ctx.reply;
- ctx.reply=NULL;
-
- swarmkv_cmd_free(cmd);
- return reply;
-}
-
-struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...)
-{
- char *cmd_str=NULL;
- va_list ap;
- va_start(ap,format);
- vasprintf(&cmd_str, format, ap);
- va_end(ap);
-
- int argc=0;
- sds *argv=NULL;
-
- argv=sdssplitargs(cmd_str, &argc);
- struct swarmkv_reply *reply=NULL;
- reply=swarmkv_command_on_argv(db, target, argc, argv);
-
- if (argv)
- {
- sdsfreesplitres(argv, argc);
- }
- free(cmd_str);
-
- return reply;
-}
-struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...)
+void swarmkv_bfadd(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- char *cmd_str=NULL;
- va_list ap;
- va_start(ap, format);
- vasprintf(&cmd_str, format, ap);
- va_end(ap);
-
- int argc=0;
- sds *argv=NULL;
- argv=sdssplitargs(cmd_str, &argc);
- struct swarmkv_reply *reply=NULL;
- reply=swarmkv_command_on_argv(db, NULL, argc, argv);
-
- if (argv)
+ const char *argv[2+n_items];
+ size_t argv_len[2+n_items];
+ argv[0]="bfadd";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ for(size_t i=0; i<n_items; i++)
{
- sdsfreesplitres(argv, argc);
+ argv[2+i]=items[i];
+ argv_len[2+i]=items_len[i];
}
- free(cmd_str);
-
- return reply;
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_items, argv, argv_len);
+ return;
}
-void swarmkv_async_command_on_argv(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, int argc, sds *argv)
+void swarmkv_bfmexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
- struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(argc);
- for(size_t i=0; i<argc; i++)
+ const char *argv[2+n_items];
+ size_t argv_len[2+n_items];
+ argv[0]="bfmexists";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ for(size_t i=0; i<n_items; i++)
{
- cmd->argv[i]=sdsdup(argv[i]);
+ argv[2+i]=items[i];
+ argv_len[2+i]=items_len[i];
}
-
- if(!target)
- {
- exec_for_local(db, cmd, NULL, cb, cb_arg);
- }
- else
- {
- node_t target_node;
- memset(&target_node, 0, sizeof(node_t));
- node_init_from_sds(&target_node, target);
- exec_for_local(db, cmd, &target_node, cb, cb_arg);
- }
- swarmkv_cmd_free(cmd);
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_items, argv, argv_len);
+ return;
}
-void swarmkv_async_command_on(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, const char *format, ...)
+void swarmkv_cmsincrby(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], long long increments[], size_t n_increment, swarmkv_on_reply_callback_t * cb, void * cb_arg)
{
- int argc=0; sds *argv=NULL;
- char *cmd_str=NULL;
-
- va_list ap;
- va_start(ap,format);
- vasprintf(&cmd_str, format, ap);
- va_end(ap);
-
- argv=sdssplitargs(cmd_str, &argc);
-
- swarmkv_async_command_on_argv(db, cb, cb_arg, target, argc, argv);
-
- if (argv)
+ const char *argv[2+2*n_increment];
+ size_t argv_len[2+2*n_increment];
+ char inc_buffer[n_increment][32];
+ argv[0]="cmsincrby";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ for(size_t i=0; i<n_increment; i++)
{
- sdsfreesplitres(argv, argc);
+ sprintf(inc_buffer[i], "%lld", increments[i]);
+ argv[2+2*i]=items[i];
+ argv_len[2+2*i]=items_len[i];
+ argv[3+2*i]=inc_buffer[i];
+ argv_len[3+2*i]=strlen(inc_buffer[i]);
}
- free(cmd_str);
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+2*n_increment, argv, argv_len);
+ return;
}
-void swarmkv_async_command(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *format, ...)
+void swarmkv_cmsmquery(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t * cb, void * cb_arg)
{
- int argc=0; sds *argv=NULL;
- char *cmd_str=NULL;
-
- va_list ap;
- va_start(ap,format);
- vasprintf(&cmd_str, format, ap);
- va_end(ap);
-
- argv=sdssplitargs(cmd_str, &argc);
-
- swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, argc, argv);
-
- if (argv)
+ const char *argv[2+n_items];
+ size_t argv_len[2+n_items];
+ argv[0]="cmsmquery";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]=key;
+ argv_len[1]=keylen;
+ for(size_t i=0; i<n_items; i++)
{
- sdsfreesplitres(argv, argc);
+ argv[2+i]=items[i];
+ argv_len[2+i]=items_len[i];
}
- free(cmd_str);
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_items, argv, argv_len);
+ return;
} \ No newline at end of file
diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c
index faa1a04..d915b8a 100644
--- a/src/swarmkv_common.c
+++ b/src/swarmkv_common.c
@@ -287,6 +287,10 @@ sds node_addr2sds(const node_t *node)
{
return sdsnew(node->addr);
}
+const char *node_addr2cstr(const node_t *node)
+{
+ return node->addr;
+}
int node_init_from_sds(node_t *node, const char *addr_str)
{
memset(node, 0, sizeof(node_t));
@@ -348,7 +352,7 @@ void node_init(node_t *node, const char *ipv4, unsigned int port)
snprintf(node->addr, sizeof(node->addr), "%s:%u", ipv4, port);
return;
}
-void node_init_from_string(node_t *node, const char *addr_str)
+void node_init_from_cstr(node_t *node, const char *addr_str)
{
memset(node, 0, sizeof(node_t));
assert(strlen(addr_str)<sizeof(node_t));
@@ -356,6 +360,13 @@ void node_init_from_string(node_t *node, const char *addr_str)
strncpy(node->addr, addr_str, sizeof(node->addr));
return;
}
+void node_init_from_string(node_t *node, const char *addr_str, size_t sz_addr)
+{
+ memset(node, 0, sizeof(node_t));
+ assert(sz_addr<=sizeof(node->addr));
+ strncpy(node->addr, addr_str, sz_addr);
+ return;
+}
int node_init_from_reply(node_t *node, const struct swarmkv_reply *reply)
{
if(reply->type!=SWARMKV_REPLY_NODE)
@@ -368,7 +379,7 @@ int node_init_from_reply(node_t *node, const struct swarmkv_reply *reply)
{
p+=5;
}
- node_init_from_string(node, p);
+ node_init_from_cstr(node, p);
return 0;
}
int node_list_new_from_reply(node_t **node_list, size_t *n_node, const struct swarmkv_reply *reply)
@@ -559,10 +570,11 @@ void swarmkv_reply_free(struct swarmkv_reply *reply)
}
free(reply);
}
-struct swarmkv_cmd *swarmkv_cmd_new(size_t argc)
+struct swarmkv_cmd *swarmkv_cmd_new(size_t argc, const node_t *caller)
{
size_t size=sizeof(struct swarmkv_cmd)+argc*sizeof(sds);
struct swarmkv_cmd *cmd=(struct swarmkv_cmd*)malloc(size);
+ node_copy(&cmd->caller, caller);
cmd->argc=argc;
cmd->argv=(sds*)((char*)cmd+sizeof(struct swarmkv_cmd));
return cmd;
@@ -580,7 +592,7 @@ void swarmkv_cmd_free(struct swarmkv_cmd *p)
}
struct swarmkv_cmd* swarmkv_cmd_dup(const struct swarmkv_cmd *origin)
{
- struct swarmkv_cmd* copy=swarmkv_cmd_new(origin->argc);
+ struct swarmkv_cmd* copy=swarmkv_cmd_new(origin->argc, &origin->caller);
size_t i=0;
for(i=0; i<origin->argc; i++)
{
@@ -657,7 +669,7 @@ void json2keyslots(sds json_buffer, void *slot_container_base, size_t sz_slot_co
{
iterator=(struct key_slot*)(slot_container_base+sz_slot_container*j+offset_slot);
iterator->slot_id=j;
- node_init_from_string(&iterator->owner, owner->valuestring);
+ node_init_from_cstr(&iterator->owner, owner->valuestring);
k++;
}
}
@@ -714,7 +726,7 @@ void leader_response2leader_node(const char *resp_body, node_t *leader)
cJSON *item=NULL;
root=cJSON_Parse(resp_body);
item=cJSON_GetObjectItem(root, "node");
- node_init_from_string(leader, item->valuestring);;
+ node_init_from_cstr(leader, item->valuestring);;
cJSON_Delete(root);
}
/* This is an modified version of keyHashSlot of Redis source code.
diff --git a/src/swarmkv_common.h b/src/swarmkv_common.h
index 3d427cf..d4f53e5 100644
--- a/src/swarmkv_common.h
+++ b/src/swarmkv_common.h
@@ -85,11 +85,12 @@ struct swarmkv_options
struct swarmkv_cmd
{
+ node_t caller;
size_t argc;
sds *argv;
};
-struct swarmkv_cmd *swarmkv_cmd_new(size_t argc);
+struct swarmkv_cmd *swarmkv_cmd_new(size_t argc, const node_t *originator);
void swarmkv_cmd_free(struct swarmkv_cmd *p);
struct swarmkv_cmd *swarmkv_cmd_dup(const struct swarmkv_cmd *origin);
int swarmkv_cmd_parse_integer(const struct swarmkv_cmd *cmd, const char *name, long long *ival);
@@ -117,21 +118,22 @@ int node_list_remove(node_t *list, size_t n_list, const node_t *node);
sds node_print_json(const node_t *self, uuid_t uuid);
sds node_addr2sds(const node_t *node);
+const char *node_addr2cstr(const node_t *node);
int node_init_from_sds(node_t *node, const char *addr_str);
int node_init_from_reply(node_t *node, const struct swarmkv_reply *reply);
+void node_init(node_t *node, const char *ipv4, unsigned int port);
+void node_init_from_cstr(node_t *node, const char *addr_str);
+void node_init_from_string(node_t *node, const char *addr_str, size_t sz_addr);
+
/* Return 0 if node1 == node 2 */
int node_compare(const node_t *node1, const node_t *node2);
void node_copy(node_t *dst, const node_t *src);
+int node_sanity(const node_t *node);
int node_is_empty(const node_t *node);
-void node_init(node_t *node, const char *ipv4, unsigned int port);
-void node_init_from_string(node_t *node, const char *addr_str);
+
int node_parse(const node_t *node, unsigned int *port, char *addr_str, size_t sz_addr);
void node_init_from_sockaddr(node_t *node, const struct sockaddr * sa);
void node_to_sockaddr(const node_t *node, struct sockaddr *sockaddr);
int http_blocking_request(enum evhttp_cmd_type type, const char* host, unsigned int port, const char* url, sds req_body, sds *resp_body);
-//typedef void exec_cmd_func(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller);
-struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv);
-void swarmkv_async_command_on_argv(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, int argc, sds *argv);
-
-int __gettid(struct swarmkv *db); \ No newline at end of file
+const node_t *swarmkv_self_node(const struct swarmkv *db); \ No newline at end of file
diff --git a/src/swarmkv_error.h b/src/swarmkv_error.h
index 221d107..5b6522b 100644
--- a/src/swarmkv_error.h
+++ b/src/swarmkv_error.h
@@ -9,6 +9,7 @@
#define error_arg_not_valid_address "ERR arg `%s` is not `IP:port` format"
#define error_arg_not_valid_float "ERR arg `%s` is not a valid float or out of range"
#define error_arg_not_valid_uuid "ERR arg `%s` is not a valid UUID"
+#define error_arg_not_valid_node "ERR arg `%s` is not a valid IP:port"
#define error_arg_parse_failed "ERR arg `%s` parse failed"
#define error_arg_string_should_be "ERR arg `%s` should be `%s`"
#define error_need_additional_arg "ERR arg `%s` should be fllowed by more args"
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index 1ca01c5..582cf52 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -156,19 +156,23 @@ void crdt_op_on_reply(const struct swarmkv_reply *reply, void * arg)
void key_entry_deletion_notification(struct key_route_entry *key_entry, struct swarmkv *exec_cmd_handle)
{
struct replica_node *replica=NULL, *tmp=NULL;
- struct swarmkv_cmd *crdt_del_cmd=swarmkv_cmd_new(3);
- crdt_del_cmd->argv[0]=sdsnew("crdt");
- crdt_del_cmd->argv[1]=sdsnew("del");
- crdt_del_cmd->argv[2]=sdsdup(key_entry->key);
+ const char *argv[3];
+ size_t argv_len[3];
+ argv[0]="crdt";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]="del";
+ argv_len[1]=strlen(argv[1]);
+ argv[2]=key_entry->key;
+ argv_len[2]=sdslen(key_entry->key);
+
HASH_ITER(hh, key_entry->hash_replica, replica, tmp)
{
struct crdt_op_ctx *ctx=ALLOC(struct crdt_op_ctx, 1);
node_copy(&ctx->peer, &replica->node);
ctx->key=sdsdup(key_entry->key);
ctx->is_del=1;
- swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, crdt_del_cmd->argc, crdt_del_cmd->argv);
+ swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, 3, argv, argv_len);
}
- swarmkv_cmd_free(crdt_del_cmd);
return;
}
@@ -176,14 +180,18 @@ void key_entry_meet_replica(struct key_route_entry *key_entry, struct swarmkv *e
{
struct replica_node *replica=NULL, *tmp=NULL;
int n_replica=HASH_COUNT(key_entry->hash_replica), i=0;
- struct swarmkv_cmd *crdt_meet_cmd=swarmkv_cmd_new(3+n_replica);
- crdt_meet_cmd->argv[0]=sdsnew("crdt");
- crdt_meet_cmd->argv[1]=sdsnew("meet");
- crdt_meet_cmd->argv[2]=sdsdup(key_entry->key);
-
+ const char *argv[3+n_replica];
+ size_t argv_len[3+n_replica];
+ argv[0]="crdt";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]="meet";
+ argv_len[1]=strlen(argv[1]);
+ argv[2]=key_entry->key;
+ argv_len[2]=sdslen(key_entry->key);
HASH_ITER(hh, key_entry->hash_replica, replica, tmp)
{
- crdt_meet_cmd->argv[3+i]=node_addr2sds(&replica->node);
+ argv[3+i]=node_addr2cstr(&replica->node);
+ argv_len[3+i]=strlen(argv[3+i]);
i++;
}
assert(i==n_replica);
@@ -192,9 +200,8 @@ void key_entry_meet_replica(struct key_route_entry *key_entry, struct swarmkv *e
struct crdt_op_ctx *ctx=ALLOC(struct crdt_op_ctx, 1);
node_copy(&ctx->peer, &replica->node);
ctx->key=sdsdup(key_entry->key);
- swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, crdt_meet_cmd->argc, crdt_meet_cmd->argv);
+ swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, 3+n_replica, argv, argv_len);
}
- swarmkv_cmd_free(crdt_meet_cmd);
return;
}
@@ -1443,7 +1450,7 @@ enum cmd_exec_result keyspace_getkeysinslot_command(struct swarmkv_module *mod_k
static int ks_get_tid(struct swarmkv_keyspace *ks, int slot_id)
{
int tid=swarmkv_keyspace_slot2tid(&ks->module, slot_id);
- int real_tid=__gettid(ks->exec_cmd_handle);
+ int real_tid=swarmkv_gettid(ks->exec_cmd_handle);
assert(tid==real_tid);
return tid;
}
@@ -1646,7 +1653,7 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace,
int is_matched=0;
UT_array *matched_replies=NULL;
struct swarmkv_reply *r=NULL;
- int real_tid=__gettid(ks->exec_cmd_handle);
+ int real_tid=swarmkv_gettid(ks->exec_cmd_handle);
int thread_id=atoll(cmd->argv[2]);
assert(real_tid==thread_id);
const sds pattern=cmd->argv[3];
@@ -1903,7 +1910,7 @@ int swarmkv_keyspace_slot2tid(struct swarmkv_module *mod_keyspace, int slot_id)
void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id)
{
struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace);
- int real_tid=__gettid(ks->exec_cmd_handle);
+ int real_tid=swarmkv_gettid(ks->exec_cmd_handle);
assert(real_tid==thread_id);
struct ks_thread *thr=ks->threads+thread_id;
diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c
index 0ffc636..62d6129 100644
--- a/src/swarmkv_mesh.c
+++ b/src/swarmkv_mesh.c
@@ -44,7 +44,7 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest
assert(current_thread_id != dest_thread_id);
struct swarmkv_mesh_thread *curr_thr=mesh->threads+current_thread_id;
struct swarmkv_mesh_thread *dest_thr=mesh->threads+dest_thread_id;
- int tid=__gettid((struct swarmkv *)(mesh->on_msg_cb_arg));
+ int tid=swarmkv_gettid((struct swarmkv *)(mesh->on_msg_cb_arg));
assert(tid==current_thread_id);
ringbuf_t *dest_ring=dest_thr->ring;
assert(msg->magic == SWARMKV_MSG_MAGIC);
@@ -88,7 +88,7 @@ static void mesh_on_eventfd_read_cb(evutil_socket_t fd, short what, void * arg)
ringbuf_t *ring=thr->ring;
uint64_t n_msg=0;
- int tid=__gettid((struct swarmkv *)(mesh->on_msg_cb_arg));
+ int tid=swarmkv_gettid((struct swarmkv *)(mesh->on_msg_cb_arg));
assert(tid==thr->thread_id);
ssize_t s = read(thr->efd, &n_msg, sizeof(uint64_t));
diff --git a/src/swarmkv_message.c b/src/swarmkv_message.c
index 8db7ac9..6f2607b 100644
--- a/src/swarmkv_message.c
+++ b/src/swarmkv_message.c
@@ -75,19 +75,24 @@ static struct swarmkv_cmd *swarmkv_cmd_deserialize(const char *blob, size_t size
mpack_tree_init_data(&tree, blob, size);
mpack_tree_parse(&tree);
- mpack_node_t item;
+ mpack_node_t item, array_node;
mpack_node_t root=mpack_tree_root(&tree);
struct swarmkv_cmd *cmd=NULL;
const char *arg=NULL;
- size_t arg_sz=0;
-
- cmd=swarmkv_cmd_new(mpack_node_array_length(root));
+ size_t sz=0;
+ node_t caller;
+ item=mpack_node_map_cstr(root, "caller");
+ node_init_from_string(&caller, mpack_node_str(item), mpack_node_strlen(item));
+ assert(node_sanity(&caller));
+ //assert(sz==sizeof(node_t));
+ array_node=mpack_node_map_cstr(root, "argv");
+ cmd=swarmkv_cmd_new(mpack_node_array_length(array_node), &caller);
for(size_t i=0; i<cmd->argc; i++)
{
- item=mpack_node_array_at(root, i);
+ item=mpack_node_array_at(array_node, i);
arg=mpack_node_str(item);
- arg_sz=mpack_node_strlen(item);
- cmd->argv[i]=sdsnewlen(arg, arg_sz);
+ sz=mpack_node_strlen(item);
+ cmd->argv[i]=sdsnewlen(arg, sz);
}
if (mpack_tree_destroy(&tree) != mpack_ok) {
fprintf(stderr, "An error occurred decoding a cmd blob!\n");
@@ -164,15 +169,20 @@ static void swarmkv_cmd_serialize(const struct swarmkv_cmd *cmd, char **blob, si
char *root_mpack_buff=NULL;
size_t root_mpack_sz=0;
- mpack_writer_t writer;
+ mpack_writer_t writer;
size_t i=0;
mpack_writer_init_growable(&writer, &root_mpack_buff, &root_mpack_sz);
+ mpack_build_map(&writer);
+ mpack_write_cstr(&writer, "caller");
+ mpack_write_cstr(&writer, node_addr2cstr(&cmd->caller));
+ mpack_write_cstr(&writer, "argv");
mpack_build_array(&writer);
for(i=0; i<cmd->argc; i++)
{
mpack_write_str(&writer, cmd->argv[i], (uint32_t)sdslen(cmd->argv[i]));
}
mpack_complete_array(&writer);
+ mpack_complete_map(&writer);
if (mpack_writer_destroy(&writer) != mpack_ok) {
fprintf(stderr, "An error occurred encoding the cmd!\n");
diff --git a/src/swarmkv_monitor.c b/src/swarmkv_monitor.c
index 0856b1c..e0c070c 100644
--- a/src/swarmkv_monitor.c
+++ b/src/swarmkv_monitor.c
@@ -2,6 +2,7 @@
#include "sds.h"
#include "log.h"
#include "uthash.h"
+#include "utlist.h"
#include "swarmkv_monitor.h"
#include "swarmkv_common.h"
#include "swarmkv_utils.h"
@@ -220,6 +221,31 @@ struct swarmkv_reply *recorder_metric_to_reply(const struct recorder_metric *met
reply->elements[1]=swarmkv_reply_new_string_fmt(metric_buff);
return reply;
}
+struct monitor_client
+{
+ node_t node;
+ sds pattern;
+ struct monitor_client *next;
+};
+struct monitor_client *monitor_client_new(const node_t *node, sds pattern)
+{
+ struct monitor_client *client=ALLOC(struct monitor_client, 1);
+ node_copy(&client->node, node);
+ client->pattern = pattern? sdsdup(pattern):NULL;
+ return client;
+}
+void monitor_client_free(struct monitor_client *client)
+{
+ sdsfree(client->pattern);
+ free(client);
+ return;
+}
+struct swarmkv_monitor_thread
+{
+ int n_client;
+ struct monitor_client *clients;
+ pthread_mutex_t mutex;
+};
struct swarmkv_monitor
{
struct swarmkv_module module;
@@ -229,7 +255,9 @@ struct swarmkv_monitor
long long max_latency_usec;
int significant_figures;
int nr_worker_threads;
- pthread_mutex_t mutex;//only one latency command can be executed at a time
+ struct swarmkv_monitor_thread *threads;
+ pthread_mutex_t latency_execute_mutex; //only one latency command can be executed at a time
+ struct swarmkv *ref_db;
};
struct swarmkv_monitor *module2monitor(struct swarmkv_module *module)
@@ -239,7 +267,7 @@ struct swarmkv_monitor *module2monitor(struct swarmkv_module *module)
assert(monitor==module->mod_ctx);
return monitor;
}
-struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts)
+struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts, struct swarmkv *db)
{
struct swarmkv_monitor *monitor=ALLOC(struct swarmkv_monitor, 1);
monitor->max_latency_usec=opts->cluster_timeout_us;
@@ -247,7 +275,14 @@ struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts)
monitor->peers=ALLOC(struct recorder *, monitor->nr_worker_threads);
strncpy(monitor->module.name, "monitor", sizeof(monitor->module.name));
monitor->module.mod_ctx=monitor;
- pthread_mutex_init(&monitor->mutex, NULL);
+ pthread_mutex_init(&monitor->latency_execute_mutex, NULL);
+ monitor->threads=ALLOC(struct swarmkv_monitor_thread, monitor->nr_worker_threads);
+ for(size_t i=0; i<monitor->nr_worker_threads; i++)
+ {
+ pthread_mutex_init(&monitor->threads[i].mutex, PTHREAD_PROCESS_PRIVATE);
+ monitor->threads[i].clients=NULL;
+ }
+ monitor->ref_db=db;
return &monitor->module;
}
void swarmkv_monitor_register_command(struct swarmkv_module *mod_monitor, const char *command)
@@ -274,15 +309,142 @@ void swarmkv_monitor_free(struct swarmkv_module *mod_monitor)
for(size_t i=0; i<monitor->nr_worker_threads; i++)
{
recorder_table_free(monitor->peers+i);
+ struct monitor_client *client=NULL, *tmp=NULL;
+ LL_FOREACH_SAFE(monitor->threads[i].clients, client, tmp)
+ {
+ LL_DELETE(monitor->threads[i].clients, client);
+ monitor_client_free(client);
+ }
}
+ free(monitor->threads);
free(monitor->peers);
monitor->peers=NULL;
free(monitor);
}
-void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const char *cmd_name, long long latency_usec)
+struct print_ctx
+{
+ node_t node;
+ int tid;
+ struct swarmkv_monitor *monitor;
+};
+static void print_on_reply(const struct swarmkv_reply *reply, void *user)
+{
+ struct print_ctx *ctx=(struct print_ctx *)user;
+ int tid=swarmkv_gettid(ctx->monitor->ref_db);
+ assert(tid==ctx->tid);
+ if(reply->type != SWARMKV_REPLY_STATUS)
+ {
+ struct monitor_client *client=NULL, *tmp=NULL;
+ pthread_mutex_lock(&ctx->monitor->threads[ctx->tid].mutex);
+ LL_FOREACH_SAFE(ctx->monitor->threads[ctx->tid].clients, client, tmp)
+ {
+ if(0==node_compare(&client->node, &ctx->node))
+ {
+ ctx->monitor->threads[ctx->tid].n_client--;
+ LL_DELETE(ctx->monitor->threads[ctx->tid].clients, client);
+ monitor_client_free(client);
+ }
+ }
+ pthread_mutex_unlock(&ctx->monitor->threads[ctx->tid].mutex);
+ }
+ free(ctx);
+}
+static const char *exec_ret2string(enum cmd_exec_result ret)
+{
+ switch(ret)
+ {
+ case NEED_KEY_ROUTE:
+ return "KRT";
+ case REDIRECT:
+ return "RDR";
+ case FINISHED:
+ return "FIN";
+ default:
+ assert(0);
+ }
+}
+void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *cmd, long long latency_usec, enum cmd_exec_result result)
{
struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
- recorder_table_record_latency(&monitor->commands, cmd_name, strlen(cmd_name), latency_usec, monitor->max_latency_usec, 0);
+ recorder_table_record_latency(&monitor->commands, spec->name, strlen(spec->name), latency_usec, monitor->max_latency_usec, 0);
+ int tid=swarmkv_gettid(monitor->ref_db);
+ assert(tid<monitor->nr_worker_threads);
+ if(0==strncasecmp(spec->name, "PRINT", strlen("PRINT")))
+ {
+ //Do not log print command to avoid infinit recursion.
+ return;
+ }
+ if(monitor->threads[tid].n_client == 0)
+ {
+ //No monitor client registered.
+ return;
+ }
+
+ struct monitor_client *client=NULL;
+ int matched_client=0;
+ node_t matched_client_nodes[monitor->threads[tid].n_client];
+ pthread_mutex_lock(&monitor->threads[tid].mutex);
+ LL_FOREACH(monitor->threads[tid].clients, client)
+ {
+ if(client->pattern)
+ {
+ int is_matched=0;
+ for(int i=0; i<cmd->argc; i++)
+ {
+ is_matched=stringmatchlen(client->pattern, sdslen(client->pattern), cmd->argv[i], sdslen(cmd->argv[i]), 0);
+ if(is_matched)
+ {
+ break;
+ }
+ }
+ if(!is_matched)
+ {
+ continue;
+ }
+ }
+ node_copy(matched_client_nodes+matched_client, &client->node);
+ matched_client++;
+ }
+ //To avoid dead lock, we should not hold the lock when executing command.
+ pthread_mutex_unlock(&monitor->threads[tid].mutex);
+ assert(matched_client <= monitor->threads[tid].n_client);
+ if(matched_client==0)
+ {
+ return;
+ }
+
+ sds cmdrepr = sdsempty();
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec, (long)tv.tv_usec);
+ cmdrepr = sdscatprintf(cmdrepr,"[%d %c %s %s] ", tid,
+ 0==node_compare(&cmd->caller, swarmkv_self_node(monitor->ref_db))?'L':'R',
+ node_addr2cstr(&cmd->caller),
+ exec_ret2string(result));
+ for(int i=0; i<cmd->argc; i++)
+ {
+ cmdrepr = sdscatrepr(cmdrepr, (char*)cmd->argv[i], sdslen(cmd->argv[i]));
+ if (i != cmd->argc-1)
+ cmdrepr = sdscatlen(cmdrepr," ",1);
+ }
+ cmdrepr = sdscatprintf(cmdrepr, "\r\n");
+
+ const char *print_cmd_argv[2];
+ size_t print_cmd_argv_len[2];
+ print_cmd_argv[0]="PRINT";
+ print_cmd_argv_len[0]=strlen(print_cmd_argv[0]);
+ print_cmd_argv[1]=cmdrepr;
+ print_cmd_argv_len[1]=sdslen(cmdrepr);
+
+ for(int i=0; i<matched_client; i++)
+ {
+ struct print_ctx *print_ctx=ALLOC(struct print_ctx, 1);
+ node_copy(&print_ctx->node, &matched_client_nodes[i]);
+ print_ctx->tid=tid;
+ print_ctx->monitor=monitor;
+ swarmkv_async_command_on_argv(monitor->ref_db, print_on_reply, print_ctx, print_ctx->node.addr, 2, print_cmd_argv, print_cmd_argv_len);
+ }
+ sdsfree(cmdrepr);
return;
}
void swarmkv_monitor_record_peer(struct swarmkv_module *mod_monitor, node_t *peer, long long latency_usec, int thread_id)
@@ -333,7 +495,7 @@ struct swarmkv_reply *latency_generic(struct recorder **table, const char *key)
enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
- pthread_mutex_lock(&monitor->mutex);
+ pthread_mutex_lock(&monitor->latency_execute_mutex);
if(cmd->argc==2 && !strcasecmp(cmd->argv[1], "help") )
{
const char *help = {
@@ -408,15 +570,43 @@ enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const s
{
*reply=swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[1], cmd->argv[0]);
}
- pthread_mutex_unlock(&monitor->mutex);
+ pthread_mutex_unlock(&monitor->latency_execute_mutex);
return FINISHED;
}
-enum cmd_exec_result lastcmds_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+enum cmd_exec_result monreg_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
-/*LASTCMDS [offset]*/
+/* MONREG IP:port [pattern]*/
struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
- pthread_mutex_lock(&monitor->mutex);
-
- pthread_mutex_unlock(&monitor->mutex);
+ node_t node;
+ int ret=0;
+ ret=node_init_from_sds(&node, cmd->argv[1]);
+ if(ret<0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_node);
+ return FINISHED;
+ }
+ for(int i=0; i<monitor->nr_worker_threads; i++)
+ {
+ struct monitor_client *client=NULL;
+ int found=0;
+ pthread_mutex_lock(&monitor->threads[i].mutex);
+ LL_FOREACH(monitor->threads[i].clients, client)
+ {
+ if(0==node_compare(&client->node, &node))
+ {
+ sdsfree(client->pattern);
+ client->pattern=cmd->argc>2?sdsdup(cmd->argv[2]):NULL;
+ found=1;
+ }
+ }
+ if(!found)
+ {
+ client=monitor_client_new(&node, cmd->argc>2?cmd->argv[2]:NULL);
+ LL_APPEND(monitor->threads[i].clients, client);
+ monitor->threads[i].n_client++;
+ }
+ pthread_mutex_unlock(&monitor->threads[i].mutex);
+ }
+ *reply=swarmkv_reply_new_status("OK");
return FINISHED;
} \ No newline at end of file
diff --git a/src/swarmkv_monitor.h b/src/swarmkv_monitor.h
index 67273ea..f134618 100644
--- a/src/swarmkv_monitor.h
+++ b/src/swarmkv_monitor.h
@@ -6,12 +6,13 @@
#include <unistd.h>
#include <string.h>
-struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts);
+struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts, struct swarmkv *db);
void swarmkv_monitor_register_command(struct swarmkv_module *mod_monitor, const char *command);
void swarmkv_monitor_free(struct swarmkv_module *mod_monitor);
-void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const char *cmd_name, long long latency_usec);
+void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *cmd, long long latency_usec, enum cmd_exec_result result);
void swarmkv_monitor_register_event(struct swarmkv_module *mod_monitor, const char *event);
void swarmkv_monitor_record_peer(struct swarmkv_module *mod_monitor, node_t *peer, long long latency_usec, int thread_id);
void swarmkv_monitor_record_event(struct swarmkv_module *mod_monitor, const char *event_name, long long latency_usec);
-enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); \ No newline at end of file
+enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result monreg_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); \ No newline at end of file
diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c
index 4607d1d..0ead34b 100644
--- a/src/swarmkv_net.c
+++ b/src/swarmkv_net.c
@@ -618,7 +618,7 @@ void swarmkv_net_set_on_msg_callback(struct swarmkv_net *net, on_msg_callback_t
#define MAX_OUTPUT_BUFFER_SIZE 1024*1024*64
int swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv_msg *msg, const char **err_str)
{
- int tid=__gettid((struct swarmkv *)(net->on_msg_cb_arg));
+ int tid=swarmkv_gettid((struct swarmkv *)(net->on_msg_cb_arg));
assert(tid<net->nr_threads);
struct snet_thread *thr=net->threads+tid;
struct snet_conn *conn=NULL;
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 189a561..f1a3ac8 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -294,7 +294,7 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key)
{
struct scontainer *ctr=NULL;
int designated_tid=key2tid(key, store->opts->nr_worker_threads);
- int real_tid=__gettid(store->exec_cmd_handle);
+ int real_tid=swarmkv_gettid(store->exec_cmd_handle);
assert(designated_tid==real_tid);
ctr=scontainer_find(&(store->threads[designated_tid].obj_table), key);
if(0==pthread_mutex_trylock(&store->threads[designated_tid].sanity_lock))
@@ -541,16 +541,16 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user)
return;
}
-void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struct swarmkv_cmd *cmd, const node_t *peer)
+void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len)
{
struct crdt_generic_ctx *ctx=NULL;
assert(peer);
ctx=ALLOC(struct crdt_generic_ctx, 1);
ctx->op=op;
ctx->store=store;
- ctx->key=sdsdup(cmd->argv[2]);
+ ctx->key=sdsnew(argv[2]);
node_copy(&ctx->peer, peer);
- swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, cmd->argc, cmd->argv);
+ swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len);
return;
}
#define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle"
@@ -574,17 +574,21 @@ int store_batch_sync(struct swarmkv_store *store, int tid)
}
else
{
- struct swarmkv_cmd *cmd=swarmkv_cmd_new(4);
- cmd->argv[0]=sdsnew("crdt");
- cmd->argv[1]=sdsnew("merge");
- cmd->argv[2]=sdsnew(ctr->obj.key);
- cmd->argv[3]=sdsnewlen(blob, blob_sz);
+ const char *argv[4];
+ size_t argv_len[4];
+ argv[0]="crdt";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]="merge";
+ argv_len[1]=strlen(argv[1]);
+ argv[2]=ctr->obj.key;
+ argv_len[2]=sdslen(ctr->obj.key);
+ argv[3]=blob;
+ argv_len[3]=blob_sz;
for(int i=0; i<utarray_len(ctr->replica_node_list); i++)
{
node_t *peer=utarray_eltptr(ctr->replica_node_list, i);
- crdt_generic_call(store, CRDT_MERGE, cmd, peer);
+ crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len);
}
- swarmkv_cmd_free(cmd);
free(blob);
}
DL_DELETE(thr->sync_queue, ctr);
@@ -593,16 +597,22 @@ int store_batch_sync(struct swarmkv_store *store, int tid)
n_synced++;
if(n_synced>=MAX_SYNC_PER_PERIOD) break;
}
- node_t peer;
- struct swarmkv_cmd *cmd=NULL;
- int ret=1;
- while(1)
+
+ struct sync_task *task=NULL;
+ task=sync_master_get_task(sync_master);
+ while(task)
{
- ret=sync_master_get_cmd(sync_master, &peer, &cmd);
- if(!ret) break;
- crdt_generic_call(store, CRDT_MERGE, cmd, &peer);
- swarmkv_cmd_free(cmd);
- cmd=NULL;
+ size_t n_data=sync_task_key_count(task);
+ const char *argv[n_data*2+2];
+ size_t argv_len[n_data*2+2];
+ argv[0]="crdt";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]="merge";
+ argv_len[1]=strlen(argv[1]);
+ sync_task_read_key_blob(task, argv+2, argv_len+2, n_data*2);
+ crdt_generic_call(store, CRDT_MERGE, sync_task_peer(task), n_data*2+2, argv, argv_len);
+ sync_task_free(task);
+ task=sync_master_get_task(sync_master);
}
sync_master_free(sync_master);
return n_synced;
@@ -613,7 +623,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
struct scontainer *ctr=NULL, *tmp=NULL;
struct timespec start, end;
int n_synced=0;
- int real_tid=__gettid(store->exec_cmd_handle);
+ int real_tid=swarmkv_gettid(store->exec_cmd_handle);
assert(real_tid==thread_id);
clock_gettime(CLOCK_MONOTONIC, &start);
@@ -630,20 +640,22 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id)
char *blob=NULL;
size_t blob_sz=0;
scontainer_serialize(ctr, &blob, &blob_sz);
-
- struct swarmkv_cmd *cmd=swarmkv_cmd_new(4);
- cmd->argv[0]=sdsnew("crdt");
- cmd->argv[1]=sdsnew("merge");
- cmd->argv[2]=sdsnew(ctr->obj.key);
- cmd->argv[3]=sdsnewlen(blob, blob_sz);
+ const char *argv[4];
+ size_t argv_len[4];
+ argv[0]="crdt";
+ argv_len[0]=strlen(argv[0]);
+ argv[1]="merge";
+ argv_len[1]=strlen(argv[1]);
+ argv[2]=ctr->obj.key;
+ argv_len[2]=sdslen(ctr->obj.key);
+ argv[3]=blob;
+ argv_len[3]=blob_sz;
for(int i=0; i<utarray_len(ctr->replica_node_list); i++)
{
node_t *peer=utarray_eltptr(ctr->replica_node_list, i);
- crdt_generic_call(store, CRDT_MERGE, cmd, peer);
+ crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len);
}
- swarmkv_cmd_free(cmd);
free(blob);
-
DL_DELETE(thr->sync_queue, ctr);
ctr->is_in_sync_q=0;
store->synced++;
@@ -752,13 +764,11 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st
sds key=cmd->argv[2];
int tid=store_gettid(mod_store, key);
- int real_tid=__gettid(store->exec_cmd_handle);
+ int real_tid=swarmkv_gettid(store->exec_cmd_handle);
assert(tid==real_tid);
- struct swarmkv_cmd *crdt_get_cmd=NULL;
- crdt_get_cmd=swarmkv_cmd_new(3);
- crdt_get_cmd->argv[0]=sdsnew("crdt");
- crdt_get_cmd->argv[1]=sdsnew("get");
- crdt_get_cmd->argv[2]=sdsdup(key);
+
+
+
size_t n_replica_node=cmd->argc-3;
ctr=store_lookup_scontainer(store, key);
@@ -782,23 +792,34 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st
assert(node_compare(replica_nodes+i, &store->self)!=0);
scontainer_add_replica_node(ctr, replica_nodes+i);
}
- struct swarmkv_cmd *crdt_meet_cmd=NULL;
+ const char *crdt_get_argv[3];
+ size_t crdt_get_argv_len[3];
+ crdt_get_argv[0]="crdt";
+ crdt_get_argv_len[0]=strlen(crdt_get_argv[0]);
+ crdt_get_argv[1]="get";
+ crdt_get_argv_len[1]=strlen(crdt_get_argv[1]);
+ crdt_get_argv[2]=key;
+ crdt_get_argv_len[2]=sdslen(key);
+
+ const char *crdt_meet_argv[4];
+ size_t crdt_meet_argv_len[4];
for(size_t i=0; i<n_replica_node; i++)
{
if(i<max_pull_node_num)
{
- crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i);
+ crdt_generic_call(store, CRDT_GET, replica_nodes+i, 3, crdt_get_argv, crdt_get_argv_len);
}
- crdt_meet_cmd=swarmkv_cmd_new(4);
- crdt_meet_cmd->argv[0]=sdsnew("crdt");
- crdt_meet_cmd->argv[1]=sdsnew("meet");
- crdt_meet_cmd->argv[2]=sdsdup(ctr->obj.key);
- crdt_meet_cmd->argv[3]=node_addr2sds(&store->self);
- crdt_generic_call(store, CRDT_MEET, crdt_meet_cmd, replica_nodes+i);
- swarmkv_cmd_free(crdt_meet_cmd);
+
+ crdt_meet_argv[0]="crdt";
+ crdt_meet_argv_len[0]=strlen(crdt_meet_argv[0]);
+ crdt_meet_argv[1]="meet";
+ crdt_meet_argv_len[1]=strlen(crdt_meet_argv[1]);
+ crdt_meet_argv[2]=key;
+ crdt_meet_argv_len[2]=sdslen(key);
+ crdt_meet_argv[3]=node_addr2cstr(&store->self);
+ crdt_meet_argv_len[3]=strlen(crdt_meet_argv[3]);
+ crdt_generic_call(store, CRDT_MEET, replica_nodes+i, 4, crdt_meet_argv, crdt_meet_argv_len);
}
- swarmkv_cmd_free(crdt_get_cmd);
- crdt_get_cmd=NULL;
free(replica_nodes);
*reply=swarmkv_reply_new_status("OK");
return FINISHED;
diff --git a/src/swarmkv_sync.c b/src/swarmkv_sync.c
index 17dffe9..a47f931 100644
--- a/src/swarmkv_sync.c
+++ b/src/swarmkv_sync.c
@@ -81,36 +81,58 @@ void sync_master_add_obj(struct sync_master *master, const sds key, char *blob,
return;
}
-int sync_master_get_cmd(struct sync_master *master, node_t *peer, struct swarmkv_cmd **cmd)
+struct sync_task *sync_master_get_task(struct sync_master *master)
{
struct sync_task *task=NULL, *tmp=NULL;
- struct sync_data **p=NULL, *data=NULL;
- struct swarmkv_cmd *c=NULL;
- size_t n_data=0;
- *cmd=NULL;
+
+ //We are not iterating the whole table, just get the first one
HASH_ITER(hh, master->task_table, task, tmp)
{
- n_data=utarray_len(task->sync_data_list);
- c=swarmkv_cmd_new(2+n_data*2);
- c->argv[0]=sdsnew("crdt");
- c->argv[1]=sdsnew("merge");
- for(size_t i=0; i<n_data; i++)
- {
- p=utarray_eltptr(task->sync_data_list, i);
- data=*p;
- c->argv[2+i*2]=sdsdup(data->key);
- c->argv[2+i*2+1]=sdsnewlen(data->blob, data->blob_sz);
- sync_data_free(data);
- master->synced++;
- }
- node_copy(peer, &task->peer);
HASH_DEL(master->task_table, task);
- utarray_free(task->sync_data_list);
- free(task);
+ master->synced += utarray_len(task->sync_data_list);
break;
}
- *cmd=c;
- return (c==NULL?0:1);
+ return task;
+}
+void sync_task_free(struct sync_task *task)
+{
+ struct sync_data **p=NULL, *data=NULL;
+ size_t n_data=utarray_len(task->sync_data_list);
+ for(size_t i=0; i<n_data; i++)
+ {
+ p=utarray_eltptr(task->sync_data_list, i);
+ data=*p;
+ sync_data_free(data);
+ }
+ utarray_free(task->sync_data_list);
+ free(task);
+ return;
+}
+const node_t *sync_task_peer(struct sync_task *task)
+{
+ return &task->peer;
+}
+size_t sync_task_key_count(struct sync_task *task)
+{
+ return utarray_len(task->sync_data_list);
+}
+void sync_task_read_key_blob(struct sync_task *task, const char *argv[], size_t *argv_len, int argc)
+{
+ struct sync_data **p=NULL, *data=NULL;
+ int n_data=utarray_len(task->sync_data_list);
+ assert(argc == 2*n_data);
+ for(int i=0, j=0; i<n_data && j<argc; i++, j+=2)
+ {
+ p=utarray_eltptr(task->sync_data_list, i);
+ data=*p;
+ assert(data->key);
+ assert(data->blob);
+ argv[j]=data->key;
+ argv_len[j]=sdslen(data->key);
+ argv[j+1]=data->blob;
+ argv_len[j+1]=data->blob_sz;
+ }
+ return;
}
void sync_master_free(struct sync_master *master)
{
diff --git a/src/swarmkv_sync.h b/src/swarmkv_sync.h
index 36f96ff..84a2175 100644
--- a/src/swarmkv_sync.h
+++ b/src/swarmkv_sync.h
@@ -3,7 +3,12 @@
#include "sds.h"
struct sync_master;
+struct sync_task;
struct sync_master *sync_master_new(void);
void sync_master_add_obj(struct sync_master *mgr, const sds key, char *blob, size_t blob_sz, const node_t *peers, size_t n_peer);
-int sync_master_get_cmd(struct sync_master *mgr, node_t *peer, struct swarmkv_cmd **cmd);
+struct sync_task *sync_master_get_task(struct sync_master *master);
+void sync_task_free(struct sync_task *task);
+const node_t *sync_task_peer(struct sync_task *task);
+size_t sync_task_key_count(struct sync_task *task);
+void sync_task_read_key_blob(struct sync_task *task, const char *argv[], size_t *argv_len, int argc);
void sync_master_free(struct sync_master *mgr); \ No newline at end of file
diff --git a/src/swarmkv_utils.c b/src/swarmkv_utils.c
index ab0b920..adbd188 100644
--- a/src/swarmkv_utils.c
+++ b/src/swarmkv_utils.c
@@ -56,7 +56,6 @@ char* replace_char(char* str, char find, char replace){
return str;
}
-/* Glob-style pattern matching. */
//source https://github.com/redis/redis/blob/7.0/src/util.c
int stringmatchlen(const char *pattern, int patternLen,
const char *string, int stringLen, int nocase)
diff --git a/src/swarmkv_utils.h b/src/swarmkv_utils.h
index 55f5dbc..8951523 100644
--- a/src/swarmkv_utils.h
+++ b/src/swarmkv_utils.h
@@ -40,6 +40,8 @@ const char *module_name_str(const char *name);
const char *swarmkv_util_pthread_cond_timedwait_error_to_string(const int timed_wait_rv);
char *toLower(char* s);
char *toUpper(char* s);
+// Glob-style pattern matching.
+// Return 1 if matched, 0 if not matched
int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase);
long long ustime(void);
int is_number(const char *string, size_t sz, long long *number);
diff --git a/src/t_cms.c b/src/t_cms.c
index 5339cc9..587d3e2 100644
--- a/src/t_cms.c
+++ b/src/t_cms.c
@@ -156,7 +156,7 @@ enum cmd_exec_result cmsincrby_command(struct swarmkv_module *mod_store, const s
}
enum cmd_exec_result cmsmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
-/*CMSQUERY key item [item ...]*/
+/*CMSMQUERY key item [item ...]*/
struct sobj *obj=NULL;
const sds key=cmd->argv[1];