diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | src/swarmkv.c | 96 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 613 | ||||
| -rw-r--r-- | src/swarmkv_common.c | 24 | ||||
| -rw-r--r-- | src/swarmkv_common.h | 18 | ||||
| -rw-r--r-- | src/swarmkv_error.h | 1 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 41 | ||||
| -rw-r--r-- | src/swarmkv_mesh.c | 4 | ||||
| -rw-r--r-- | src/swarmkv_message.c | 26 | ||||
| -rw-r--r-- | src/swarmkv_monitor.c | 214 | ||||
| -rw-r--r-- | src/swarmkv_monitor.h | 7 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 2 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 115 | ||||
| -rw-r--r-- | src/swarmkv_sync.c | 68 | ||||
| -rw-r--r-- | src/swarmkv_sync.h | 7 | ||||
| -rw-r--r-- | src/swarmkv_utils.c | 1 | ||||
| -rw-r--r-- | src/swarmkv_utils.h | 2 | ||||
| -rw-r--r-- | src/t_cms.c | 2 |
18 files changed, 778 insertions, 467 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6c8b3e5..83d5aa5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,5 @@ set(SWARMKV_MAJOR_VERSION 4) -set(SWARMKV_MINOR_VERSION 2) +set(SWARMKV_MINOR_VERSION 3) set(SWARMKV_PATCH_VERSION 0) set(SWARMKV_VERSION ${SWARMKV_MAJOR_VERSION}.${SWARMKV_MINOR_VERSION}.${SWARMKV_PATCH_VERSION}) @@ -25,7 +25,7 @@ add_definitions(-D_GNU_SOURCE) add_definitions(-fPIC) set(SWARMKV_SRC swarmkv.c swarmkv_api.c swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c - swarmkv_store.c swarmkv_sync.c swarmkv_keyspace.c swarmkv_monitor.c + swarmkv_sync.c swarmkv_store.c swarmkv_keyspace.c swarmkv_monitor.c t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c) diff --git a/src/swarmkv.c b/src/swarmkv.c index 382f56e..8cf8d91 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -108,7 +108,7 @@ struct swarmkv_options *swarmkv_get0_options(struct swarmkv *db) return db->opts; } -int __gettid(struct swarmkv *db) +int swarmkv_gettid(const struct swarmkv *db) { // int __sys_tid=syscall(SYS_gettid); for(int i=0; i<db->opts->nr_worker_threads + db->opts->nr_caller_threads; i++) @@ -187,7 +187,7 @@ static void remoter_caller_ctx_send_reply(struct remote_caller_ctx *ctx, const s { struct swarmkv *db = ctx->db; struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->remote, ctx->remote_tid, &db->self, ctx->sequence); - int cur_tid=__gettid(db); + int cur_tid=swarmkv_gettid(db); const char *err_str=NULL; if(0==node_compare(&db->self, &msg->caller)) { @@ -471,7 +471,12 @@ enum cmd_exec_result __attribute__((optimize("O0"))) debug_command(struct swarmk } return FINISHED; } - +enum cmd_exec_result print_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + printf("%s", cmd->argv[1]); + *reply=swarmkv_reply_new_status("OK"); + return FINISHED; +} enum cmd_exec_result config_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { //Unused @@ -544,7 +549,7 @@ void *swarmkv_worker_thread(void *arg) { struct swarmkv *db = (struct swarmkv *)arg; swarmkv_register_thread(db); - int tid=__gettid(db); + int tid=swarmkv_gettid(db); struct swarmkv_thread *thr = db->threads+tid; char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", thr->thread_id); @@ -619,41 +624,41 @@ static struct swarmkv_reply *key_not_found_reply(enum key_not_found_reply not_fo return reply; } -static struct swarmkv_cmd *make_keyroute_cmd(enum cmd_key_flag flag, const sds key, const node_t *obj_owner) +static struct swarmkv_cmd *make_keyroute_cmd(enum cmd_key_flag flag, const sds key, int dry_run, const node_t *caller) { struct swarmkv_cmd *keyroute_cmd=NULL; switch(flag) { case CMD_KEY_RO: case CMD_KEY_RW: - if(obj_owner) + if(!dry_run) { - keyroute_cmd=swarmkv_cmd_new(4); + keyroute_cmd=swarmkv_cmd_new(4, caller); keyroute_cmd->argv[0]=sdsnew("keyspace"); keyroute_cmd->argv[1]=sdsnew("xradd"); keyroute_cmd->argv[2]=sdsdup(key); - keyroute_cmd->argv[3]=node_addr2sds(obj_owner); + keyroute_cmd->argv[3]=node_addr2sds(caller); } else { - keyroute_cmd=swarmkv_cmd_new(3); + keyroute_cmd=swarmkv_cmd_new(3, caller); keyroute_cmd->argv[0]=sdsnew("keyspace"); keyroute_cmd->argv[1]=sdsnew("rlist"); keyroute_cmd->argv[2]=sdsdup(key); - } + } break; case CMD_KEY_OW: - if(obj_owner) + if(!dry_run) { - keyroute_cmd=swarmkv_cmd_new(4); + keyroute_cmd=swarmkv_cmd_new(4, caller); keyroute_cmd->argv[0]=sdsnew("keyspace"); keyroute_cmd->argv[1]=sdsnew("radd"); keyroute_cmd->argv[2]=sdsdup(key); - keyroute_cmd->argv[3]=node_addr2sds(obj_owner); + keyroute_cmd->argv[3]=node_addr2sds(caller); } else { - keyroute_cmd=swarmkv_cmd_new(3); + keyroute_cmd=swarmkv_cmd_new(3, caller); keyroute_cmd->argv[0]=sdsnew("keyspace"); keyroute_cmd->argv[1]=sdsnew("radd"); keyroute_cmd->argv[2]=sdsdup(key); @@ -661,7 +666,7 @@ static struct swarmkv_cmd *make_keyroute_cmd(enum cmd_key_flag flag, const sds k break; case CMD_KEY_RM: assert(0); - keyroute_cmd=swarmkv_cmd_new(3); + keyroute_cmd=swarmkv_cmd_new(3, caller); keyroute_cmd->argv[0]=sdsnew("keyspace"); keyroute_cmd->argv[1]=sdsnew("del"); keyroute_cmd->argv[2]=sdsdup(key); @@ -754,10 +759,10 @@ static void peer_exec_on_success(void *result, void *user) cmd_ctx_free(ctx); } } -struct swarmkv_cmd *make_crdt_add_cmd(const sds key, node_t replica[], size_t n_replica) +struct swarmkv_cmd *make_crdt_add_cmd(const sds key, node_t replica[], size_t n_replica, const node_t *caller) { struct swarmkv_cmd *crdt_add_cmd=NULL; - crdt_add_cmd=swarmkv_cmd_new(3+n_replica); + crdt_add_cmd=swarmkv_cmd_new(3+n_replica, caller); crdt_add_cmd->argv[0]=sdsnew("crdt"); crdt_add_cmd->argv[1]=sdsnew("add"); crdt_add_cmd->argv[2]=sdsdup(key); @@ -807,12 +812,11 @@ static void key_route_on_success(void *result, void *user) if(n_replica_node>0) { __exec_cmd(ctx->db, replica_nodes+0, ctx->cmd, ctx->future_of_caller); - //__send_cmd(ctx->db, replica_nodes+0, ctx->cmd, ctx->future_of_caller); } if(self_is_a_replica) { struct cmd_ctx *crdt_add_ctx=NULL; - struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(key, replica_nodes, n_replica_node); + struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(key, replica_nodes, n_replica_node, &ctx->db->self); crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller); crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx); __exec_cmd(ctx->db, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine); @@ -844,7 +848,7 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd * void __on_msg_callback(struct swarmkv_msg *msg, void *arg) { struct swarmkv *db = (struct swarmkv *)arg; - int cur_tid=__gettid(db); + int cur_tid=swarmkv_gettid(db); if(msg->type==MSG_TYPE_CMD) { //command may be from other thread or other node. @@ -878,27 +882,13 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg) } } } -static const char *exec_ret2string(enum cmd_exec_result ret) -{ - switch(ret) - { - case NEED_KEY_ROUTE: - return "NEED_KEY_ROUTE"; - case REDIRECT: - return "REDIRECT"; - case FINISHED: - return "FINISHED"; - default: - assert(0); - } -} #define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000 void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller) { struct swarmkv_cmd_spec *spec=NULL; struct swarmkv_reply *reply=NULL; struct promise *p=NULL; - int cur_tid=__gettid(db); + int cur_tid=swarmkv_gettid(db); spec=get_spec_by_argv(db, cmd->argc, cmd->argv); if(!spec) @@ -981,20 +971,8 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar clock_gettime(CLOCK_MONOTONIC_COARSE, &start); exec_ret=spec->proc(spec->module, cmd, &reply); clock_gettime(CLOCK_MONOTONIC_COARSE, &end); - swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end)); + swarmkv_monitor_record_command(db->mod_monitor, spec, cmd, timespec_diff_usec(&start, &end), exec_ret); - //if(strcasestr(spec->name, "CRDT")) - if(0){ - struct timeval now; - gettimeofday(&now, NULL); - printf("%ld.%.6ld %s %d %s %s %s\n", - now.tv_sec, now.tv_usec, - db->self.addr, - db->threads[cur_tid].recusion_depth, - spec->name, - spec->key_offset<0?"NULL":cmd->argv[spec->key_offset], - exec_ret2string(exec_ret)); - } switch(exec_ret) { case FINISHED: @@ -1018,7 +996,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar } case NEED_KEY_ROUTE: { - struct swarmkv_cmd *keyspace_cmd=make_keyroute_cmd(spec->flag, cmd->argv[spec->key_offset], db->opts->dryrun? NULL:(&db->self)); + struct swarmkv_cmd *keyspace_cmd=make_keyroute_cmd(spec->flag, cmd->argv[spec->key_offset], db->opts->dryrun, &db->self); struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller); ctx->future_of_mine=future_create("key_route", key_route_on_success, generic_on_fail, ctx); __exec_cmd(db, NULL, keyspace_cmd, ctx->future_of_mine); @@ -1226,13 +1204,19 @@ void command_spec_init(struct swarmkv *db) command_register(&(db->command_table), "PING", "IP:port", 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, ping_command, &db->module); + command_register(&(db->command_table), "PRINT", "text", + 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, + print_command, &db->module); command_register(&(db->command_table), "COMMAND LIST", "", 0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, command_list_command, &db->module); command_register(&(db->command_table), "LATENCY", "<subcommand>", 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, latency_command, db->mod_monitor); - + command_register(&(db->command_table), "MONREG", "IP:port", + 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, + monreg_command, db->mod_monitor); + /* low-level state-based CRDT synchronization commands*/ command_register(&(db->command_table), "CRDT ADD", "key [IP:port ...]", 1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, @@ -1403,7 +1387,7 @@ static void evloop_timeout_cb(evutil_socket_t fd, short event, void *arg) } void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv) { - int tid=__gettid(db); + int tid=swarmkv_gettid(db); //must initiate from caller threads, and caller thread ID is larger than worker thread ID assert(tid >= db->opts->nr_worker_threads); struct swarmkv_thread *thr=db->threads+tid; @@ -1425,14 +1409,14 @@ void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv) } void swarmkv_caller_loop_break(struct swarmkv *db) { - int tid=__gettid(db); + int tid=swarmkv_gettid(db); //must initiate from caller threads, and caller thread ID is larger than worker thread ID assert(tid >= db->opts->nr_worker_threads); event_base_loopbreak(db->ref_evbases[tid]); } long long swarmkv_caller_get_pending_commands(struct swarmkv *db) { - int tid=__gettid(db); + int tid=swarmkv_gettid(db); //must initiate from caller threads, and caller thread ID is larger than worker thread ID assert(tid >= db->opts->nr_worker_threads); return swarmkv_rpc_mgr_count(db->rpc_mgr, tid); @@ -1493,7 +1477,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, } event_config_free(ev_cfg); - db->mod_monitor=swarmkv_monitor_new(db->opts); + db->mod_monitor=swarmkv_monitor_new(db->opts, db); db->rpc_mgr=swarmkv_rpc_mgr_new(db->ref_evbases, opts->total_threads, opts->cluster_timeout_us); db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger); @@ -1623,6 +1607,10 @@ const char *swarmkv_self_address(const struct swarmkv *db) { return db->self.addr; } +const node_t *swarmkv_self_node(const struct swarmkv *db) +{ + return &db->self; +} void swarmkv_self_uuid(const struct swarmkv *db, char buff[37]) { uuid_unparse(db->opts->bin_uuid, buff); diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index f408274..6d98f15 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -10,7 +10,7 @@ #define MODULE_SWAMRKV_API module_name_str("swarmkv.api") void exec_for_local(struct swarmkv *db, const struct swarmkv_cmd *cmd, const node_t *target_node, swarmkv_on_reply_callback_t * cb, void * cb_arg); - +const node_t *swarmkv_self_node(const struct swarmkv *db); struct swarmkv_readoptions { @@ -27,7 +27,7 @@ struct swarmkv_options* swarmkv_options_new(void) opts->cluster_port=5210; opts->health_check_port=0; opts->loglevel=0; - opts->cluster_timeout_us=500*1000;//Default 500ms + opts->cluster_timeout_us=500*1000;//Default 500ms opts->sync_interval_us=10*1000; //Default 10ms strcpy(opts->bind_address, "0.0.0.0"); strcpy(opts->cluster_announce_ip, "127.0.0.1"); @@ -159,146 +159,266 @@ int swarmkv_options_set_max_dispatch_interval(struct swarmkv_options *opts, cons opts->eb_max_callbacks=max_callbacks; return 0; } +void swarmkv_async_command_on_argv(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, int argc, const char *argv[], size_t *argv_len) +{ + struct swarmkv_cmd *cmd=NULL; + cmd=swarmkv_cmd_new(argc, swarmkv_self_node(db)); + for(size_t i=0; i<argc; i++) + { + assert(argv[i]); + assert(argv_len[i]>0); + cmd->argv[i]=sdsnewlen(argv[i], argv_len[i]); + } + if(!target) + { + exec_for_local(db, cmd, NULL, cb, cb_arg); + } + else + { + node_t target_node; + memset(&target_node, 0, sizeof(node_t)); + node_init_from_sds(&target_node, target); + exec_for_local(db, cmd, &target_node, cb, cb_arg); + } + swarmkv_cmd_free(cmd); +} +struct blocking_query_ctx +{ + struct swarmkv_reply *reply; + struct swarmkv *db; +}; +void blocking_query_cb(const struct swarmkv_reply *reply, void * arg) +{ + struct blocking_query_ctx *ctx=(struct blocking_query_ctx*) arg; + ctx->reply=swarmkv_reply_dup(reply); + swarmkv_caller_loop_break(ctx->db); + return; +} +struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, const char *argv[], size_t *argv_len) +{ + struct blocking_query_ctx ctx; + memset(&ctx, 0, sizeof(ctx)); + ctx.db=db; + ctx.reply=NULL; + swarmkv_async_command_on_argv(db, blocking_query_cb, &ctx, target, argc, argv, argv_len); + long long pending_cmds=0; + pending_cmds=swarmkv_caller_get_pending_commands(db); + if(ctx.reply==NULL) + { + assert(pending_cmds==1); + swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL); + } + assert(ctx.reply!=NULL); + struct swarmkv_reply *reply=NULL; + reply=ctx.reply; + ctx.reply=NULL; + return reply; +} +struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...) +{ + char *cmd_str=NULL; + va_list ap; + va_start(ap, format); + vasprintf(&cmd_str, format, ap); + va_end(ap); + + int argc=0; + sds *argv=NULL; + + argv=sdssplitargs(cmd_str, &argc); + size_t argv_len[argc]; + for(int i=0; i<argc; i++) + { + argv_len[i]=sdslen(argv[i]); + } + struct swarmkv_reply *reply=NULL; + reply=swarmkv_command_on_argv(db, target, argc, (const char**) argv, argv_len); + + if (argv) + { + sdsfreesplitres(argv, argc); + } + free(cmd_str); + + return reply; +} +struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...) +{ + struct swarmkv_reply *reply=NULL; + char *cmd_str=NULL; + va_list ap; + va_start(ap, format); + vasprintf(&cmd_str, format, ap); + va_end(ap); + reply=swarmkv_command_on(db, NULL, cmd_str); + free(cmd_str); + return reply; +} +void swarmkv_async_command_on(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, const char *format, ...) +{ + int argc=0; sds *argv=NULL; + char *cmd_str=NULL; + + va_list ap; + va_start(ap, format); + vasprintf(&cmd_str, format, ap); + va_end(ap); + + argv=sdssplitargs(cmd_str, &argc); + size_t argv_len[argc]; + for(size_t i=0; i<argc; i++) + { + argv_len[i]=sdslen(argv[i]); + } + swarmkv_async_command_on_argv(db, cb, cb_arg, target, argc, (const char**)argv, argv_len); + + if (argv) + { + sdsfreesplitres(argv, argc); + } + free(cmd_str); +} +void swarmkv_async_command(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *format, ...) +{ + char *cmd_str=NULL; + va_list ap; + va_start(ap,format); + vasprintf(&cmd_str, format, ap); + va_end(ap); + swarmkv_async_command_on(db, cb, cb_arg, NULL, cmd_str); +} void swarmkv_set(struct swarmkv * db, const char * key, size_t keylen, const char * value, size_t vallen, swarmkv_on_reply_callback_t * cb, void * cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(3); - cmd->argv[0]=sdsnew("set"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsnewlen(value, vallen); - - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[3]; + size_t argv_len[3]; + argv[0]="set"; + argv_len[0]=4; + argv[1]=key; + argv_len[1]=keylen; + argv[2]=value; + argv_len[2]=vallen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len); return; - } void swarmkv_get(struct swarmkv * db, const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("get"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[2]; + size_t argv_len[2]; + argv[0]="get"; + argv_len[0]=4; + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); return; } void swarmkv_expire(struct swarmkv *db, const char *key, size_t keylen, int seconds, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(3); - cmd->argv[0]=sdsnew("expire"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsfromlonglong(seconds); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[3]; + size_t argv_len[3]; + argv[0]="expire"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + char buffer[32]; + sprintf(buffer, "%d", seconds); + argv[2]=buffer; + argv_len[2]=strlen(buffer); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len); return; } void swarmkv_ttl(struct swarmkv *db, const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("ttl"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[2]; + size_t argv_len[2]; + argv[0]="ttl"; + argv_len[0]=4; + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); return; } void swarmkv_persist(struct swarmkv *db, const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("persist"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[2]; + size_t argv_len[2]; + argv[0]="persist"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); return; } void swarmkv_sadd(struct swarmkv *db, const char* key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2+n_members); - cmd->argv[0]=sdsnew("sadd"); - cmd->argv[1]=sdsnewlen(key, keylen); + const char *argv[2+n_members]; + size_t argv_len[2+n_members]; + argv[0]="sadd"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; for(size_t i=0; i<n_members; i++) { - cmd->argv[2+i]=sdsnewlen(members[i], members_len[i]); + argv[2+i]=members[i]; + argv_len[2+i]=members_len[i]; } - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); - + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_members, argv, argv_len); + return; } void swarmkv_srem(struct swarmkv *db, const char* key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2+n_members); - cmd->argv[0]=sdsnew("srem"); - cmd->argv[1]=sdsnewlen(key, keylen); + const char *argv[2+n_members]; + size_t argv_len[2+n_members]; + argv[0]="srem"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; for(size_t i=0; i<n_members; i++) { - cmd->argv[2+i]=sdsnewlen(members[i], members_len[i]); - } - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); - -} -void swarmkv_bfadd(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2+n_items); - cmd->argv[0]=sdsnew("bfadd"); - cmd->argv[1]=sdsnewlen(key, keylen); - for(size_t i=0; i<n_items; i++) - { - cmd->argv[2+i]=sdsnewlen(items[i], items_len[i]); - } - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); -} -void swarmkv_bfexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2+n_items); - cmd->argv[0]=sdsnew("bfexists"); - cmd->argv[1]=sdsnewlen(key, keylen); - for(size_t i=0; i<n_items; i++) - { - cmd->argv[2+i]=sdsnewlen(items[i], items_len[i]); + argv[2+i]=members[i]; + argv_len[2+i]=members_len[i]; } - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_members, argv, argv_len); + return; } void swarmkv_sismember(struct swarmkv *db, const char* key, size_t keylen, const char *member, size_t member_len, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(3); - cmd->argv[0]=sdsnew("sismember"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsnewlen(member, member_len); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[3]; + size_t argv_len[3]; + argv[0]="sismember"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + argv[2]=member; + argv_len[2]=member_len; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len); + return; } void swarmkv_smembers(struct swarmkv *db, const char* key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("smembers"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[2]; + size_t argv_len[2]; + argv[0]="smembers"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); + return; } void swarmkv_scard(struct swarmkv *db, const char* key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("scard"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[2]; + size_t argv_len[2]; + argv[0]="scard"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); + return; } int swarmkv_hset_string(struct swarmkv *db, @@ -307,218 +427,149 @@ int swarmkv_hset_string(struct swarmkv *db, return 0; } void swarmkv_del(struct swarmkv * db, const char * key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("del"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); +{ + const char *argv[2]; + size_t argv_len[2]; + argv[0]="del"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); return; } void swarmkv_incrby(struct swarmkv * db, const char * key, size_t keylen, long long increment, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(3); - cmd->argv[0]=sdsnew("incrby"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsfromlonglong(increment); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); +{ + const char *argv[3]; + size_t argv_len[3]; + argv[0]="incrby"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + char buffer[32]; + sprintf(buffer, "%lld", increment); + argv[2]=buffer; + argv_len[2]=strlen(buffer); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len); return; } void swarmkv_tconsume(struct swarmkv * db, const char * key, size_t keylen, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(3); - cmd->argv[0]=sdsnew("tconsume"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsfromlonglong(tokens); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); +{ + const char *argv[3]; + size_t argv_len[3]; + argv[0]="tconsume"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + char buffer[32]; + sprintf(buffer, "%lld", tokens); + argv[2]=buffer; + argv_len[2]=strlen(buffer); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len); return; } void swarmkv_ftconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long weight, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(5); - cmd->argv[0]=sdsnew("ftconsume"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsnewlen(member, member_len); - cmd->argv[3]=sdsfromlonglong(weight); - cmd->argv[4]=sdsfromlonglong(tokens); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); +{ + const char *argv[5]; + size_t argv_len[5]; + argv[0]="ftconsume"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + argv[2]=member; + argv_len[2]=member_len; + char buffer1[32]; + sprintf(buffer1, "%lld", weight); + argv[3]=buffer1; + argv_len[3]=strlen(buffer1); + char buffer2[32]; + sprintf(buffer2, "%lld", tokens); + argv[4]=buffer2; + argv_len[4]=strlen(buffer2); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 5, argv, argv_len); return; } void swarmkv_btconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(4); - cmd->argv[0]=sdsnew("btconsume"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsnewlen(member, member_len); - cmd->argv[3]=sdsfromlonglong(tokens); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); - return; -} -struct blocking_query_ctx { - struct swarmkv_reply *reply; - struct swarmkv *db; -}; -void blocking_query_cb(const struct swarmkv_reply *reply, void * arg) -{ - struct blocking_query_ctx *ctx=(struct blocking_query_ctx*) arg; - ctx->reply=swarmkv_reply_dup(reply); - swarmkv_caller_loop_break(ctx->db); + const char *argv[4]; + size_t argv_len[4]; + argv[0]="btconsume"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + argv[2]=member; + argv_len[2]=member_len; + char buffer[32]; + sprintf(buffer, "%lld", tokens); + argv[3]=buffer; + argv_len[3]=strlen(buffer); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 4, argv, argv_len); return; } -struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv) -{ - struct swarmkv_cmd *cmd=NULL; - struct swarmkv_reply *reply=NULL; - struct blocking_query_ctx ctx; - memset(&ctx, 0, sizeof(ctx)); - node_t target_node; - memset(&target_node, 0, sizeof(node_t)); - ctx.db=db; - ctx.reply=NULL; - cmd=swarmkv_cmd_new(argc); - for(int i=0; i<argc; i++) - { - cmd->argv[i]=sdsdup(argv[i]); - } - - if(!target) - { - exec_for_local(db, cmd, NULL, blocking_query_cb, &ctx); - } - else - { - node_init_from_sds(&target_node, target); - exec_for_local(db, cmd, &target_node, blocking_query_cb, &ctx); - } - long long pending_cmds=0; - pending_cmds=swarmkv_caller_get_pending_commands(db); - if(ctx.reply==NULL) - { - assert(pending_cmds==1); - swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL); - } - assert(ctx.reply!=NULL); - reply=ctx.reply; - ctx.reply=NULL; - - swarmkv_cmd_free(cmd); - return reply; -} - -struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...) -{ - char *cmd_str=NULL; - va_list ap; - va_start(ap,format); - vasprintf(&cmd_str, format, ap); - va_end(ap); - - int argc=0; - sds *argv=NULL; - - argv=sdssplitargs(cmd_str, &argc); - struct swarmkv_reply *reply=NULL; - reply=swarmkv_command_on_argv(db, target, argc, argv); - - if (argv) - { - sdsfreesplitres(argv, argc); - } - free(cmd_str); - - return reply; -} -struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...) +void swarmkv_bfadd(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - char *cmd_str=NULL; - va_list ap; - va_start(ap, format); - vasprintf(&cmd_str, format, ap); - va_end(ap); - - int argc=0; - sds *argv=NULL; - argv=sdssplitargs(cmd_str, &argc); - struct swarmkv_reply *reply=NULL; - reply=swarmkv_command_on_argv(db, NULL, argc, argv); - - if (argv) + const char *argv[2+n_items]; + size_t argv_len[2+n_items]; + argv[0]="bfadd"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + for(size_t i=0; i<n_items; i++) { - sdsfreesplitres(argv, argc); + argv[2+i]=items[i]; + argv_len[2+i]=items_len[i]; } - free(cmd_str); - - return reply; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_items, argv, argv_len); + return; } -void swarmkv_async_command_on_argv(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, int argc, sds *argv) +void swarmkv_bfmexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(argc); - for(size_t i=0; i<argc; i++) + const char *argv[2+n_items]; + size_t argv_len[2+n_items]; + argv[0]="bfmexists"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + for(size_t i=0; i<n_items; i++) { - cmd->argv[i]=sdsdup(argv[i]); + argv[2+i]=items[i]; + argv_len[2+i]=items_len[i]; } - - if(!target) - { - exec_for_local(db, cmd, NULL, cb, cb_arg); - } - else - { - node_t target_node; - memset(&target_node, 0, sizeof(node_t)); - node_init_from_sds(&target_node, target); - exec_for_local(db, cmd, &target_node, cb, cb_arg); - } - swarmkv_cmd_free(cmd); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_items, argv, argv_len); + return; } -void swarmkv_async_command_on(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, const char *format, ...) +void swarmkv_cmsincrby(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], long long increments[], size_t n_increment, swarmkv_on_reply_callback_t * cb, void * cb_arg) { - int argc=0; sds *argv=NULL; - char *cmd_str=NULL; - - va_list ap; - va_start(ap,format); - vasprintf(&cmd_str, format, ap); - va_end(ap); - - argv=sdssplitargs(cmd_str, &argc); - - swarmkv_async_command_on_argv(db, cb, cb_arg, target, argc, argv); - - if (argv) + const char *argv[2+2*n_increment]; + size_t argv_len[2+2*n_increment]; + char inc_buffer[n_increment][32]; + argv[0]="cmsincrby"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + for(size_t i=0; i<n_increment; i++) { - sdsfreesplitres(argv, argc); + sprintf(inc_buffer[i], "%lld", increments[i]); + argv[2+2*i]=items[i]; + argv_len[2+2*i]=items_len[i]; + argv[3+2*i]=inc_buffer[i]; + argv_len[3+2*i]=strlen(inc_buffer[i]); } - free(cmd_str); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+2*n_increment, argv, argv_len); + return; } -void swarmkv_async_command(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *format, ...) +void swarmkv_cmsmquery(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t * cb, void * cb_arg) { - int argc=0; sds *argv=NULL; - char *cmd_str=NULL; - - va_list ap; - va_start(ap,format); - vasprintf(&cmd_str, format, ap); - va_end(ap); - - argv=sdssplitargs(cmd_str, &argc); - - swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, argc, argv); - - if (argv) + const char *argv[2+n_items]; + size_t argv_len[2+n_items]; + argv[0]="cmsmquery"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + for(size_t i=0; i<n_items; i++) { - sdsfreesplitres(argv, argc); + argv[2+i]=items[i]; + argv_len[2+i]=items_len[i]; } - free(cmd_str); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_items, argv, argv_len); + return; }
\ No newline at end of file diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c index faa1a04..d915b8a 100644 --- a/src/swarmkv_common.c +++ b/src/swarmkv_common.c @@ -287,6 +287,10 @@ sds node_addr2sds(const node_t *node) { return sdsnew(node->addr); } +const char *node_addr2cstr(const node_t *node) +{ + return node->addr; +} int node_init_from_sds(node_t *node, const char *addr_str) { memset(node, 0, sizeof(node_t)); @@ -348,7 +352,7 @@ void node_init(node_t *node, const char *ipv4, unsigned int port) snprintf(node->addr, sizeof(node->addr), "%s:%u", ipv4, port); return; } -void node_init_from_string(node_t *node, const char *addr_str) +void node_init_from_cstr(node_t *node, const char *addr_str) { memset(node, 0, sizeof(node_t)); assert(strlen(addr_str)<sizeof(node_t)); @@ -356,6 +360,13 @@ void node_init_from_string(node_t *node, const char *addr_str) strncpy(node->addr, addr_str, sizeof(node->addr)); return; } +void node_init_from_string(node_t *node, const char *addr_str, size_t sz_addr) +{ + memset(node, 0, sizeof(node_t)); + assert(sz_addr<=sizeof(node->addr)); + strncpy(node->addr, addr_str, sz_addr); + return; +} int node_init_from_reply(node_t *node, const struct swarmkv_reply *reply) { if(reply->type!=SWARMKV_REPLY_NODE) @@ -368,7 +379,7 @@ int node_init_from_reply(node_t *node, const struct swarmkv_reply *reply) { p+=5; } - node_init_from_string(node, p); + node_init_from_cstr(node, p); return 0; } int node_list_new_from_reply(node_t **node_list, size_t *n_node, const struct swarmkv_reply *reply) @@ -559,10 +570,11 @@ void swarmkv_reply_free(struct swarmkv_reply *reply) } free(reply); } -struct swarmkv_cmd *swarmkv_cmd_new(size_t argc) +struct swarmkv_cmd *swarmkv_cmd_new(size_t argc, const node_t *caller) { size_t size=sizeof(struct swarmkv_cmd)+argc*sizeof(sds); struct swarmkv_cmd *cmd=(struct swarmkv_cmd*)malloc(size); + node_copy(&cmd->caller, caller); cmd->argc=argc; cmd->argv=(sds*)((char*)cmd+sizeof(struct swarmkv_cmd)); return cmd; @@ -580,7 +592,7 @@ void swarmkv_cmd_free(struct swarmkv_cmd *p) } struct swarmkv_cmd* swarmkv_cmd_dup(const struct swarmkv_cmd *origin) { - struct swarmkv_cmd* copy=swarmkv_cmd_new(origin->argc); + struct swarmkv_cmd* copy=swarmkv_cmd_new(origin->argc, &origin->caller); size_t i=0; for(i=0; i<origin->argc; i++) { @@ -657,7 +669,7 @@ void json2keyslots(sds json_buffer, void *slot_container_base, size_t sz_slot_co { iterator=(struct key_slot*)(slot_container_base+sz_slot_container*j+offset_slot); iterator->slot_id=j; - node_init_from_string(&iterator->owner, owner->valuestring); + node_init_from_cstr(&iterator->owner, owner->valuestring); k++; } } @@ -714,7 +726,7 @@ void leader_response2leader_node(const char *resp_body, node_t *leader) cJSON *item=NULL; root=cJSON_Parse(resp_body); item=cJSON_GetObjectItem(root, "node"); - node_init_from_string(leader, item->valuestring);; + node_init_from_cstr(leader, item->valuestring);; cJSON_Delete(root); } /* This is an modified version of keyHashSlot of Redis source code. diff --git a/src/swarmkv_common.h b/src/swarmkv_common.h index 3d427cf..d4f53e5 100644 --- a/src/swarmkv_common.h +++ b/src/swarmkv_common.h @@ -85,11 +85,12 @@ struct swarmkv_options struct swarmkv_cmd { + node_t caller; size_t argc; sds *argv; }; -struct swarmkv_cmd *swarmkv_cmd_new(size_t argc); +struct swarmkv_cmd *swarmkv_cmd_new(size_t argc, const node_t *originator); void swarmkv_cmd_free(struct swarmkv_cmd *p); struct swarmkv_cmd *swarmkv_cmd_dup(const struct swarmkv_cmd *origin); int swarmkv_cmd_parse_integer(const struct swarmkv_cmd *cmd, const char *name, long long *ival); @@ -117,21 +118,22 @@ int node_list_remove(node_t *list, size_t n_list, const node_t *node); sds node_print_json(const node_t *self, uuid_t uuid); sds node_addr2sds(const node_t *node); +const char *node_addr2cstr(const node_t *node); int node_init_from_sds(node_t *node, const char *addr_str); int node_init_from_reply(node_t *node, const struct swarmkv_reply *reply); +void node_init(node_t *node, const char *ipv4, unsigned int port); +void node_init_from_cstr(node_t *node, const char *addr_str); +void node_init_from_string(node_t *node, const char *addr_str, size_t sz_addr); + /* Return 0 if node1 == node 2 */ int node_compare(const node_t *node1, const node_t *node2); void node_copy(node_t *dst, const node_t *src); +int node_sanity(const node_t *node); int node_is_empty(const node_t *node); -void node_init(node_t *node, const char *ipv4, unsigned int port); -void node_init_from_string(node_t *node, const char *addr_str); + int node_parse(const node_t *node, unsigned int *port, char *addr_str, size_t sz_addr); void node_init_from_sockaddr(node_t *node, const struct sockaddr * sa); void node_to_sockaddr(const node_t *node, struct sockaddr *sockaddr); int http_blocking_request(enum evhttp_cmd_type type, const char* host, unsigned int port, const char* url, sds req_body, sds *resp_body); -//typedef void exec_cmd_func(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller); -struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv); -void swarmkv_async_command_on_argv(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, int argc, sds *argv); - -int __gettid(struct swarmkv *db);
\ No newline at end of file +const node_t *swarmkv_self_node(const struct swarmkv *db);
\ No newline at end of file diff --git a/src/swarmkv_error.h b/src/swarmkv_error.h index 221d107..5b6522b 100644 --- a/src/swarmkv_error.h +++ b/src/swarmkv_error.h @@ -9,6 +9,7 @@ #define error_arg_not_valid_address "ERR arg `%s` is not `IP:port` format" #define error_arg_not_valid_float "ERR arg `%s` is not a valid float or out of range" #define error_arg_not_valid_uuid "ERR arg `%s` is not a valid UUID" +#define error_arg_not_valid_node "ERR arg `%s` is not a valid IP:port" #define error_arg_parse_failed "ERR arg `%s` parse failed" #define error_arg_string_should_be "ERR arg `%s` should be `%s`" #define error_need_additional_arg "ERR arg `%s` should be fllowed by more args" diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index 1ca01c5..582cf52 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -156,19 +156,23 @@ void crdt_op_on_reply(const struct swarmkv_reply *reply, void * arg) void key_entry_deletion_notification(struct key_route_entry *key_entry, struct swarmkv *exec_cmd_handle) { struct replica_node *replica=NULL, *tmp=NULL; - struct swarmkv_cmd *crdt_del_cmd=swarmkv_cmd_new(3); - crdt_del_cmd->argv[0]=sdsnew("crdt"); - crdt_del_cmd->argv[1]=sdsnew("del"); - crdt_del_cmd->argv[2]=sdsdup(key_entry->key); + const char *argv[3]; + size_t argv_len[3]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="del"; + argv_len[1]=strlen(argv[1]); + argv[2]=key_entry->key; + argv_len[2]=sdslen(key_entry->key); + HASH_ITER(hh, key_entry->hash_replica, replica, tmp) { struct crdt_op_ctx *ctx=ALLOC(struct crdt_op_ctx, 1); node_copy(&ctx->peer, &replica->node); ctx->key=sdsdup(key_entry->key); ctx->is_del=1; - swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, crdt_del_cmd->argc, crdt_del_cmd->argv); + swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, 3, argv, argv_len); } - swarmkv_cmd_free(crdt_del_cmd); return; } @@ -176,14 +180,18 @@ void key_entry_meet_replica(struct key_route_entry *key_entry, struct swarmkv *e { struct replica_node *replica=NULL, *tmp=NULL; int n_replica=HASH_COUNT(key_entry->hash_replica), i=0; - struct swarmkv_cmd *crdt_meet_cmd=swarmkv_cmd_new(3+n_replica); - crdt_meet_cmd->argv[0]=sdsnew("crdt"); - crdt_meet_cmd->argv[1]=sdsnew("meet"); - crdt_meet_cmd->argv[2]=sdsdup(key_entry->key); - + const char *argv[3+n_replica]; + size_t argv_len[3+n_replica]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="meet"; + argv_len[1]=strlen(argv[1]); + argv[2]=key_entry->key; + argv_len[2]=sdslen(key_entry->key); HASH_ITER(hh, key_entry->hash_replica, replica, tmp) { - crdt_meet_cmd->argv[3+i]=node_addr2sds(&replica->node); + argv[3+i]=node_addr2cstr(&replica->node); + argv_len[3+i]=strlen(argv[3+i]); i++; } assert(i==n_replica); @@ -192,9 +200,8 @@ void key_entry_meet_replica(struct key_route_entry *key_entry, struct swarmkv *e struct crdt_op_ctx *ctx=ALLOC(struct crdt_op_ctx, 1); node_copy(&ctx->peer, &replica->node); ctx->key=sdsdup(key_entry->key); - swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, crdt_meet_cmd->argc, crdt_meet_cmd->argv); + swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, 3+n_replica, argv, argv_len); } - swarmkv_cmd_free(crdt_meet_cmd); return; } @@ -1443,7 +1450,7 @@ enum cmd_exec_result keyspace_getkeysinslot_command(struct swarmkv_module *mod_k static int ks_get_tid(struct swarmkv_keyspace *ks, int slot_id) { int tid=swarmkv_keyspace_slot2tid(&ks->module, slot_id); - int real_tid=__gettid(ks->exec_cmd_handle); + int real_tid=swarmkv_gettid(ks->exec_cmd_handle); assert(tid==real_tid); return tid; } @@ -1646,7 +1653,7 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace, int is_matched=0; UT_array *matched_replies=NULL; struct swarmkv_reply *r=NULL; - int real_tid=__gettid(ks->exec_cmd_handle); + int real_tid=swarmkv_gettid(ks->exec_cmd_handle); int thread_id=atoll(cmd->argv[2]); assert(real_tid==thread_id); const sds pattern=cmd->argv[3]; @@ -1903,7 +1910,7 @@ int swarmkv_keyspace_slot2tid(struct swarmkv_module *mod_keyspace, int slot_id) void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id) { struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace); - int real_tid=__gettid(ks->exec_cmd_handle); + int real_tid=swarmkv_gettid(ks->exec_cmd_handle); assert(real_tid==thread_id); struct ks_thread *thr=ks->threads+thread_id; diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c index 0ffc636..62d6129 100644 --- a/src/swarmkv_mesh.c +++ b/src/swarmkv_mesh.c @@ -44,7 +44,7 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest assert(current_thread_id != dest_thread_id); struct swarmkv_mesh_thread *curr_thr=mesh->threads+current_thread_id; struct swarmkv_mesh_thread *dest_thr=mesh->threads+dest_thread_id; - int tid=__gettid((struct swarmkv *)(mesh->on_msg_cb_arg)); + int tid=swarmkv_gettid((struct swarmkv *)(mesh->on_msg_cb_arg)); assert(tid==current_thread_id); ringbuf_t *dest_ring=dest_thr->ring; assert(msg->magic == SWARMKV_MSG_MAGIC); @@ -88,7 +88,7 @@ static void mesh_on_eventfd_read_cb(evutil_socket_t fd, short what, void * arg) ringbuf_t *ring=thr->ring; uint64_t n_msg=0; - int tid=__gettid((struct swarmkv *)(mesh->on_msg_cb_arg)); + int tid=swarmkv_gettid((struct swarmkv *)(mesh->on_msg_cb_arg)); assert(tid==thr->thread_id); ssize_t s = read(thr->efd, &n_msg, sizeof(uint64_t)); diff --git a/src/swarmkv_message.c b/src/swarmkv_message.c index 8db7ac9..6f2607b 100644 --- a/src/swarmkv_message.c +++ b/src/swarmkv_message.c @@ -75,19 +75,24 @@ static struct swarmkv_cmd *swarmkv_cmd_deserialize(const char *blob, size_t size mpack_tree_init_data(&tree, blob, size); mpack_tree_parse(&tree); - mpack_node_t item; + mpack_node_t item, array_node; mpack_node_t root=mpack_tree_root(&tree); struct swarmkv_cmd *cmd=NULL; const char *arg=NULL; - size_t arg_sz=0; - - cmd=swarmkv_cmd_new(mpack_node_array_length(root)); + size_t sz=0; + node_t caller; + item=mpack_node_map_cstr(root, "caller"); + node_init_from_string(&caller, mpack_node_str(item), mpack_node_strlen(item)); + assert(node_sanity(&caller)); + //assert(sz==sizeof(node_t)); + array_node=mpack_node_map_cstr(root, "argv"); + cmd=swarmkv_cmd_new(mpack_node_array_length(array_node), &caller); for(size_t i=0; i<cmd->argc; i++) { - item=mpack_node_array_at(root, i); + item=mpack_node_array_at(array_node, i); arg=mpack_node_str(item); - arg_sz=mpack_node_strlen(item); - cmd->argv[i]=sdsnewlen(arg, arg_sz); + sz=mpack_node_strlen(item); + cmd->argv[i]=sdsnewlen(arg, sz); } if (mpack_tree_destroy(&tree) != mpack_ok) { fprintf(stderr, "An error occurred decoding a cmd blob!\n"); @@ -164,15 +169,20 @@ static void swarmkv_cmd_serialize(const struct swarmkv_cmd *cmd, char **blob, si char *root_mpack_buff=NULL; size_t root_mpack_sz=0; - mpack_writer_t writer; + mpack_writer_t writer; size_t i=0; mpack_writer_init_growable(&writer, &root_mpack_buff, &root_mpack_sz); + mpack_build_map(&writer); + mpack_write_cstr(&writer, "caller"); + mpack_write_cstr(&writer, node_addr2cstr(&cmd->caller)); + mpack_write_cstr(&writer, "argv"); mpack_build_array(&writer); for(i=0; i<cmd->argc; i++) { mpack_write_str(&writer, cmd->argv[i], (uint32_t)sdslen(cmd->argv[i])); } mpack_complete_array(&writer); + mpack_complete_map(&writer); if (mpack_writer_destroy(&writer) != mpack_ok) { fprintf(stderr, "An error occurred encoding the cmd!\n"); diff --git a/src/swarmkv_monitor.c b/src/swarmkv_monitor.c index 0856b1c..e0c070c 100644 --- a/src/swarmkv_monitor.c +++ b/src/swarmkv_monitor.c @@ -2,6 +2,7 @@ #include "sds.h" #include "log.h" #include "uthash.h" +#include "utlist.h" #include "swarmkv_monitor.h" #include "swarmkv_common.h" #include "swarmkv_utils.h" @@ -220,6 +221,31 @@ struct swarmkv_reply *recorder_metric_to_reply(const struct recorder_metric *met reply->elements[1]=swarmkv_reply_new_string_fmt(metric_buff); return reply; } +struct monitor_client +{ + node_t node; + sds pattern; + struct monitor_client *next; +}; +struct monitor_client *monitor_client_new(const node_t *node, sds pattern) +{ + struct monitor_client *client=ALLOC(struct monitor_client, 1); + node_copy(&client->node, node); + client->pattern = pattern? sdsdup(pattern):NULL; + return client; +} +void monitor_client_free(struct monitor_client *client) +{ + sdsfree(client->pattern); + free(client); + return; +} +struct swarmkv_monitor_thread +{ + int n_client; + struct monitor_client *clients; + pthread_mutex_t mutex; +}; struct swarmkv_monitor { struct swarmkv_module module; @@ -229,7 +255,9 @@ struct swarmkv_monitor long long max_latency_usec; int significant_figures; int nr_worker_threads; - pthread_mutex_t mutex;//only one latency command can be executed at a time + struct swarmkv_monitor_thread *threads; + pthread_mutex_t latency_execute_mutex; //only one latency command can be executed at a time + struct swarmkv *ref_db; }; struct swarmkv_monitor *module2monitor(struct swarmkv_module *module) @@ -239,7 +267,7 @@ struct swarmkv_monitor *module2monitor(struct swarmkv_module *module) assert(monitor==module->mod_ctx); return monitor; } -struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts) +struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts, struct swarmkv *db) { struct swarmkv_monitor *monitor=ALLOC(struct swarmkv_monitor, 1); monitor->max_latency_usec=opts->cluster_timeout_us; @@ -247,7 +275,14 @@ struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts) monitor->peers=ALLOC(struct recorder *, monitor->nr_worker_threads); strncpy(monitor->module.name, "monitor", sizeof(monitor->module.name)); monitor->module.mod_ctx=monitor; - pthread_mutex_init(&monitor->mutex, NULL); + pthread_mutex_init(&monitor->latency_execute_mutex, NULL); + monitor->threads=ALLOC(struct swarmkv_monitor_thread, monitor->nr_worker_threads); + for(size_t i=0; i<monitor->nr_worker_threads; i++) + { + pthread_mutex_init(&monitor->threads[i].mutex, PTHREAD_PROCESS_PRIVATE); + monitor->threads[i].clients=NULL; + } + monitor->ref_db=db; return &monitor->module; } void swarmkv_monitor_register_command(struct swarmkv_module *mod_monitor, const char *command) @@ -274,15 +309,142 @@ void swarmkv_monitor_free(struct swarmkv_module *mod_monitor) for(size_t i=0; i<monitor->nr_worker_threads; i++) { recorder_table_free(monitor->peers+i); + struct monitor_client *client=NULL, *tmp=NULL; + LL_FOREACH_SAFE(monitor->threads[i].clients, client, tmp) + { + LL_DELETE(monitor->threads[i].clients, client); + monitor_client_free(client); + } } + free(monitor->threads); free(monitor->peers); monitor->peers=NULL; free(monitor); } -void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const char *cmd_name, long long latency_usec) +struct print_ctx +{ + node_t node; + int tid; + struct swarmkv_monitor *monitor; +}; +static void print_on_reply(const struct swarmkv_reply *reply, void *user) +{ + struct print_ctx *ctx=(struct print_ctx *)user; + int tid=swarmkv_gettid(ctx->monitor->ref_db); + assert(tid==ctx->tid); + if(reply->type != SWARMKV_REPLY_STATUS) + { + struct monitor_client *client=NULL, *tmp=NULL; + pthread_mutex_lock(&ctx->monitor->threads[ctx->tid].mutex); + LL_FOREACH_SAFE(ctx->monitor->threads[ctx->tid].clients, client, tmp) + { + if(0==node_compare(&client->node, &ctx->node)) + { + ctx->monitor->threads[ctx->tid].n_client--; + LL_DELETE(ctx->monitor->threads[ctx->tid].clients, client); + monitor_client_free(client); + } + } + pthread_mutex_unlock(&ctx->monitor->threads[ctx->tid].mutex); + } + free(ctx); +} +static const char *exec_ret2string(enum cmd_exec_result ret) +{ + switch(ret) + { + case NEED_KEY_ROUTE: + return "KRT"; + case REDIRECT: + return "RDR"; + case FINISHED: + return "FIN"; + default: + assert(0); + } +} +void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *cmd, long long latency_usec, enum cmd_exec_result result) { struct swarmkv_monitor *monitor=module2monitor(mod_monitor); - recorder_table_record_latency(&monitor->commands, cmd_name, strlen(cmd_name), latency_usec, monitor->max_latency_usec, 0); + recorder_table_record_latency(&monitor->commands, spec->name, strlen(spec->name), latency_usec, monitor->max_latency_usec, 0); + int tid=swarmkv_gettid(monitor->ref_db); + assert(tid<monitor->nr_worker_threads); + if(0==strncasecmp(spec->name, "PRINT", strlen("PRINT"))) + { + //Do not log print command to avoid infinit recursion. + return; + } + if(monitor->threads[tid].n_client == 0) + { + //No monitor client registered. + return; + } + + struct monitor_client *client=NULL; + int matched_client=0; + node_t matched_client_nodes[monitor->threads[tid].n_client]; + pthread_mutex_lock(&monitor->threads[tid].mutex); + LL_FOREACH(monitor->threads[tid].clients, client) + { + if(client->pattern) + { + int is_matched=0; + for(int i=0; i<cmd->argc; i++) + { + is_matched=stringmatchlen(client->pattern, sdslen(client->pattern), cmd->argv[i], sdslen(cmd->argv[i]), 0); + if(is_matched) + { + break; + } + } + if(!is_matched) + { + continue; + } + } + node_copy(matched_client_nodes+matched_client, &client->node); + matched_client++; + } + //To avoid dead lock, we should not hold the lock when executing command. + pthread_mutex_unlock(&monitor->threads[tid].mutex); + assert(matched_client <= monitor->threads[tid].n_client); + if(matched_client==0) + { + return; + } + + sds cmdrepr = sdsempty(); + struct timeval tv; + gettimeofday(&tv, NULL); + cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec, (long)tv.tv_usec); + cmdrepr = sdscatprintf(cmdrepr,"[%d %c %s %s] ", tid, + 0==node_compare(&cmd->caller, swarmkv_self_node(monitor->ref_db))?'L':'R', + node_addr2cstr(&cmd->caller), + exec_ret2string(result)); + for(int i=0; i<cmd->argc; i++) + { + cmdrepr = sdscatrepr(cmdrepr, (char*)cmd->argv[i], sdslen(cmd->argv[i])); + if (i != cmd->argc-1) + cmdrepr = sdscatlen(cmdrepr," ",1); + } + cmdrepr = sdscatprintf(cmdrepr, "\r\n"); + + const char *print_cmd_argv[2]; + size_t print_cmd_argv_len[2]; + print_cmd_argv[0]="PRINT"; + print_cmd_argv_len[0]=strlen(print_cmd_argv[0]); + print_cmd_argv[1]=cmdrepr; + print_cmd_argv_len[1]=sdslen(cmdrepr); + + for(int i=0; i<matched_client; i++) + { + struct print_ctx *print_ctx=ALLOC(struct print_ctx, 1); + node_copy(&print_ctx->node, &matched_client_nodes[i]); + print_ctx->tid=tid; + print_ctx->monitor=monitor; + swarmkv_async_command_on_argv(monitor->ref_db, print_on_reply, print_ctx, print_ctx->node.addr, 2, print_cmd_argv, print_cmd_argv_len); + } + sdsfree(cmdrepr); return; } void swarmkv_monitor_record_peer(struct swarmkv_module *mod_monitor, node_t *peer, long long latency_usec, int thread_id) @@ -333,7 +495,7 @@ struct swarmkv_reply *latency_generic(struct recorder **table, const char *key) enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { struct swarmkv_monitor *monitor=module2monitor(mod_monitor); - pthread_mutex_lock(&monitor->mutex); + pthread_mutex_lock(&monitor->latency_execute_mutex); if(cmd->argc==2 && !strcasecmp(cmd->argv[1], "help") ) { const char *help = { @@ -408,15 +570,43 @@ enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const s { *reply=swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[1], cmd->argv[0]); } - pthread_mutex_unlock(&monitor->mutex); + pthread_mutex_unlock(&monitor->latency_execute_mutex); return FINISHED; } -enum cmd_exec_result lastcmds_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +enum cmd_exec_result monreg_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { -/*LASTCMDS [offset]*/ +/* MONREG IP:port [pattern]*/ struct swarmkv_monitor *monitor=module2monitor(mod_monitor); - pthread_mutex_lock(&monitor->mutex); - - pthread_mutex_unlock(&monitor->mutex); + node_t node; + int ret=0; + ret=node_init_from_sds(&node, cmd->argv[1]); + if(ret<0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_node); + return FINISHED; + } + for(int i=0; i<monitor->nr_worker_threads; i++) + { + struct monitor_client *client=NULL; + int found=0; + pthread_mutex_lock(&monitor->threads[i].mutex); + LL_FOREACH(monitor->threads[i].clients, client) + { + if(0==node_compare(&client->node, &node)) + { + sdsfree(client->pattern); + client->pattern=cmd->argc>2?sdsdup(cmd->argv[2]):NULL; + found=1; + } + } + if(!found) + { + client=monitor_client_new(&node, cmd->argc>2?cmd->argv[2]:NULL); + LL_APPEND(monitor->threads[i].clients, client); + monitor->threads[i].n_client++; + } + pthread_mutex_unlock(&monitor->threads[i].mutex); + } + *reply=swarmkv_reply_new_status("OK"); return FINISHED; }
\ No newline at end of file diff --git a/src/swarmkv_monitor.h b/src/swarmkv_monitor.h index 67273ea..f134618 100644 --- a/src/swarmkv_monitor.h +++ b/src/swarmkv_monitor.h @@ -6,12 +6,13 @@ #include <unistd.h> #include <string.h> -struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts); +struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts, struct swarmkv *db); void swarmkv_monitor_register_command(struct swarmkv_module *mod_monitor, const char *command); void swarmkv_monitor_free(struct swarmkv_module *mod_monitor); -void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const char *cmd_name, long long latency_usec); +void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *cmd, long long latency_usec, enum cmd_exec_result result); void swarmkv_monitor_register_event(struct swarmkv_module *mod_monitor, const char *event); void swarmkv_monitor_record_peer(struct swarmkv_module *mod_monitor, node_t *peer, long long latency_usec, int thread_id); void swarmkv_monitor_record_event(struct swarmkv_module *mod_monitor, const char *event_name, long long latency_usec); -enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
\ No newline at end of file +enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result monreg_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
\ No newline at end of file diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index 4607d1d..0ead34b 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -618,7 +618,7 @@ void swarmkv_net_set_on_msg_callback(struct swarmkv_net *net, on_msg_callback_t #define MAX_OUTPUT_BUFFER_SIZE 1024*1024*64 int swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv_msg *msg, const char **err_str) { - int tid=__gettid((struct swarmkv *)(net->on_msg_cb_arg)); + int tid=swarmkv_gettid((struct swarmkv *)(net->on_msg_cb_arg)); assert(tid<net->nr_threads); struct snet_thread *thr=net->threads+tid; struct snet_conn *conn=NULL; diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 189a561..f1a3ac8 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -294,7 +294,7 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) { struct scontainer *ctr=NULL; int designated_tid=key2tid(key, store->opts->nr_worker_threads); - int real_tid=__gettid(store->exec_cmd_handle); + int real_tid=swarmkv_gettid(store->exec_cmd_handle); assert(designated_tid==real_tid); ctr=scontainer_find(&(store->threads[designated_tid].obj_table), key); if(0==pthread_mutex_trylock(&store->threads[designated_tid].sanity_lock)) @@ -541,16 +541,16 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) return; } -void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struct swarmkv_cmd *cmd, const node_t *peer) +void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len) { struct crdt_generic_ctx *ctx=NULL; assert(peer); ctx=ALLOC(struct crdt_generic_ctx, 1); ctx->op=op; ctx->store=store; - ctx->key=sdsdup(cmd->argv[2]); + ctx->key=sdsnew(argv[2]); node_copy(&ctx->peer, peer); - swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, cmd->argc, cmd->argv); + swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len); return; } #define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle" @@ -574,17 +574,21 @@ int store_batch_sync(struct swarmkv_store *store, int tid) } else { - struct swarmkv_cmd *cmd=swarmkv_cmd_new(4); - cmd->argv[0]=sdsnew("crdt"); - cmd->argv[1]=sdsnew("merge"); - cmd->argv[2]=sdsnew(ctr->obj.key); - cmd->argv[3]=sdsnewlen(blob, blob_sz); + const char *argv[4]; + size_t argv_len[4]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="merge"; + argv_len[1]=strlen(argv[1]); + argv[2]=ctr->obj.key; + argv_len[2]=sdslen(ctr->obj.key); + argv[3]=blob; + argv_len[3]=blob_sz; for(int i=0; i<utarray_len(ctr->replica_node_list); i++) { node_t *peer=utarray_eltptr(ctr->replica_node_list, i); - crdt_generic_call(store, CRDT_MERGE, cmd, peer); + crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len); } - swarmkv_cmd_free(cmd); free(blob); } DL_DELETE(thr->sync_queue, ctr); @@ -593,16 +597,22 @@ int store_batch_sync(struct swarmkv_store *store, int tid) n_synced++; if(n_synced>=MAX_SYNC_PER_PERIOD) break; } - node_t peer; - struct swarmkv_cmd *cmd=NULL; - int ret=1; - while(1) + + struct sync_task *task=NULL; + task=sync_master_get_task(sync_master); + while(task) { - ret=sync_master_get_cmd(sync_master, &peer, &cmd); - if(!ret) break; - crdt_generic_call(store, CRDT_MERGE, cmd, &peer); - swarmkv_cmd_free(cmd); - cmd=NULL; + size_t n_data=sync_task_key_count(task); + const char *argv[n_data*2+2]; + size_t argv_len[n_data*2+2]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="merge"; + argv_len[1]=strlen(argv[1]); + sync_task_read_key_blob(task, argv+2, argv_len+2, n_data*2); + crdt_generic_call(store, CRDT_MERGE, sync_task_peer(task), n_data*2+2, argv, argv_len); + sync_task_free(task); + task=sync_master_get_task(sync_master); } sync_master_free(sync_master); return n_synced; @@ -613,7 +623,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) struct scontainer *ctr=NULL, *tmp=NULL; struct timespec start, end; int n_synced=0; - int real_tid=__gettid(store->exec_cmd_handle); + int real_tid=swarmkv_gettid(store->exec_cmd_handle); assert(real_tid==thread_id); clock_gettime(CLOCK_MONOTONIC, &start); @@ -630,20 +640,22 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) char *blob=NULL; size_t blob_sz=0; scontainer_serialize(ctr, &blob, &blob_sz); - - struct swarmkv_cmd *cmd=swarmkv_cmd_new(4); - cmd->argv[0]=sdsnew("crdt"); - cmd->argv[1]=sdsnew("merge"); - cmd->argv[2]=sdsnew(ctr->obj.key); - cmd->argv[3]=sdsnewlen(blob, blob_sz); + const char *argv[4]; + size_t argv_len[4]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="merge"; + argv_len[1]=strlen(argv[1]); + argv[2]=ctr->obj.key; + argv_len[2]=sdslen(ctr->obj.key); + argv[3]=blob; + argv_len[3]=blob_sz; for(int i=0; i<utarray_len(ctr->replica_node_list); i++) { node_t *peer=utarray_eltptr(ctr->replica_node_list, i); - crdt_generic_call(store, CRDT_MERGE, cmd, peer); + crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len); } - swarmkv_cmd_free(cmd); free(blob); - DL_DELETE(thr->sync_queue, ctr); ctr->is_in_sync_q=0; store->synced++; @@ -752,13 +764,11 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st sds key=cmd->argv[2]; int tid=store_gettid(mod_store, key); - int real_tid=__gettid(store->exec_cmd_handle); + int real_tid=swarmkv_gettid(store->exec_cmd_handle); assert(tid==real_tid); - struct swarmkv_cmd *crdt_get_cmd=NULL; - crdt_get_cmd=swarmkv_cmd_new(3); - crdt_get_cmd->argv[0]=sdsnew("crdt"); - crdt_get_cmd->argv[1]=sdsnew("get"); - crdt_get_cmd->argv[2]=sdsdup(key); + + + size_t n_replica_node=cmd->argc-3; ctr=store_lookup_scontainer(store, key); @@ -782,23 +792,34 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st assert(node_compare(replica_nodes+i, &store->self)!=0); scontainer_add_replica_node(ctr, replica_nodes+i); } - struct swarmkv_cmd *crdt_meet_cmd=NULL; + const char *crdt_get_argv[3]; + size_t crdt_get_argv_len[3]; + crdt_get_argv[0]="crdt"; + crdt_get_argv_len[0]=strlen(crdt_get_argv[0]); + crdt_get_argv[1]="get"; + crdt_get_argv_len[1]=strlen(crdt_get_argv[1]); + crdt_get_argv[2]=key; + crdt_get_argv_len[2]=sdslen(key); + + const char *crdt_meet_argv[4]; + size_t crdt_meet_argv_len[4]; for(size_t i=0; i<n_replica_node; i++) { if(i<max_pull_node_num) { - crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i); + crdt_generic_call(store, CRDT_GET, replica_nodes+i, 3, crdt_get_argv, crdt_get_argv_len); } - crdt_meet_cmd=swarmkv_cmd_new(4); - crdt_meet_cmd->argv[0]=sdsnew("crdt"); - crdt_meet_cmd->argv[1]=sdsnew("meet"); - crdt_meet_cmd->argv[2]=sdsdup(ctr->obj.key); - crdt_meet_cmd->argv[3]=node_addr2sds(&store->self); - crdt_generic_call(store, CRDT_MEET, crdt_meet_cmd, replica_nodes+i); - swarmkv_cmd_free(crdt_meet_cmd); + + crdt_meet_argv[0]="crdt"; + crdt_meet_argv_len[0]=strlen(crdt_meet_argv[0]); + crdt_meet_argv[1]="meet"; + crdt_meet_argv_len[1]=strlen(crdt_meet_argv[1]); + crdt_meet_argv[2]=key; + crdt_meet_argv_len[2]=sdslen(key); + crdt_meet_argv[3]=node_addr2cstr(&store->self); + crdt_meet_argv_len[3]=strlen(crdt_meet_argv[3]); + crdt_generic_call(store, CRDT_MEET, replica_nodes+i, 4, crdt_meet_argv, crdt_meet_argv_len); } - swarmkv_cmd_free(crdt_get_cmd); - crdt_get_cmd=NULL; free(replica_nodes); *reply=swarmkv_reply_new_status("OK"); return FINISHED; diff --git a/src/swarmkv_sync.c b/src/swarmkv_sync.c index 17dffe9..a47f931 100644 --- a/src/swarmkv_sync.c +++ b/src/swarmkv_sync.c @@ -81,36 +81,58 @@ void sync_master_add_obj(struct sync_master *master, const sds key, char *blob, return; } -int sync_master_get_cmd(struct sync_master *master, node_t *peer, struct swarmkv_cmd **cmd) +struct sync_task *sync_master_get_task(struct sync_master *master) { struct sync_task *task=NULL, *tmp=NULL; - struct sync_data **p=NULL, *data=NULL; - struct swarmkv_cmd *c=NULL; - size_t n_data=0; - *cmd=NULL; + + //We are not iterating the whole table, just get the first one HASH_ITER(hh, master->task_table, task, tmp) { - n_data=utarray_len(task->sync_data_list); - c=swarmkv_cmd_new(2+n_data*2); - c->argv[0]=sdsnew("crdt"); - c->argv[1]=sdsnew("merge"); - for(size_t i=0; i<n_data; i++) - { - p=utarray_eltptr(task->sync_data_list, i); - data=*p; - c->argv[2+i*2]=sdsdup(data->key); - c->argv[2+i*2+1]=sdsnewlen(data->blob, data->blob_sz); - sync_data_free(data); - master->synced++; - } - node_copy(peer, &task->peer); HASH_DEL(master->task_table, task); - utarray_free(task->sync_data_list); - free(task); + master->synced += utarray_len(task->sync_data_list); break; } - *cmd=c; - return (c==NULL?0:1); + return task; +} +void sync_task_free(struct sync_task *task) +{ + struct sync_data **p=NULL, *data=NULL; + size_t n_data=utarray_len(task->sync_data_list); + for(size_t i=0; i<n_data; i++) + { + p=utarray_eltptr(task->sync_data_list, i); + data=*p; + sync_data_free(data); + } + utarray_free(task->sync_data_list); + free(task); + return; +} +const node_t *sync_task_peer(struct sync_task *task) +{ + return &task->peer; +} +size_t sync_task_key_count(struct sync_task *task) +{ + return utarray_len(task->sync_data_list); +} +void sync_task_read_key_blob(struct sync_task *task, const char *argv[], size_t *argv_len, int argc) +{ + struct sync_data **p=NULL, *data=NULL; + int n_data=utarray_len(task->sync_data_list); + assert(argc == 2*n_data); + for(int i=0, j=0; i<n_data && j<argc; i++, j+=2) + { + p=utarray_eltptr(task->sync_data_list, i); + data=*p; + assert(data->key); + assert(data->blob); + argv[j]=data->key; + argv_len[j]=sdslen(data->key); + argv[j+1]=data->blob; + argv_len[j+1]=data->blob_sz; + } + return; } void sync_master_free(struct sync_master *master) { diff --git a/src/swarmkv_sync.h b/src/swarmkv_sync.h index 36f96ff..84a2175 100644 --- a/src/swarmkv_sync.h +++ b/src/swarmkv_sync.h @@ -3,7 +3,12 @@ #include "sds.h" struct sync_master; +struct sync_task; struct sync_master *sync_master_new(void); void sync_master_add_obj(struct sync_master *mgr, const sds key, char *blob, size_t blob_sz, const node_t *peers, size_t n_peer); -int sync_master_get_cmd(struct sync_master *mgr, node_t *peer, struct swarmkv_cmd **cmd); +struct sync_task *sync_master_get_task(struct sync_master *master); +void sync_task_free(struct sync_task *task); +const node_t *sync_task_peer(struct sync_task *task); +size_t sync_task_key_count(struct sync_task *task); +void sync_task_read_key_blob(struct sync_task *task, const char *argv[], size_t *argv_len, int argc); void sync_master_free(struct sync_master *mgr);
\ No newline at end of file diff --git a/src/swarmkv_utils.c b/src/swarmkv_utils.c index ab0b920..adbd188 100644 --- a/src/swarmkv_utils.c +++ b/src/swarmkv_utils.c @@ -56,7 +56,6 @@ char* replace_char(char* str, char find, char replace){ return str; } -/* Glob-style pattern matching. */ //source https://github.com/redis/redis/blob/7.0/src/util.c int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase) diff --git a/src/swarmkv_utils.h b/src/swarmkv_utils.h index 55f5dbc..8951523 100644 --- a/src/swarmkv_utils.h +++ b/src/swarmkv_utils.h @@ -40,6 +40,8 @@ const char *module_name_str(const char *name); const char *swarmkv_util_pthread_cond_timedwait_error_to_string(const int timed_wait_rv); char *toLower(char* s); char *toUpper(char* s); +// Glob-style pattern matching. +// Return 1 if matched, 0 if not matched int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase); long long ustime(void); int is_number(const char *string, size_t sz, long long *number); diff --git a/src/t_cms.c b/src/t_cms.c index 5339cc9..587d3e2 100644 --- a/src/t_cms.c +++ b/src/t_cms.c @@ -156,7 +156,7 @@ enum cmd_exec_result cmsincrby_command(struct swarmkv_module *mod_store, const s } enum cmd_exec_result cmsmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { -/*CMSQUERY key item [item ...]*/ +/*CMSMQUERY key item [item ...]*/ struct sobj *obj=NULL; const sds key=cmd->argv[1]; |
