diff options
| author | 郑超 <[email protected]> | 2024-04-01 09:24:43 +0000 |
|---|---|---|
| committer | 郑超 <[email protected]> | 2024-04-01 09:24:43 +0000 |
| commit | f58c197ffcbf1322cada5bd82fee2f35ee0a81d0 (patch) | |
| tree | 6c33ca478382e1a860095c05319a712d936ba827 | |
| parent | a2d902c3434fe0edb3ecc9d70a95b9d6b790f379 (diff) | |
Feature/monitor4.3.0
| -rw-r--r-- | docs/cli.md | 64 | ||||
| -rw-r--r-- | docs/command_toc.md | 2 | ||||
| -rw-r--r-- | docs/commands/bloom_filter.md | 35 | ||||
| -rw-r--r-- | docs/commands/count_min_sketch.md | 177 | ||||
| -rw-r--r-- | docs/commands/trouble_shooting.md | 3 | ||||
| -rw-r--r-- | include/swarmkv/swarmkv.h | 12 | ||||
| -rw-r--r-- | readme.md | 2 | ||||
| -rw-r--r-- | src/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | src/swarmkv.c | 96 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 613 | ||||
| -rw-r--r-- | src/swarmkv_common.c | 24 | ||||
| -rw-r--r-- | src/swarmkv_common.h | 18 | ||||
| -rw-r--r-- | src/swarmkv_error.h | 1 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 41 | ||||
| -rw-r--r-- | src/swarmkv_mesh.c | 4 | ||||
| -rw-r--r-- | src/swarmkv_message.c | 26 | ||||
| -rw-r--r-- | src/swarmkv_monitor.c | 214 | ||||
| -rw-r--r-- | src/swarmkv_monitor.h | 7 | ||||
| -rw-r--r-- | src/swarmkv_net.c | 2 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 115 | ||||
| -rw-r--r-- | src/swarmkv_sync.c | 68 | ||||
| -rw-r--r-- | src/swarmkv_sync.h | 7 | ||||
| -rw-r--r-- | src/swarmkv_utils.c | 1 | ||||
| -rw-r--r-- | src/swarmkv_utils.h | 2 | ||||
| -rw-r--r-- | src/t_cms.c | 2 | ||||
| -rw-r--r-- | test/swarmkv_gtest.cpp | 311 | ||||
| -rw-r--r-- | tools/swarmkv_cli.c | 78 |
27 files changed, 1342 insertions, 587 deletions
diff --git a/docs/cli.md b/docs/cli.md index f31dfa2..32de3d9 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -2,6 +2,13 @@ The SwarmKV command line interface (swarmkv-cli) is a terminal program used to send commands to and read replies from the SwarmKV Cluster. It has two main modes: an interactive Read Eval Print Loop (REPL) mode where the user types SwarmKV commands and receives replies, and a command mode where swarmkv-cli is executed with additional arguments and the reply is printed to the standard output. +You can start the `swarmkv-cli` by running the following command: + +``` +$ ./swarmkv-cli -n cluster-name +``` +The `--exec` option is used to execute a command in a non-interactive fashion. + ### Running the same command N times In interactive mode, you can prefixing the command name by a number to run it N times. @@ -23,10 +30,67 @@ Prefixing by two number are repeat times and interval (unit: second, default: 0 (integer) 7 (integer) 8 ``` + ### Attach/Detach Low-level commands don't have auto-route ability. If you execute low-level command via `swarmkv-cli`, you should execute `ATTACH IP:port` first. The `DETACH` command exits attaching model. Another use case is the ability to attach to a node and execute commands on it. For example, you can connect to a node and execute the DEBUG command to debug the node. +### CLUSTER CREATE +Syntax + +``` +CLUSTER CREATE cluster-name IP:port [IP:port ...] +``` +The `CLUSTER CREATE` command creates a new cluster with the specified name and adds the specified nodes to it. + +### CLUSTER NODES + +Syntax + +``` +CLUSTER NODES +``` +List active nodes by communicate with Consul. + +### MONITOR +Syntax + +``` +MONITOR [pattern] +``` + +`MONITOR` is a debugging command to see all the command executed by a specific node. It is implemented by registering the client at the attached node with the `MONREG` command, after which the attached node streams back every executed command using the `PRINT` command. + +The optional `pattern` argument is a glob-style pattern that is matched against the command name and arguments. If the pattern is specified, only commands that match the pattern are streamed. +``` +1711950111.894233 [1 R 127.0.0.1:43385 KRT] "get" "key" "value" +1711950111.894324 [1 L 127.0.0.1:5212 RDR] "keyspace" "xradd" "key" "127.0.0.1:5212" +``` +The abbreviations are listed before: +- `R` for remote +- `L` for local +- `KRT` for keyspace route +- `RDR` for redirect +- `FIN` for finished + +### CLUSTER SANITY +Syntax + +``` +CLUSTER SANITY check | heal +``` + +The `CLUSTER SANITY` command checks for consistency between the keyspace and crdtspace. Inconsistencies are mostly caused by node crashes or restarts. The `heal` option fixes the inconsistency by adding and removing replica addresses from the keyspace. + +### CLUSTER ADDSLOTOWNER +Syntax + +``` +CLUSTER ADDSLOTOWNER IP:Port [IP:Port ...] +``` + +The `CLUSTER ADDSLOTOWNER` command assigns the slots to new nodes. + ### Known Issues If a swarmkv-cli instance attempts to connect to itself using an address other than 127.0.0.1:port, it will result in a crash.
\ No newline at end of file diff --git a/docs/command_toc.md b/docs/command_toc.md index fd558e2..7d89f28 100644 --- a/docs/command_toc.md +++ b/docs/command_toc.md @@ -9,7 +9,7 @@ The supported command are category as follows: * [Hash Type](./commands/hash.md) * [Token Bucket Types](./commands/token_bucket.md) * [Bloom Filter Type](./commands/bloom_filter.md) - +* [Count-Min Sketch Type](./commands/count_min_sketch.md) ## COMMAND LIST Syntax diff --git a/docs/commands/bloom_filter.md b/docs/commands/bloom_filter.md index 0fe9474..eae9661 100644 --- a/docs/commands/bloom_filter.md +++ b/docs/commands/bloom_filter.md @@ -86,10 +86,10 @@ Syntax BFINFO key ``` Returns information about a Bloom filter. -- Error -- error_rate arguments specified in `BFINIT`. -- Capacity -- capacity arguments specified in `BFINIT`. -- TimeWindowMs -- window-milliseconds arguments specified in `BFINIT`. -- TimeSlices -- slice-number arguments specified in `BFINIT`. +- Error -- error_rate argument specified in `BFINIT`. +- Capacity -- capacity argument specified in `BFINIT`. +- TimeWindowMs -- window-milliseconds argument specified in `BFINIT`. +- TimeSlices -- slice-number argument specified in `BFINIT`. - HashNum -- Hash function number caculated by the error_rate, which is log2(1/error_rate). - TotalSlices -- The actual slice number after expansion. - MaxExpansionTimes -- The maximum expansion times of every time slice. @@ -100,4 +100,29 @@ Returns information about a Bloom filter. Return - Array reply with argument name (Simple string reply) and value (Integer reply or Double reply) pairs -- Empty array reply if key does not exist.
\ No newline at end of file +- Empty array reply if key does not exist. + +Examples +``` +swarmkv-2-nodes> bfinfo bf-key + 1) "Error" + 2) (double) 0.000100 + 3) "Capacity" + 4) (integer) 1000000 + 5) "TimeWindowMs" + 6) (integer) 300000 + 7) "TimeSlices" + 8) (integer) 24 + 9) "HashNum" +10) (integer) 14 +11) "TotalSlices" +12) (integer) 38 +13) "MaxExpansionTimes" +14) (integer) 1 +15) "ApproximateItemNum" +16) (integer) 0 +17) "FillRatio" +18) (double) 0.000000 +19) "OldestItemTime" +20) "0.000" +```
\ No newline at end of file diff --git a/docs/commands/count_min_sketch.md b/docs/commands/count_min_sketch.md new file mode 100644 index 0000000..2e7605e --- /dev/null +++ b/docs/commands/count_min_sketch.md @@ -0,0 +1,177 @@ +## Count-min Sketch + +Count-Min Sketch is a probabilistic data structure that can be used to estimate the frequency of events/elements in a stream of data. +The CMS is not the best data structure to count frequency of a uniformly distributed stream. + +### CMSINITBYDIM + +Syntax + +``` +CMSINITBYDIM key width depth +``` +Initializes a Count-Min Sketch to dimensions specified by user. + +Parameters: +- key: The name of the sketch. +- width: Number of counters in each array. Reduces the error size. +- depth: Number of counter-arrays. Reduces the probability for an error of a certain size (percentage of total count). + +Return + +- Simple String Reply: OK if executed correctly. +- Empty Array if key already exists or wrong parameters. + +### CMSINITBYPROB + +Syntax + +``` +CMSINITBYPROB key error probability +``` +Initializes a Count-Min Sketch to accommodate requested tolerances. The error parameter will determine the width w of your sketch and the probability will determine the number of hash functions (depth d). The error rate we choose will determine the threshold above which we can trust the result from the sketch. + +Parameters: +- key: The name of the sketch. +- error: Estimate size of error. The error is a percent of **total counted items**, but not the count of single item. This determines the width of the sketch. +- probability: The desired probability for inflated count. This should be a decimal value between 0 and 1. This effects the depth of the sketch. The closer this number is to zero, the greater the memory consumption per item and the more CPU usage per operation. + +Return + +- Simple String Reply: OK if executed correctly. +- Empty Array if key already exists or wrong parameters. + +Examples +Assume you select an error rate of 0.1% (0.001) with a certainty of 99.8% (0.998). This means you have an error probability of 0.02% (0.002). +``` +swarmkv-2-nodes> cmsinitbyprob concurrent-sessions 0.001 0.002 +OK +swarmkv-2-nodes> cmsinfo concurrent-sessions + 1) "Width" + 2) (integer) 2000 + 3) "Depth" + 4) (integer) 9 + 5) "Error" + 6) (double) 0.001000 + 7) "Probability" + 8) (double) 0.001953 + 9) "Count" +10) (integer) 0 +11) "ReplicaNumber" +12) (integer) 0 +swarmkv-2-nodes> +``` + +### CMSINCRBY + +Syntax + +``` +CMSINCRBY key item increment [item increment ...] +``` + +Increases the count of item by increment. Multiple items can be increased with one call. +Parameters: +- key: The name of the sketch. +- item: The item which counter is to be increased. Negtive value is allowed for decrement. +- increment: Amount by which the item counter is to be increased. + +Return +- Array reply of Integer reply with an updated min-count of each of the items in the sketch. +- Empty Array if key does not exist. + +### CMSQUERY + +Syntax +``` +CMSQUERY key item +``` +Returns the count for an item in a sketch. + +Parameters: +- key: The name of the sketch. +- item: The item for which to return the count. + +Return +- Integer reply: The count of the item in the sketch. +- Integer 0 if key does not exist. + +### CMSMQUERY + +Syntax +``` +CMSMQUERY key item [item ...] +``` +Returns the count for one or more items in a sketch. It's the bulk version of `CMSQUERY`. + +Parameters: +- key: The name of the sketch. +- item: One or more items for which to return the count. + +Return +- Array reply of Integer reply with a min-count of each of the items in the sketch. +- Empty Array if key does not exist. + +### CMSRLIST + +Syntax +``` +CMSRLIST key +``` +Returns the list of replica uuid. + +Parameters: +- key: The name of the sketch. + +Return +- Array reply with the list of replica uuid. +- Empty Array if key does not exist. + +### CMSRCLEAR + +Syntax +``` +CMSRCLEAR key uuid +``` +Zeroes the replica identified by the given uuid. This command is intend to reset count when node holding this replica is down. + +Return +- Simple String Reply: OK if executed correctly. +- Erorr reply if the replica specified by uuid is not found. + +### CMSINFO + +Syntax +``` +CMSINFO key +``` +Returns the information of the sketch. +- Width: If the CMS is init with `CMSINITBYPROB`, width = 2/error. +- Depth: If the CMS is init with `CMSINITBYPROB`, depth = -log2(probability). +- Probability: The probability of inflated count. +- Error: The error rate of the sketch. +- Count: The total count of the sketch. The inflated threshold is Count*Error. +- ReplicaNumber: The number of replicas of the sketch. + +Parameters: +- key: The name of the sketch. + +Return +- Array reply with information of the sketch. + +Examples +``` +swarmkv-2-nodes> cmsinfo cms-key + 1) "Width" + 2) (integer) 8192 + 3) "Depth" + 4) (integer) 8 + 5) "Error" + 6) (double) 0.000244 + 7) "Probability" + 8) (double) 0.003906 + 9) "Count" +10) (integer) 0 +11) "ReplicaNumber" +12) (integer) 0 +```
\ No newline at end of file diff --git a/docs/commands/trouble_shooting.md b/docs/commands/trouble_shooting.md index 7628f69..5af3b32 100644 --- a/docs/commands/trouble_shooting.md +++ b/docs/commands/trouble_shooting.md @@ -90,12 +90,13 @@ Subcommands are: * RESET [command|event|peer] - Reset data of a specified catalog or all the data if no catalog provided. + ### DEBUG Syntax ``` -DEBUG <subcommand> [<arg> [value] [opt] ...]. +DEBUG <subcommand> [<arg> [value] [opt] ...] ``` Subcommands are: diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h index 9b1e2e1..c161b2f 100644 --- a/include/swarmkv/swarmkv.h +++ b/include/swarmkv/swarmkv.h @@ -131,11 +131,13 @@ long long swarmkv_caller_get_pending_commands(struct swarmkv *db); //Blocking function struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...)__attribute__ ((format (printf, 2, 3))); struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...)__attribute__ ((format (printf, 3, 4))); +struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, const char *argv[], size_t *argv_len); //Non-blocking function typedef void swarmkv_on_reply_callback_t(const struct swarmkv_reply *reply, void * arg); -void swarmkv_async_command_on(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, const char *format, ...)__attribute__ ((format (printf, 5, 6))); void swarmkv_async_command(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *format, ...)__attribute__ ((format (printf, 4, 5))); +void swarmkv_async_command_on(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, const char *format, ...)__attribute__ ((format (printf, 5, 6))); +void swarmkv_async_command_on_argv(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, int argc, const char *argv[], size_t *argv_len); void swarmkv_get(struct swarmkv * db, const char * key, size_t keylen, swarmkv_on_reply_callback_t * cb, void * arg); @@ -145,12 +147,10 @@ void swarmkv_set(struct swarmkv * db, void swarmkv_del(struct swarmkv * db, const char * key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg); void swarmkv_incrby(struct swarmkv * db, const char * key, size_t keylen, long long delta, swarmkv_on_reply_callback_t *cb, void *cb_arg); - void swarmkv_expire(struct swarmkv *db, const char *key, size_t keylen, int seconds, swarmkv_on_reply_callback_t *cb, void *cb_arg); void swarmkv_ttl(struct swarmkv *db, const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg); void swarmkv_persist(struct swarmkv *db, const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg); - void swarmkv_sadd(struct swarmkv *db, const char *key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg); void swarmkv_srem(struct swarmkv *db, const char *key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg); void swarmkv_sismember(struct swarmkv *db, const char *key, size_t keylen, const char *member, size_t member_len, swarmkv_on_reply_callback_t *cb, void *cb_arg); @@ -162,7 +162,10 @@ void swarmkv_ftconsume(struct swarmkv * db, const char * key, size_t keylen, con void swarmkv_btconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg); void swarmkv_bfadd(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg); -void swarmkv_bfexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg); +void swarmkv_bfmexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg); + +void swarmkv_cmsincrby(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], long long increments[], size_t n_increment, swarmkv_on_reply_callback_t * cb, void * cb_arg); +void swarmkv_cmsmquery(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t * cb, void * cb_arg); //Used by swarmkv-cli size_t swarmkv_get_possible_command_name(struct swarmkv *db, const char *prefix, const char *cmd_names[], size_t sz); @@ -171,6 +174,7 @@ char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name); const char *swarmkv_self_address(const struct swarmkv *db); void swarmkv_self_uuid(const struct swarmkv *db, char buff[37]); +int swarmkv_gettid(const struct swarmkv *db); #ifdef __cplusplus } /* end extern "C" */ #endif @@ -144,7 +144,7 @@ int main(int argc, char **argv) return 0; } ``` - +It's recommended to use [jemalloc](https://github.com/jemalloc/jemalloc) for better performance. # Further documentation diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6c8b3e5..83d5aa5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,5 @@ set(SWARMKV_MAJOR_VERSION 4) -set(SWARMKV_MINOR_VERSION 2) +set(SWARMKV_MINOR_VERSION 3) set(SWARMKV_PATCH_VERSION 0) set(SWARMKV_VERSION ${SWARMKV_MAJOR_VERSION}.${SWARMKV_MINOR_VERSION}.${SWARMKV_PATCH_VERSION}) @@ -25,7 +25,7 @@ add_definitions(-D_GNU_SOURCE) add_definitions(-fPIC) set(SWARMKV_SRC swarmkv.c swarmkv_api.c swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c - swarmkv_store.c swarmkv_sync.c swarmkv_keyspace.c swarmkv_monitor.c + swarmkv_sync.c swarmkv_store.c swarmkv_keyspace.c swarmkv_monitor.c t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c) diff --git a/src/swarmkv.c b/src/swarmkv.c index 382f56e..8cf8d91 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -108,7 +108,7 @@ struct swarmkv_options *swarmkv_get0_options(struct swarmkv *db) return db->opts; } -int __gettid(struct swarmkv *db) +int swarmkv_gettid(const struct swarmkv *db) { // int __sys_tid=syscall(SYS_gettid); for(int i=0; i<db->opts->nr_worker_threads + db->opts->nr_caller_threads; i++) @@ -187,7 +187,7 @@ static void remoter_caller_ctx_send_reply(struct remote_caller_ctx *ctx, const s { struct swarmkv *db = ctx->db; struct swarmkv_msg *msg=swarmkv_msg_new_by_reply(reply, &ctx->remote, ctx->remote_tid, &db->self, ctx->sequence); - int cur_tid=__gettid(db); + int cur_tid=swarmkv_gettid(db); const char *err_str=NULL; if(0==node_compare(&db->self, &msg->caller)) { @@ -471,7 +471,12 @@ enum cmd_exec_result __attribute__((optimize("O0"))) debug_command(struct swarmk } return FINISHED; } - +enum cmd_exec_result print_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + printf("%s", cmd->argv[1]); + *reply=swarmkv_reply_new_status("OK"); + return FINISHED; +} enum cmd_exec_result config_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { //Unused @@ -544,7 +549,7 @@ void *swarmkv_worker_thread(void *arg) { struct swarmkv *db = (struct swarmkv *)arg; swarmkv_register_thread(db); - int tid=__gettid(db); + int tid=swarmkv_gettid(db); struct swarmkv_thread *thr = db->threads+tid; char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "swarmkv-%u", thr->thread_id); @@ -619,41 +624,41 @@ static struct swarmkv_reply *key_not_found_reply(enum key_not_found_reply not_fo return reply; } -static struct swarmkv_cmd *make_keyroute_cmd(enum cmd_key_flag flag, const sds key, const node_t *obj_owner) +static struct swarmkv_cmd *make_keyroute_cmd(enum cmd_key_flag flag, const sds key, int dry_run, const node_t *caller) { struct swarmkv_cmd *keyroute_cmd=NULL; switch(flag) { case CMD_KEY_RO: case CMD_KEY_RW: - if(obj_owner) + if(!dry_run) { - keyroute_cmd=swarmkv_cmd_new(4); + keyroute_cmd=swarmkv_cmd_new(4, caller); keyroute_cmd->argv[0]=sdsnew("keyspace"); keyroute_cmd->argv[1]=sdsnew("xradd"); keyroute_cmd->argv[2]=sdsdup(key); - keyroute_cmd->argv[3]=node_addr2sds(obj_owner); + keyroute_cmd->argv[3]=node_addr2sds(caller); } else { - keyroute_cmd=swarmkv_cmd_new(3); + keyroute_cmd=swarmkv_cmd_new(3, caller); keyroute_cmd->argv[0]=sdsnew("keyspace"); keyroute_cmd->argv[1]=sdsnew("rlist"); keyroute_cmd->argv[2]=sdsdup(key); - } + } break; case CMD_KEY_OW: - if(obj_owner) + if(!dry_run) { - keyroute_cmd=swarmkv_cmd_new(4); + keyroute_cmd=swarmkv_cmd_new(4, caller); keyroute_cmd->argv[0]=sdsnew("keyspace"); keyroute_cmd->argv[1]=sdsnew("radd"); keyroute_cmd->argv[2]=sdsdup(key); - keyroute_cmd->argv[3]=node_addr2sds(obj_owner); + keyroute_cmd->argv[3]=node_addr2sds(caller); } else { - keyroute_cmd=swarmkv_cmd_new(3); + keyroute_cmd=swarmkv_cmd_new(3, caller); keyroute_cmd->argv[0]=sdsnew("keyspace"); keyroute_cmd->argv[1]=sdsnew("radd"); keyroute_cmd->argv[2]=sdsdup(key); @@ -661,7 +666,7 @@ static struct swarmkv_cmd *make_keyroute_cmd(enum cmd_key_flag flag, const sds k break; case CMD_KEY_RM: assert(0); - keyroute_cmd=swarmkv_cmd_new(3); + keyroute_cmd=swarmkv_cmd_new(3, caller); keyroute_cmd->argv[0]=sdsnew("keyspace"); keyroute_cmd->argv[1]=sdsnew("del"); keyroute_cmd->argv[2]=sdsdup(key); @@ -754,10 +759,10 @@ static void peer_exec_on_success(void *result, void *user) cmd_ctx_free(ctx); } } -struct swarmkv_cmd *make_crdt_add_cmd(const sds key, node_t replica[], size_t n_replica) +struct swarmkv_cmd *make_crdt_add_cmd(const sds key, node_t replica[], size_t n_replica, const node_t *caller) { struct swarmkv_cmd *crdt_add_cmd=NULL; - crdt_add_cmd=swarmkv_cmd_new(3+n_replica); + crdt_add_cmd=swarmkv_cmd_new(3+n_replica, caller); crdt_add_cmd->argv[0]=sdsnew("crdt"); crdt_add_cmd->argv[1]=sdsnew("add"); crdt_add_cmd->argv[2]=sdsdup(key); @@ -807,12 +812,11 @@ static void key_route_on_success(void *result, void *user) if(n_replica_node>0) { __exec_cmd(ctx->db, replica_nodes+0, ctx->cmd, ctx->future_of_caller); - //__send_cmd(ctx->db, replica_nodes+0, ctx->cmd, ctx->future_of_caller); } if(self_is_a_replica) { struct cmd_ctx *crdt_add_ctx=NULL; - struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(key, replica_nodes, n_replica_node); + struct swarmkv_cmd *crdt_add_cmd=make_crdt_add_cmd(key, replica_nodes, n_replica_node, &ctx->db->self); crdt_add_ctx=cmd_ctx_new(ctx->db, ctx->cmd, n_replica_node>0?NULL:ctx->future_of_caller); crdt_add_ctx->future_of_mine=future_create("crdt_add", crdt_add_on_success, generic_on_fail, crdt_add_ctx); __exec_cmd(ctx->db, &ctx->db->self, crdt_add_cmd, crdt_add_ctx->future_of_mine); @@ -844,7 +848,7 @@ static int spec_gettid(struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd * void __on_msg_callback(struct swarmkv_msg *msg, void *arg) { struct swarmkv *db = (struct swarmkv *)arg; - int cur_tid=__gettid(db); + int cur_tid=swarmkv_gettid(db); if(msg->type==MSG_TYPE_CMD) { //command may be from other thread or other node. @@ -878,27 +882,13 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg) } } } -static const char *exec_ret2string(enum cmd_exec_result ret) -{ - switch(ret) - { - case NEED_KEY_ROUTE: - return "NEED_KEY_ROUTE"; - case REDIRECT: - return "REDIRECT"; - case FINISHED: - return "FINISHED"; - default: - assert(0); - } -} #define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000 void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller) { struct swarmkv_cmd_spec *spec=NULL; struct swarmkv_reply *reply=NULL; struct promise *p=NULL; - int cur_tid=__gettid(db); + int cur_tid=swarmkv_gettid(db); spec=get_spec_by_argv(db, cmd->argc, cmd->argv); if(!spec) @@ -981,20 +971,8 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar clock_gettime(CLOCK_MONOTONIC_COARSE, &start); exec_ret=spec->proc(spec->module, cmd, &reply); clock_gettime(CLOCK_MONOTONIC_COARSE, &end); - swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end)); + swarmkv_monitor_record_command(db->mod_monitor, spec, cmd, timespec_diff_usec(&start, &end), exec_ret); - //if(strcasestr(spec->name, "CRDT")) - if(0){ - struct timeval now; - gettimeofday(&now, NULL); - printf("%ld.%.6ld %s %d %s %s %s\n", - now.tv_sec, now.tv_usec, - db->self.addr, - db->threads[cur_tid].recusion_depth, - spec->name, - spec->key_offset<0?"NULL":cmd->argv[spec->key_offset], - exec_ret2string(exec_ret)); - } switch(exec_ret) { case FINISHED: @@ -1018,7 +996,7 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar } case NEED_KEY_ROUTE: { - struct swarmkv_cmd *keyspace_cmd=make_keyroute_cmd(spec->flag, cmd->argv[spec->key_offset], db->opts->dryrun? NULL:(&db->self)); + struct swarmkv_cmd *keyspace_cmd=make_keyroute_cmd(spec->flag, cmd->argv[spec->key_offset], db->opts->dryrun, &db->self); struct cmd_ctx *ctx=cmd_ctx_new(db, cmd, future_of_caller); ctx->future_of_mine=future_create("key_route", key_route_on_success, generic_on_fail, ctx); __exec_cmd(db, NULL, keyspace_cmd, ctx->future_of_mine); @@ -1226,13 +1204,19 @@ void command_spec_init(struct swarmkv *db) command_register(&(db->command_table), "PING", "IP:port", 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, ping_command, &db->module); + command_register(&(db->command_table), "PRINT", "text", + 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, + print_command, &db->module); command_register(&(db->command_table), "COMMAND LIST", "", 0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, command_list_command, &db->module); command_register(&(db->command_table), "LATENCY", "<subcommand>", 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, latency_command, db->mod_monitor); - + command_register(&(db->command_table), "MONREG", "IP:port", + 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, + monreg_command, db->mod_monitor); + /* low-level state-based CRDT synchronization commands*/ command_register(&(db->command_table), "CRDT ADD", "key [IP:port ...]", 1, 2, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, @@ -1403,7 +1387,7 @@ static void evloop_timeout_cb(evutil_socket_t fd, short event, void *arg) } void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv) { - int tid=__gettid(db); + int tid=swarmkv_gettid(db); //must initiate from caller threads, and caller thread ID is larger than worker thread ID assert(tid >= db->opts->nr_worker_threads); struct swarmkv_thread *thr=db->threads+tid; @@ -1425,14 +1409,14 @@ void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv) } void swarmkv_caller_loop_break(struct swarmkv *db) { - int tid=__gettid(db); + int tid=swarmkv_gettid(db); //must initiate from caller threads, and caller thread ID is larger than worker thread ID assert(tid >= db->opts->nr_worker_threads); event_base_loopbreak(db->ref_evbases[tid]); } long long swarmkv_caller_get_pending_commands(struct swarmkv *db) { - int tid=__gettid(db); + int tid=swarmkv_gettid(db); //must initiate from caller threads, and caller thread ID is larger than worker thread ID assert(tid >= db->opts->nr_worker_threads); return swarmkv_rpc_mgr_count(db->rpc_mgr, tid); @@ -1493,7 +1477,7 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, } event_config_free(ev_cfg); - db->mod_monitor=swarmkv_monitor_new(db->opts); + db->mod_monitor=swarmkv_monitor_new(db->opts, db); db->rpc_mgr=swarmkv_rpc_mgr_new(db->ref_evbases, opts->total_threads, opts->cluster_timeout_us); db->mesh=swarmkv_mesh_new(db->ref_evbases, opts->total_threads, db->logger); @@ -1623,6 +1607,10 @@ const char *swarmkv_self_address(const struct swarmkv *db) { return db->self.addr; } +const node_t *swarmkv_self_node(const struct swarmkv *db) +{ + return &db->self; +} void swarmkv_self_uuid(const struct swarmkv *db, char buff[37]) { uuid_unparse(db->opts->bin_uuid, buff); diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index f408274..6d98f15 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -10,7 +10,7 @@ #define MODULE_SWAMRKV_API module_name_str("swarmkv.api") void exec_for_local(struct swarmkv *db, const struct swarmkv_cmd *cmd, const node_t *target_node, swarmkv_on_reply_callback_t * cb, void * cb_arg); - +const node_t *swarmkv_self_node(const struct swarmkv *db); struct swarmkv_readoptions { @@ -27,7 +27,7 @@ struct swarmkv_options* swarmkv_options_new(void) opts->cluster_port=5210; opts->health_check_port=0; opts->loglevel=0; - opts->cluster_timeout_us=500*1000;//Default 500ms + opts->cluster_timeout_us=500*1000;//Default 500ms opts->sync_interval_us=10*1000; //Default 10ms strcpy(opts->bind_address, "0.0.0.0"); strcpy(opts->cluster_announce_ip, "127.0.0.1"); @@ -159,146 +159,266 @@ int swarmkv_options_set_max_dispatch_interval(struct swarmkv_options *opts, cons opts->eb_max_callbacks=max_callbacks; return 0; } +void swarmkv_async_command_on_argv(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, int argc, const char *argv[], size_t *argv_len) +{ + struct swarmkv_cmd *cmd=NULL; + cmd=swarmkv_cmd_new(argc, swarmkv_self_node(db)); + for(size_t i=0; i<argc; i++) + { + assert(argv[i]); + assert(argv_len[i]>0); + cmd->argv[i]=sdsnewlen(argv[i], argv_len[i]); + } + if(!target) + { + exec_for_local(db, cmd, NULL, cb, cb_arg); + } + else + { + node_t target_node; + memset(&target_node, 0, sizeof(node_t)); + node_init_from_sds(&target_node, target); + exec_for_local(db, cmd, &target_node, cb, cb_arg); + } + swarmkv_cmd_free(cmd); +} +struct blocking_query_ctx +{ + struct swarmkv_reply *reply; + struct swarmkv *db; +}; +void blocking_query_cb(const struct swarmkv_reply *reply, void * arg) +{ + struct blocking_query_ctx *ctx=(struct blocking_query_ctx*) arg; + ctx->reply=swarmkv_reply_dup(reply); + swarmkv_caller_loop_break(ctx->db); + return; +} +struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, const char *argv[], size_t *argv_len) +{ + struct blocking_query_ctx ctx; + memset(&ctx, 0, sizeof(ctx)); + ctx.db=db; + ctx.reply=NULL; + swarmkv_async_command_on_argv(db, blocking_query_cb, &ctx, target, argc, argv, argv_len); + long long pending_cmds=0; + pending_cmds=swarmkv_caller_get_pending_commands(db); + if(ctx.reply==NULL) + { + assert(pending_cmds==1); + swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL); + } + assert(ctx.reply!=NULL); + struct swarmkv_reply *reply=NULL; + reply=ctx.reply; + ctx.reply=NULL; + return reply; +} +struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...) +{ + char *cmd_str=NULL; + va_list ap; + va_start(ap, format); + vasprintf(&cmd_str, format, ap); + va_end(ap); + + int argc=0; + sds *argv=NULL; + + argv=sdssplitargs(cmd_str, &argc); + size_t argv_len[argc]; + for(int i=0; i<argc; i++) + { + argv_len[i]=sdslen(argv[i]); + } + struct swarmkv_reply *reply=NULL; + reply=swarmkv_command_on_argv(db, target, argc, (const char**) argv, argv_len); + + if (argv) + { + sdsfreesplitres(argv, argc); + } + free(cmd_str); + + return reply; +} +struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...) +{ + struct swarmkv_reply *reply=NULL; + char *cmd_str=NULL; + va_list ap; + va_start(ap, format); + vasprintf(&cmd_str, format, ap); + va_end(ap); + reply=swarmkv_command_on(db, NULL, cmd_str); + free(cmd_str); + return reply; +} +void swarmkv_async_command_on(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, const char *format, ...) +{ + int argc=0; sds *argv=NULL; + char *cmd_str=NULL; + + va_list ap; + va_start(ap, format); + vasprintf(&cmd_str, format, ap); + va_end(ap); + + argv=sdssplitargs(cmd_str, &argc); + size_t argv_len[argc]; + for(size_t i=0; i<argc; i++) + { + argv_len[i]=sdslen(argv[i]); + } + swarmkv_async_command_on_argv(db, cb, cb_arg, target, argc, (const char**)argv, argv_len); + + if (argv) + { + sdsfreesplitres(argv, argc); + } + free(cmd_str); +} +void swarmkv_async_command(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *format, ...) +{ + char *cmd_str=NULL; + va_list ap; + va_start(ap,format); + vasprintf(&cmd_str, format, ap); + va_end(ap); + swarmkv_async_command_on(db, cb, cb_arg, NULL, cmd_str); +} void swarmkv_set(struct swarmkv * db, const char * key, size_t keylen, const char * value, size_t vallen, swarmkv_on_reply_callback_t * cb, void * cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(3); - cmd->argv[0]=sdsnew("set"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsnewlen(value, vallen); - - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[3]; + size_t argv_len[3]; + argv[0]="set"; + argv_len[0]=4; + argv[1]=key; + argv_len[1]=keylen; + argv[2]=value; + argv_len[2]=vallen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len); return; - } void swarmkv_get(struct swarmkv * db, const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("get"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[2]; + size_t argv_len[2]; + argv[0]="get"; + argv_len[0]=4; + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); return; } void swarmkv_expire(struct swarmkv *db, const char *key, size_t keylen, int seconds, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(3); - cmd->argv[0]=sdsnew("expire"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsfromlonglong(seconds); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[3]; + size_t argv_len[3]; + argv[0]="expire"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + char buffer[32]; + sprintf(buffer, "%d", seconds); + argv[2]=buffer; + argv_len[2]=strlen(buffer); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len); return; } void swarmkv_ttl(struct swarmkv *db, const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("ttl"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[2]; + size_t argv_len[2]; + argv[0]="ttl"; + argv_len[0]=4; + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); return; } void swarmkv_persist(struct swarmkv *db, const char *key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("persist"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[2]; + size_t argv_len[2]; + argv[0]="persist"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); return; } void swarmkv_sadd(struct swarmkv *db, const char* key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2+n_members); - cmd->argv[0]=sdsnew("sadd"); - cmd->argv[1]=sdsnewlen(key, keylen); + const char *argv[2+n_members]; + size_t argv_len[2+n_members]; + argv[0]="sadd"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; for(size_t i=0; i<n_members; i++) { - cmd->argv[2+i]=sdsnewlen(members[i], members_len[i]); + argv[2+i]=members[i]; + argv_len[2+i]=members_len[i]; } - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); - + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_members, argv, argv_len); + return; } void swarmkv_srem(struct swarmkv *db, const char* key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2+n_members); - cmd->argv[0]=sdsnew("srem"); - cmd->argv[1]=sdsnewlen(key, keylen); + const char *argv[2+n_members]; + size_t argv_len[2+n_members]; + argv[0]="srem"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; for(size_t i=0; i<n_members; i++) { - cmd->argv[2+i]=sdsnewlen(members[i], members_len[i]); - } - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); - -} -void swarmkv_bfadd(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2+n_items); - cmd->argv[0]=sdsnew("bfadd"); - cmd->argv[1]=sdsnewlen(key, keylen); - for(size_t i=0; i<n_items; i++) - { - cmd->argv[2+i]=sdsnewlen(items[i], items_len[i]); - } - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); -} -void swarmkv_bfexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2+n_items); - cmd->argv[0]=sdsnew("bfexists"); - cmd->argv[1]=sdsnewlen(key, keylen); - for(size_t i=0; i<n_items; i++) - { - cmd->argv[2+i]=sdsnewlen(items[i], items_len[i]); + argv[2+i]=members[i]; + argv_len[2+i]=members_len[i]; } - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_members, argv, argv_len); + return; } void swarmkv_sismember(struct swarmkv *db, const char* key, size_t keylen, const char *member, size_t member_len, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(3); - cmd->argv[0]=sdsnew("sismember"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsnewlen(member, member_len); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[3]; + size_t argv_len[3]; + argv[0]="sismember"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + argv[2]=member; + argv_len[2]=member_len; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len); + return; } void swarmkv_smembers(struct swarmkv *db, const char* key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("smembers"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[2]; + size_t argv_len[2]; + argv[0]="smembers"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); + return; } void swarmkv_scard(struct swarmkv *db, const char* key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("scard"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); + const char *argv[2]; + size_t argv_len[2]; + argv[0]="scard"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); + return; } int swarmkv_hset_string(struct swarmkv *db, @@ -307,218 +427,149 @@ int swarmkv_hset_string(struct swarmkv *db, return 0; } void swarmkv_del(struct swarmkv * db, const char * key, size_t keylen, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2); - cmd->argv[0]=sdsnew("del"); - cmd->argv[1]=sdsnewlen(key, keylen); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); +{ + const char *argv[2]; + size_t argv_len[2]; + argv[0]="del"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2, argv, argv_len); return; } void swarmkv_incrby(struct swarmkv * db, const char * key, size_t keylen, long long increment, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(3); - cmd->argv[0]=sdsnew("incrby"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsfromlonglong(increment); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); +{ + const char *argv[3]; + size_t argv_len[3]; + argv[0]="incrby"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + char buffer[32]; + sprintf(buffer, "%lld", increment); + argv[2]=buffer; + argv_len[2]=strlen(buffer); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len); return; } void swarmkv_tconsume(struct swarmkv * db, const char * key, size_t keylen, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(3); - cmd->argv[0]=sdsnew("tconsume"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsfromlonglong(tokens); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); +{ + const char *argv[3]; + size_t argv_len[3]; + argv[0]="tconsume"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + char buffer[32]; + sprintf(buffer, "%lld", tokens); + argv[2]=buffer; + argv_len[2]=strlen(buffer); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3, argv, argv_len); return; } void swarmkv_ftconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long weight, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(5); - cmd->argv[0]=sdsnew("ftconsume"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsnewlen(member, member_len); - cmd->argv[3]=sdsfromlonglong(weight); - cmd->argv[4]=sdsfromlonglong(tokens); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); +{ + const char *argv[5]; + size_t argv_len[5]; + argv[0]="ftconsume"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + argv[2]=member; + argv_len[2]=member_len; + char buffer1[32]; + sprintf(buffer1, "%lld", weight); + argv[3]=buffer1; + argv_len[3]=strlen(buffer1); + char buffer2[32]; + sprintf(buffer2, "%lld", tokens); + argv[4]=buffer2; + argv_len[4]=strlen(buffer2); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 5, argv, argv_len); return; } void swarmkv_btconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) -{ - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(4); - cmd->argv[0]=sdsnew("btconsume"); - cmd->argv[1]=sdsnewlen(key, keylen); - cmd->argv[2]=sdsnewlen(member, member_len); - cmd->argv[3]=sdsfromlonglong(tokens); - exec_for_local(db, cmd, NULL, cb, cb_arg); - swarmkv_cmd_free(cmd); - return; -} -struct blocking_query_ctx { - struct swarmkv_reply *reply; - struct swarmkv *db; -}; -void blocking_query_cb(const struct swarmkv_reply *reply, void * arg) -{ - struct blocking_query_ctx *ctx=(struct blocking_query_ctx*) arg; - ctx->reply=swarmkv_reply_dup(reply); - swarmkv_caller_loop_break(ctx->db); + const char *argv[4]; + size_t argv_len[4]; + argv[0]="btconsume"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + argv[2]=member; + argv_len[2]=member_len; + char buffer[32]; + sprintf(buffer, "%lld", tokens); + argv[3]=buffer; + argv_len[3]=strlen(buffer); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 4, argv, argv_len); return; } -struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv) -{ - struct swarmkv_cmd *cmd=NULL; - struct swarmkv_reply *reply=NULL; - struct blocking_query_ctx ctx; - memset(&ctx, 0, sizeof(ctx)); - node_t target_node; - memset(&target_node, 0, sizeof(node_t)); - ctx.db=db; - ctx.reply=NULL; - cmd=swarmkv_cmd_new(argc); - for(int i=0; i<argc; i++) - { - cmd->argv[i]=sdsdup(argv[i]); - } - - if(!target) - { - exec_for_local(db, cmd, NULL, blocking_query_cb, &ctx); - } - else - { - node_init_from_sds(&target_node, target); - exec_for_local(db, cmd, &target_node, blocking_query_cb, &ctx); - } - long long pending_cmds=0; - pending_cmds=swarmkv_caller_get_pending_commands(db); - if(ctx.reply==NULL) - { - assert(pending_cmds==1); - swarmkv_caller_loop(db, SWARMKV_LOOP_NO_EXIT_ON_EMPTY, NULL); - } - assert(ctx.reply!=NULL); - reply=ctx.reply; - ctx.reply=NULL; - - swarmkv_cmd_free(cmd); - return reply; -} - -struct swarmkv_reply *swarmkv_command_on(struct swarmkv *db, const char *target, const char *format, ...) -{ - char *cmd_str=NULL; - va_list ap; - va_start(ap,format); - vasprintf(&cmd_str, format, ap); - va_end(ap); - - int argc=0; - sds *argv=NULL; - - argv=sdssplitargs(cmd_str, &argc); - struct swarmkv_reply *reply=NULL; - reply=swarmkv_command_on_argv(db, target, argc, argv); - - if (argv) - { - sdsfreesplitres(argv, argc); - } - free(cmd_str); - - return reply; -} -struct swarmkv_reply *swarmkv_command(struct swarmkv *db, const char *format, ...) +void swarmkv_bfadd(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - char *cmd_str=NULL; - va_list ap; - va_start(ap, format); - vasprintf(&cmd_str, format, ap); - va_end(ap); - - int argc=0; - sds *argv=NULL; - argv=sdssplitargs(cmd_str, &argc); - struct swarmkv_reply *reply=NULL; - reply=swarmkv_command_on_argv(db, NULL, argc, argv); - - if (argv) + const char *argv[2+n_items]; + size_t argv_len[2+n_items]; + argv[0]="bfadd"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + for(size_t i=0; i<n_items; i++) { - sdsfreesplitres(argv, argc); + argv[2+i]=items[i]; + argv_len[2+i]=items_len[i]; } - free(cmd_str); - - return reply; + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_items, argv, argv_len); + return; } -void swarmkv_async_command_on_argv(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, int argc, sds *argv) +void swarmkv_bfmexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) { - struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(argc); - for(size_t i=0; i<argc; i++) + const char *argv[2+n_items]; + size_t argv_len[2+n_items]; + argv[0]="bfmexists"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + for(size_t i=0; i<n_items; i++) { - cmd->argv[i]=sdsdup(argv[i]); + argv[2+i]=items[i]; + argv_len[2+i]=items_len[i]; } - - if(!target) - { - exec_for_local(db, cmd, NULL, cb, cb_arg); - } - else - { - node_t target_node; - memset(&target_node, 0, sizeof(node_t)); - node_init_from_sds(&target_node, target); - exec_for_local(db, cmd, &target_node, cb, cb_arg); - } - swarmkv_cmd_free(cmd); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_items, argv, argv_len); + return; } -void swarmkv_async_command_on(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, const char *format, ...) +void swarmkv_cmsincrby(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], long long increments[], size_t n_increment, swarmkv_on_reply_callback_t * cb, void * cb_arg) { - int argc=0; sds *argv=NULL; - char *cmd_str=NULL; - - va_list ap; - va_start(ap,format); - vasprintf(&cmd_str, format, ap); - va_end(ap); - - argv=sdssplitargs(cmd_str, &argc); - - swarmkv_async_command_on_argv(db, cb, cb_arg, target, argc, argv); - - if (argv) + const char *argv[2+2*n_increment]; + size_t argv_len[2+2*n_increment]; + char inc_buffer[n_increment][32]; + argv[0]="cmsincrby"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + for(size_t i=0; i<n_increment; i++) { - sdsfreesplitres(argv, argc); + sprintf(inc_buffer[i], "%lld", increments[i]); + argv[2+2*i]=items[i]; + argv_len[2+2*i]=items_len[i]; + argv[3+2*i]=inc_buffer[i]; + argv_len[3+2*i]=strlen(inc_buffer[i]); } - free(cmd_str); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+2*n_increment, argv, argv_len); + return; } -void swarmkv_async_command(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *format, ...) +void swarmkv_cmsmquery(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t * cb, void * cb_arg) { - int argc=0; sds *argv=NULL; - char *cmd_str=NULL; - - va_list ap; - va_start(ap,format); - vasprintf(&cmd_str, format, ap); - va_end(ap); - - argv=sdssplitargs(cmd_str, &argc); - - swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, argc, argv); - - if (argv) + const char *argv[2+n_items]; + size_t argv_len[2+n_items]; + argv[0]="cmsmquery"; + argv_len[0]=strlen(argv[0]); + argv[1]=key; + argv_len[1]=keylen; + for(size_t i=0; i<n_items; i++) { - sdsfreesplitres(argv, argc); + argv[2+i]=items[i]; + argv_len[2+i]=items_len[i]; } - free(cmd_str); + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2+n_items, argv, argv_len); + return; }
\ No newline at end of file diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c index faa1a04..d915b8a 100644 --- a/src/swarmkv_common.c +++ b/src/swarmkv_common.c @@ -287,6 +287,10 @@ sds node_addr2sds(const node_t *node) { return sdsnew(node->addr); } +const char *node_addr2cstr(const node_t *node) +{ + return node->addr; +} int node_init_from_sds(node_t *node, const char *addr_str) { memset(node, 0, sizeof(node_t)); @@ -348,7 +352,7 @@ void node_init(node_t *node, const char *ipv4, unsigned int port) snprintf(node->addr, sizeof(node->addr), "%s:%u", ipv4, port); return; } -void node_init_from_string(node_t *node, const char *addr_str) +void node_init_from_cstr(node_t *node, const char *addr_str) { memset(node, 0, sizeof(node_t)); assert(strlen(addr_str)<sizeof(node_t)); @@ -356,6 +360,13 @@ void node_init_from_string(node_t *node, const char *addr_str) strncpy(node->addr, addr_str, sizeof(node->addr)); return; } +void node_init_from_string(node_t *node, const char *addr_str, size_t sz_addr) +{ + memset(node, 0, sizeof(node_t)); + assert(sz_addr<=sizeof(node->addr)); + strncpy(node->addr, addr_str, sz_addr); + return; +} int node_init_from_reply(node_t *node, const struct swarmkv_reply *reply) { if(reply->type!=SWARMKV_REPLY_NODE) @@ -368,7 +379,7 @@ int node_init_from_reply(node_t *node, const struct swarmkv_reply *reply) { p+=5; } - node_init_from_string(node, p); + node_init_from_cstr(node, p); return 0; } int node_list_new_from_reply(node_t **node_list, size_t *n_node, const struct swarmkv_reply *reply) @@ -559,10 +570,11 @@ void swarmkv_reply_free(struct swarmkv_reply *reply) } free(reply); } -struct swarmkv_cmd *swarmkv_cmd_new(size_t argc) +struct swarmkv_cmd *swarmkv_cmd_new(size_t argc, const node_t *caller) { size_t size=sizeof(struct swarmkv_cmd)+argc*sizeof(sds); struct swarmkv_cmd *cmd=(struct swarmkv_cmd*)malloc(size); + node_copy(&cmd->caller, caller); cmd->argc=argc; cmd->argv=(sds*)((char*)cmd+sizeof(struct swarmkv_cmd)); return cmd; @@ -580,7 +592,7 @@ void swarmkv_cmd_free(struct swarmkv_cmd *p) } struct swarmkv_cmd* swarmkv_cmd_dup(const struct swarmkv_cmd *origin) { - struct swarmkv_cmd* copy=swarmkv_cmd_new(origin->argc); + struct swarmkv_cmd* copy=swarmkv_cmd_new(origin->argc, &origin->caller); size_t i=0; for(i=0; i<origin->argc; i++) { @@ -657,7 +669,7 @@ void json2keyslots(sds json_buffer, void *slot_container_base, size_t sz_slot_co { iterator=(struct key_slot*)(slot_container_base+sz_slot_container*j+offset_slot); iterator->slot_id=j; - node_init_from_string(&iterator->owner, owner->valuestring); + node_init_from_cstr(&iterator->owner, owner->valuestring); k++; } } @@ -714,7 +726,7 @@ void leader_response2leader_node(const char *resp_body, node_t *leader) cJSON *item=NULL; root=cJSON_Parse(resp_body); item=cJSON_GetObjectItem(root, "node"); - node_init_from_string(leader, item->valuestring);; + node_init_from_cstr(leader, item->valuestring);; cJSON_Delete(root); } /* This is an modified version of keyHashSlot of Redis source code. diff --git a/src/swarmkv_common.h b/src/swarmkv_common.h index 3d427cf..d4f53e5 100644 --- a/src/swarmkv_common.h +++ b/src/swarmkv_common.h @@ -85,11 +85,12 @@ struct swarmkv_options struct swarmkv_cmd { + node_t caller; size_t argc; sds *argv; }; -struct swarmkv_cmd *swarmkv_cmd_new(size_t argc); +struct swarmkv_cmd *swarmkv_cmd_new(size_t argc, const node_t *originator); void swarmkv_cmd_free(struct swarmkv_cmd *p); struct swarmkv_cmd *swarmkv_cmd_dup(const struct swarmkv_cmd *origin); int swarmkv_cmd_parse_integer(const struct swarmkv_cmd *cmd, const char *name, long long *ival); @@ -117,21 +118,22 @@ int node_list_remove(node_t *list, size_t n_list, const node_t *node); sds node_print_json(const node_t *self, uuid_t uuid); sds node_addr2sds(const node_t *node); +const char *node_addr2cstr(const node_t *node); int node_init_from_sds(node_t *node, const char *addr_str); int node_init_from_reply(node_t *node, const struct swarmkv_reply *reply); +void node_init(node_t *node, const char *ipv4, unsigned int port); +void node_init_from_cstr(node_t *node, const char *addr_str); +void node_init_from_string(node_t *node, const char *addr_str, size_t sz_addr); + /* Return 0 if node1 == node 2 */ int node_compare(const node_t *node1, const node_t *node2); void node_copy(node_t *dst, const node_t *src); +int node_sanity(const node_t *node); int node_is_empty(const node_t *node); -void node_init(node_t *node, const char *ipv4, unsigned int port); -void node_init_from_string(node_t *node, const char *addr_str); + int node_parse(const node_t *node, unsigned int *port, char *addr_str, size_t sz_addr); void node_init_from_sockaddr(node_t *node, const struct sockaddr * sa); void node_to_sockaddr(const node_t *node, struct sockaddr *sockaddr); int http_blocking_request(enum evhttp_cmd_type type, const char* host, unsigned int port, const char* url, sds req_body, sds *resp_body); -//typedef void exec_cmd_func(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller); -struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv); -void swarmkv_async_command_on_argv(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *target, int argc, sds *argv); - -int __gettid(struct swarmkv *db);
\ No newline at end of file +const node_t *swarmkv_self_node(const struct swarmkv *db);
\ No newline at end of file diff --git a/src/swarmkv_error.h b/src/swarmkv_error.h index 221d107..5b6522b 100644 --- a/src/swarmkv_error.h +++ b/src/swarmkv_error.h @@ -9,6 +9,7 @@ #define error_arg_not_valid_address "ERR arg `%s` is not `IP:port` format" #define error_arg_not_valid_float "ERR arg `%s` is not a valid float or out of range" #define error_arg_not_valid_uuid "ERR arg `%s` is not a valid UUID" +#define error_arg_not_valid_node "ERR arg `%s` is not a valid IP:port" #define error_arg_parse_failed "ERR arg `%s` parse failed" #define error_arg_string_should_be "ERR arg `%s` should be `%s`" #define error_need_additional_arg "ERR arg `%s` should be fllowed by more args" diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index 1ca01c5..582cf52 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -156,19 +156,23 @@ void crdt_op_on_reply(const struct swarmkv_reply *reply, void * arg) void key_entry_deletion_notification(struct key_route_entry *key_entry, struct swarmkv *exec_cmd_handle) { struct replica_node *replica=NULL, *tmp=NULL; - struct swarmkv_cmd *crdt_del_cmd=swarmkv_cmd_new(3); - crdt_del_cmd->argv[0]=sdsnew("crdt"); - crdt_del_cmd->argv[1]=sdsnew("del"); - crdt_del_cmd->argv[2]=sdsdup(key_entry->key); + const char *argv[3]; + size_t argv_len[3]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="del"; + argv_len[1]=strlen(argv[1]); + argv[2]=key_entry->key; + argv_len[2]=sdslen(key_entry->key); + HASH_ITER(hh, key_entry->hash_replica, replica, tmp) { struct crdt_op_ctx *ctx=ALLOC(struct crdt_op_ctx, 1); node_copy(&ctx->peer, &replica->node); ctx->key=sdsdup(key_entry->key); ctx->is_del=1; - swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, crdt_del_cmd->argc, crdt_del_cmd->argv); + swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, 3, argv, argv_len); } - swarmkv_cmd_free(crdt_del_cmd); return; } @@ -176,14 +180,18 @@ void key_entry_meet_replica(struct key_route_entry *key_entry, struct swarmkv *e { struct replica_node *replica=NULL, *tmp=NULL; int n_replica=HASH_COUNT(key_entry->hash_replica), i=0; - struct swarmkv_cmd *crdt_meet_cmd=swarmkv_cmd_new(3+n_replica); - crdt_meet_cmd->argv[0]=sdsnew("crdt"); - crdt_meet_cmd->argv[1]=sdsnew("meet"); - crdt_meet_cmd->argv[2]=sdsdup(key_entry->key); - + const char *argv[3+n_replica]; + size_t argv_len[3+n_replica]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="meet"; + argv_len[1]=strlen(argv[1]); + argv[2]=key_entry->key; + argv_len[2]=sdslen(key_entry->key); HASH_ITER(hh, key_entry->hash_replica, replica, tmp) { - crdt_meet_cmd->argv[3+i]=node_addr2sds(&replica->node); + argv[3+i]=node_addr2cstr(&replica->node); + argv_len[3+i]=strlen(argv[3+i]); i++; } assert(i==n_replica); @@ -192,9 +200,8 @@ void key_entry_meet_replica(struct key_route_entry *key_entry, struct swarmkv *e struct crdt_op_ctx *ctx=ALLOC(struct crdt_op_ctx, 1); node_copy(&ctx->peer, &replica->node); ctx->key=sdsdup(key_entry->key); - swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, crdt_meet_cmd->argc, crdt_meet_cmd->argv); + swarmkv_async_command_on_argv(exec_cmd_handle, crdt_op_on_reply, ctx, replica->node.addr, 3+n_replica, argv, argv_len); } - swarmkv_cmd_free(crdt_meet_cmd); return; } @@ -1443,7 +1450,7 @@ enum cmd_exec_result keyspace_getkeysinslot_command(struct swarmkv_module *mod_k static int ks_get_tid(struct swarmkv_keyspace *ks, int slot_id) { int tid=swarmkv_keyspace_slot2tid(&ks->module, slot_id); - int real_tid=__gettid(ks->exec_cmd_handle); + int real_tid=swarmkv_gettid(ks->exec_cmd_handle); assert(tid==real_tid); return tid; } @@ -1646,7 +1653,7 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace, int is_matched=0; UT_array *matched_replies=NULL; struct swarmkv_reply *r=NULL; - int real_tid=__gettid(ks->exec_cmd_handle); + int real_tid=swarmkv_gettid(ks->exec_cmd_handle); int thread_id=atoll(cmd->argv[2]); assert(real_tid==thread_id); const sds pattern=cmd->argv[3]; @@ -1903,7 +1910,7 @@ int swarmkv_keyspace_slot2tid(struct swarmkv_module *mod_keyspace, int slot_id) void swarmkv_keyspace_periodic(struct swarmkv_module *mod_keyspace, int thread_id) { struct swarmkv_keyspace *ks = module2keyspace(mod_keyspace); - int real_tid=__gettid(ks->exec_cmd_handle); + int real_tid=swarmkv_gettid(ks->exec_cmd_handle); assert(real_tid==thread_id); struct ks_thread *thr=ks->threads+thread_id; diff --git a/src/swarmkv_mesh.c b/src/swarmkv_mesh.c index 0ffc636..62d6129 100644 --- a/src/swarmkv_mesh.c +++ b/src/swarmkv_mesh.c @@ -44,7 +44,7 @@ int swarmkv_mesh_send(struct swarmkv_mesh *mesh, int current_thread_id, int dest assert(current_thread_id != dest_thread_id); struct swarmkv_mesh_thread *curr_thr=mesh->threads+current_thread_id; struct swarmkv_mesh_thread *dest_thr=mesh->threads+dest_thread_id; - int tid=__gettid((struct swarmkv *)(mesh->on_msg_cb_arg)); + int tid=swarmkv_gettid((struct swarmkv *)(mesh->on_msg_cb_arg)); assert(tid==current_thread_id); ringbuf_t *dest_ring=dest_thr->ring; assert(msg->magic == SWARMKV_MSG_MAGIC); @@ -88,7 +88,7 @@ static void mesh_on_eventfd_read_cb(evutil_socket_t fd, short what, void * arg) ringbuf_t *ring=thr->ring; uint64_t n_msg=0; - int tid=__gettid((struct swarmkv *)(mesh->on_msg_cb_arg)); + int tid=swarmkv_gettid((struct swarmkv *)(mesh->on_msg_cb_arg)); assert(tid==thr->thread_id); ssize_t s = read(thr->efd, &n_msg, sizeof(uint64_t)); diff --git a/src/swarmkv_message.c b/src/swarmkv_message.c index 8db7ac9..6f2607b 100644 --- a/src/swarmkv_message.c +++ b/src/swarmkv_message.c @@ -75,19 +75,24 @@ static struct swarmkv_cmd *swarmkv_cmd_deserialize(const char *blob, size_t size mpack_tree_init_data(&tree, blob, size); mpack_tree_parse(&tree); - mpack_node_t item; + mpack_node_t item, array_node; mpack_node_t root=mpack_tree_root(&tree); struct swarmkv_cmd *cmd=NULL; const char *arg=NULL; - size_t arg_sz=0; - - cmd=swarmkv_cmd_new(mpack_node_array_length(root)); + size_t sz=0; + node_t caller; + item=mpack_node_map_cstr(root, "caller"); + node_init_from_string(&caller, mpack_node_str(item), mpack_node_strlen(item)); + assert(node_sanity(&caller)); + //assert(sz==sizeof(node_t)); + array_node=mpack_node_map_cstr(root, "argv"); + cmd=swarmkv_cmd_new(mpack_node_array_length(array_node), &caller); for(size_t i=0; i<cmd->argc; i++) { - item=mpack_node_array_at(root, i); + item=mpack_node_array_at(array_node, i); arg=mpack_node_str(item); - arg_sz=mpack_node_strlen(item); - cmd->argv[i]=sdsnewlen(arg, arg_sz); + sz=mpack_node_strlen(item); + cmd->argv[i]=sdsnewlen(arg, sz); } if (mpack_tree_destroy(&tree) != mpack_ok) { fprintf(stderr, "An error occurred decoding a cmd blob!\n"); @@ -164,15 +169,20 @@ static void swarmkv_cmd_serialize(const struct swarmkv_cmd *cmd, char **blob, si char *root_mpack_buff=NULL; size_t root_mpack_sz=0; - mpack_writer_t writer; + mpack_writer_t writer; size_t i=0; mpack_writer_init_growable(&writer, &root_mpack_buff, &root_mpack_sz); + mpack_build_map(&writer); + mpack_write_cstr(&writer, "caller"); + mpack_write_cstr(&writer, node_addr2cstr(&cmd->caller)); + mpack_write_cstr(&writer, "argv"); mpack_build_array(&writer); for(i=0; i<cmd->argc; i++) { mpack_write_str(&writer, cmd->argv[i], (uint32_t)sdslen(cmd->argv[i])); } mpack_complete_array(&writer); + mpack_complete_map(&writer); if (mpack_writer_destroy(&writer) != mpack_ok) { fprintf(stderr, "An error occurred encoding the cmd!\n"); diff --git a/src/swarmkv_monitor.c b/src/swarmkv_monitor.c index 0856b1c..e0c070c 100644 --- a/src/swarmkv_monitor.c +++ b/src/swarmkv_monitor.c @@ -2,6 +2,7 @@ #include "sds.h" #include "log.h" #include "uthash.h" +#include "utlist.h" #include "swarmkv_monitor.h" #include "swarmkv_common.h" #include "swarmkv_utils.h" @@ -220,6 +221,31 @@ struct swarmkv_reply *recorder_metric_to_reply(const struct recorder_metric *met reply->elements[1]=swarmkv_reply_new_string_fmt(metric_buff); return reply; } +struct monitor_client +{ + node_t node; + sds pattern; + struct monitor_client *next; +}; +struct monitor_client *monitor_client_new(const node_t *node, sds pattern) +{ + struct monitor_client *client=ALLOC(struct monitor_client, 1); + node_copy(&client->node, node); + client->pattern = pattern? sdsdup(pattern):NULL; + return client; +} +void monitor_client_free(struct monitor_client *client) +{ + sdsfree(client->pattern); + free(client); + return; +} +struct swarmkv_monitor_thread +{ + int n_client; + struct monitor_client *clients; + pthread_mutex_t mutex; +}; struct swarmkv_monitor { struct swarmkv_module module; @@ -229,7 +255,9 @@ struct swarmkv_monitor long long max_latency_usec; int significant_figures; int nr_worker_threads; - pthread_mutex_t mutex;//only one latency command can be executed at a time + struct swarmkv_monitor_thread *threads; + pthread_mutex_t latency_execute_mutex; //only one latency command can be executed at a time + struct swarmkv *ref_db; }; struct swarmkv_monitor *module2monitor(struct swarmkv_module *module) @@ -239,7 +267,7 @@ struct swarmkv_monitor *module2monitor(struct swarmkv_module *module) assert(monitor==module->mod_ctx); return monitor; } -struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts) +struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts, struct swarmkv *db) { struct swarmkv_monitor *monitor=ALLOC(struct swarmkv_monitor, 1); monitor->max_latency_usec=opts->cluster_timeout_us; @@ -247,7 +275,14 @@ struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts) monitor->peers=ALLOC(struct recorder *, monitor->nr_worker_threads); strncpy(monitor->module.name, "monitor", sizeof(monitor->module.name)); monitor->module.mod_ctx=monitor; - pthread_mutex_init(&monitor->mutex, NULL); + pthread_mutex_init(&monitor->latency_execute_mutex, NULL); + monitor->threads=ALLOC(struct swarmkv_monitor_thread, monitor->nr_worker_threads); + for(size_t i=0; i<monitor->nr_worker_threads; i++) + { + pthread_mutex_init(&monitor->threads[i].mutex, PTHREAD_PROCESS_PRIVATE); + monitor->threads[i].clients=NULL; + } + monitor->ref_db=db; return &monitor->module; } void swarmkv_monitor_register_command(struct swarmkv_module *mod_monitor, const char *command) @@ -274,15 +309,142 @@ void swarmkv_monitor_free(struct swarmkv_module *mod_monitor) for(size_t i=0; i<monitor->nr_worker_threads; i++) { recorder_table_free(monitor->peers+i); + struct monitor_client *client=NULL, *tmp=NULL; + LL_FOREACH_SAFE(monitor->threads[i].clients, client, tmp) + { + LL_DELETE(monitor->threads[i].clients, client); + monitor_client_free(client); + } } + free(monitor->threads); free(monitor->peers); monitor->peers=NULL; free(monitor); } -void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const char *cmd_name, long long latency_usec) +struct print_ctx +{ + node_t node; + int tid; + struct swarmkv_monitor *monitor; +}; +static void print_on_reply(const struct swarmkv_reply *reply, void *user) +{ + struct print_ctx *ctx=(struct print_ctx *)user; + int tid=swarmkv_gettid(ctx->monitor->ref_db); + assert(tid==ctx->tid); + if(reply->type != SWARMKV_REPLY_STATUS) + { + struct monitor_client *client=NULL, *tmp=NULL; + pthread_mutex_lock(&ctx->monitor->threads[ctx->tid].mutex); + LL_FOREACH_SAFE(ctx->monitor->threads[ctx->tid].clients, client, tmp) + { + if(0==node_compare(&client->node, &ctx->node)) + { + ctx->monitor->threads[ctx->tid].n_client--; + LL_DELETE(ctx->monitor->threads[ctx->tid].clients, client); + monitor_client_free(client); + } + } + pthread_mutex_unlock(&ctx->monitor->threads[ctx->tid].mutex); + } + free(ctx); +} +static const char *exec_ret2string(enum cmd_exec_result ret) +{ + switch(ret) + { + case NEED_KEY_ROUTE: + return "KRT"; + case REDIRECT: + return "RDR"; + case FINISHED: + return "FIN"; + default: + assert(0); + } +} +void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *cmd, long long latency_usec, enum cmd_exec_result result) { struct swarmkv_monitor *monitor=module2monitor(mod_monitor); - recorder_table_record_latency(&monitor->commands, cmd_name, strlen(cmd_name), latency_usec, monitor->max_latency_usec, 0); + recorder_table_record_latency(&monitor->commands, spec->name, strlen(spec->name), latency_usec, monitor->max_latency_usec, 0); + int tid=swarmkv_gettid(monitor->ref_db); + assert(tid<monitor->nr_worker_threads); + if(0==strncasecmp(spec->name, "PRINT", strlen("PRINT"))) + { + //Do not log print command to avoid infinit recursion. + return; + } + if(monitor->threads[tid].n_client == 0) + { + //No monitor client registered. + return; + } + + struct monitor_client *client=NULL; + int matched_client=0; + node_t matched_client_nodes[monitor->threads[tid].n_client]; + pthread_mutex_lock(&monitor->threads[tid].mutex); + LL_FOREACH(monitor->threads[tid].clients, client) + { + if(client->pattern) + { + int is_matched=0; + for(int i=0; i<cmd->argc; i++) + { + is_matched=stringmatchlen(client->pattern, sdslen(client->pattern), cmd->argv[i], sdslen(cmd->argv[i]), 0); + if(is_matched) + { + break; + } + } + if(!is_matched) + { + continue; + } + } + node_copy(matched_client_nodes+matched_client, &client->node); + matched_client++; + } + //To avoid dead lock, we should not hold the lock when executing command. + pthread_mutex_unlock(&monitor->threads[tid].mutex); + assert(matched_client <= monitor->threads[tid].n_client); + if(matched_client==0) + { + return; + } + + sds cmdrepr = sdsempty(); + struct timeval tv; + gettimeofday(&tv, NULL); + cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec, (long)tv.tv_usec); + cmdrepr = sdscatprintf(cmdrepr,"[%d %c %s %s] ", tid, + 0==node_compare(&cmd->caller, swarmkv_self_node(monitor->ref_db))?'L':'R', + node_addr2cstr(&cmd->caller), + exec_ret2string(result)); + for(int i=0; i<cmd->argc; i++) + { + cmdrepr = sdscatrepr(cmdrepr, (char*)cmd->argv[i], sdslen(cmd->argv[i])); + if (i != cmd->argc-1) + cmdrepr = sdscatlen(cmdrepr," ",1); + } + cmdrepr = sdscatprintf(cmdrepr, "\r\n"); + + const char *print_cmd_argv[2]; + size_t print_cmd_argv_len[2]; + print_cmd_argv[0]="PRINT"; + print_cmd_argv_len[0]=strlen(print_cmd_argv[0]); + print_cmd_argv[1]=cmdrepr; + print_cmd_argv_len[1]=sdslen(cmdrepr); + + for(int i=0; i<matched_client; i++) + { + struct print_ctx *print_ctx=ALLOC(struct print_ctx, 1); + node_copy(&print_ctx->node, &matched_client_nodes[i]); + print_ctx->tid=tid; + print_ctx->monitor=monitor; + swarmkv_async_command_on_argv(monitor->ref_db, print_on_reply, print_ctx, print_ctx->node.addr, 2, print_cmd_argv, print_cmd_argv_len); + } + sdsfree(cmdrepr); return; } void swarmkv_monitor_record_peer(struct swarmkv_module *mod_monitor, node_t *peer, long long latency_usec, int thread_id) @@ -333,7 +495,7 @@ struct swarmkv_reply *latency_generic(struct recorder **table, const char *key) enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { struct swarmkv_monitor *monitor=module2monitor(mod_monitor); - pthread_mutex_lock(&monitor->mutex); + pthread_mutex_lock(&monitor->latency_execute_mutex); if(cmd->argc==2 && !strcasecmp(cmd->argv[1], "help") ) { const char *help = { @@ -408,15 +570,43 @@ enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const s { *reply=swarmkv_reply_new_error(erorr_subcommand_syntax, cmd->argv[1], cmd->argv[0]); } - pthread_mutex_unlock(&monitor->mutex); + pthread_mutex_unlock(&monitor->latency_execute_mutex); return FINISHED; } -enum cmd_exec_result lastcmds_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +enum cmd_exec_result monreg_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { -/*LASTCMDS [offset]*/ +/* MONREG IP:port [pattern]*/ struct swarmkv_monitor *monitor=module2monitor(mod_monitor); - pthread_mutex_lock(&monitor->mutex); - - pthread_mutex_unlock(&monitor->mutex); + node_t node; + int ret=0; + ret=node_init_from_sds(&node, cmd->argv[1]); + if(ret<0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_node); + return FINISHED; + } + for(int i=0; i<monitor->nr_worker_threads; i++) + { + struct monitor_client *client=NULL; + int found=0; + pthread_mutex_lock(&monitor->threads[i].mutex); + LL_FOREACH(monitor->threads[i].clients, client) + { + if(0==node_compare(&client->node, &node)) + { + sdsfree(client->pattern); + client->pattern=cmd->argc>2?sdsdup(cmd->argv[2]):NULL; + found=1; + } + } + if(!found) + { + client=monitor_client_new(&node, cmd->argc>2?cmd->argv[2]:NULL); + LL_APPEND(monitor->threads[i].clients, client); + monitor->threads[i].n_client++; + } + pthread_mutex_unlock(&monitor->threads[i].mutex); + } + *reply=swarmkv_reply_new_status("OK"); return FINISHED; }
\ No newline at end of file diff --git a/src/swarmkv_monitor.h b/src/swarmkv_monitor.h index 67273ea..f134618 100644 --- a/src/swarmkv_monitor.h +++ b/src/swarmkv_monitor.h @@ -6,12 +6,13 @@ #include <unistd.h> #include <string.h> -struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts); +struct swarmkv_module *swarmkv_monitor_new(const struct swarmkv_options *opts, struct swarmkv *db); void swarmkv_monitor_register_command(struct swarmkv_module *mod_monitor, const char *command); void swarmkv_monitor_free(struct swarmkv_module *mod_monitor); -void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const char *cmd_name, long long latency_usec); +void swarmkv_monitor_record_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd_spec *spec, const struct swarmkv_cmd *cmd, long long latency_usec, enum cmd_exec_result result); void swarmkv_monitor_register_event(struct swarmkv_module *mod_monitor, const char *event); void swarmkv_monitor_record_peer(struct swarmkv_module *mod_monitor, node_t *peer, long long latency_usec, int thread_id); void swarmkv_monitor_record_event(struct swarmkv_module *mod_monitor, const char *event_name, long long latency_usec); -enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
\ No newline at end of file +enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result monreg_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
\ No newline at end of file diff --git a/src/swarmkv_net.c b/src/swarmkv_net.c index 4607d1d..0ead34b 100644 --- a/src/swarmkv_net.c +++ b/src/swarmkv_net.c @@ -618,7 +618,7 @@ void swarmkv_net_set_on_msg_callback(struct swarmkv_net *net, on_msg_callback_t #define MAX_OUTPUT_BUFFER_SIZE 1024*1024*64 int swarmkv_net_send(struct swarmkv_net *net, const node_t *dest, struct swarmkv_msg *msg, const char **err_str) { - int tid=__gettid((struct swarmkv *)(net->on_msg_cb_arg)); + int tid=swarmkv_gettid((struct swarmkv *)(net->on_msg_cb_arg)); assert(tid<net->nr_threads); struct snet_thread *thr=net->threads+tid; struct snet_conn *conn=NULL; diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 189a561..f1a3ac8 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -294,7 +294,7 @@ struct scontainer *store_lookup_scontainer(struct swarmkv_store *store, sds key) { struct scontainer *ctr=NULL; int designated_tid=key2tid(key, store->opts->nr_worker_threads); - int real_tid=__gettid(store->exec_cmd_handle); + int real_tid=swarmkv_gettid(store->exec_cmd_handle); assert(designated_tid==real_tid); ctr=scontainer_find(&(store->threads[designated_tid].obj_table), key); if(0==pthread_mutex_trylock(&store->threads[designated_tid].sanity_lock)) @@ -541,16 +541,16 @@ static void crdt_generic_on_reply(const struct swarmkv_reply *reply, void *user) return; } -void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const struct swarmkv_cmd *cmd, const node_t *peer) +void crdt_generic_call(struct swarmkv_store *store, enum CRDT_OP op, const node_t *peer, int argc, const char *argv[], size_t *argv_len) { struct crdt_generic_ctx *ctx=NULL; assert(peer); ctx=ALLOC(struct crdt_generic_ctx, 1); ctx->op=op; ctx->store=store; - ctx->key=sdsdup(cmd->argv[2]); + ctx->key=sdsnew(argv[2]); node_copy(&ctx->peer, peer); - swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, cmd->argc, cmd->argv); + swarmkv_async_command_on_argv(store->exec_cmd_handle, crdt_generic_on_reply, ctx, peer->addr, argc, argv, argv_len); return; } #define MONITOR_SYNC_EVENT_NAME "crdt-sync-cycle" @@ -574,17 +574,21 @@ int store_batch_sync(struct swarmkv_store *store, int tid) } else { - struct swarmkv_cmd *cmd=swarmkv_cmd_new(4); - cmd->argv[0]=sdsnew("crdt"); - cmd->argv[1]=sdsnew("merge"); - cmd->argv[2]=sdsnew(ctr->obj.key); - cmd->argv[3]=sdsnewlen(blob, blob_sz); + const char *argv[4]; + size_t argv_len[4]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="merge"; + argv_len[1]=strlen(argv[1]); + argv[2]=ctr->obj.key; + argv_len[2]=sdslen(ctr->obj.key); + argv[3]=blob; + argv_len[3]=blob_sz; for(int i=0; i<utarray_len(ctr->replica_node_list); i++) { node_t *peer=utarray_eltptr(ctr->replica_node_list, i); - crdt_generic_call(store, CRDT_MERGE, cmd, peer); + crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len); } - swarmkv_cmd_free(cmd); free(blob); } DL_DELETE(thr->sync_queue, ctr); @@ -593,16 +597,22 @@ int store_batch_sync(struct swarmkv_store *store, int tid) n_synced++; if(n_synced>=MAX_SYNC_PER_PERIOD) break; } - node_t peer; - struct swarmkv_cmd *cmd=NULL; - int ret=1; - while(1) + + struct sync_task *task=NULL; + task=sync_master_get_task(sync_master); + while(task) { - ret=sync_master_get_cmd(sync_master, &peer, &cmd); - if(!ret) break; - crdt_generic_call(store, CRDT_MERGE, cmd, &peer); - swarmkv_cmd_free(cmd); - cmd=NULL; + size_t n_data=sync_task_key_count(task); + const char *argv[n_data*2+2]; + size_t argv_len[n_data*2+2]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="merge"; + argv_len[1]=strlen(argv[1]); + sync_task_read_key_blob(task, argv+2, argv_len+2, n_data*2); + crdt_generic_call(store, CRDT_MERGE, sync_task_peer(task), n_data*2+2, argv, argv_len); + sync_task_free(task); + task=sync_master_get_task(sync_master); } sync_master_free(sync_master); return n_synced; @@ -613,7 +623,7 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) struct scontainer *ctr=NULL, *tmp=NULL; struct timespec start, end; int n_synced=0; - int real_tid=__gettid(store->exec_cmd_handle); + int real_tid=swarmkv_gettid(store->exec_cmd_handle); assert(real_tid==thread_id); clock_gettime(CLOCK_MONOTONIC, &start); @@ -630,20 +640,22 @@ void swarmkv_store_periodic(struct swarmkv_module * mod_store, int thread_id) char *blob=NULL; size_t blob_sz=0; scontainer_serialize(ctr, &blob, &blob_sz); - - struct swarmkv_cmd *cmd=swarmkv_cmd_new(4); - cmd->argv[0]=sdsnew("crdt"); - cmd->argv[1]=sdsnew("merge"); - cmd->argv[2]=sdsnew(ctr->obj.key); - cmd->argv[3]=sdsnewlen(blob, blob_sz); + const char *argv[4]; + size_t argv_len[4]; + argv[0]="crdt"; + argv_len[0]=strlen(argv[0]); + argv[1]="merge"; + argv_len[1]=strlen(argv[1]); + argv[2]=ctr->obj.key; + argv_len[2]=sdslen(ctr->obj.key); + argv[3]=blob; + argv_len[3]=blob_sz; for(int i=0; i<utarray_len(ctr->replica_node_list); i++) { node_t *peer=utarray_eltptr(ctr->replica_node_list, i); - crdt_generic_call(store, CRDT_MERGE, cmd, peer); + crdt_generic_call(store, CRDT_MERGE, peer, 4, argv, argv_len); } - swarmkv_cmd_free(cmd); free(blob); - DL_DELETE(thr->sync_queue, ctr); ctr->is_in_sync_q=0; store->synced++; @@ -752,13 +764,11 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st sds key=cmd->argv[2]; int tid=store_gettid(mod_store, key); - int real_tid=__gettid(store->exec_cmd_handle); + int real_tid=swarmkv_gettid(store->exec_cmd_handle); assert(tid==real_tid); - struct swarmkv_cmd *crdt_get_cmd=NULL; - crdt_get_cmd=swarmkv_cmd_new(3); - crdt_get_cmd->argv[0]=sdsnew("crdt"); - crdt_get_cmd->argv[1]=sdsnew("get"); - crdt_get_cmd->argv[2]=sdsdup(key); + + + size_t n_replica_node=cmd->argc-3; ctr=store_lookup_scontainer(store, key); @@ -782,23 +792,34 @@ enum cmd_exec_result crdt_add_command(struct swarmkv_module *mod_store, const st assert(node_compare(replica_nodes+i, &store->self)!=0); scontainer_add_replica_node(ctr, replica_nodes+i); } - struct swarmkv_cmd *crdt_meet_cmd=NULL; + const char *crdt_get_argv[3]; + size_t crdt_get_argv_len[3]; + crdt_get_argv[0]="crdt"; + crdt_get_argv_len[0]=strlen(crdt_get_argv[0]); + crdt_get_argv[1]="get"; + crdt_get_argv_len[1]=strlen(crdt_get_argv[1]); + crdt_get_argv[2]=key; + crdt_get_argv_len[2]=sdslen(key); + + const char *crdt_meet_argv[4]; + size_t crdt_meet_argv_len[4]; for(size_t i=0; i<n_replica_node; i++) { if(i<max_pull_node_num) { - crdt_generic_call(store, CRDT_GET, crdt_get_cmd, replica_nodes+i); + crdt_generic_call(store, CRDT_GET, replica_nodes+i, 3, crdt_get_argv, crdt_get_argv_len); } - crdt_meet_cmd=swarmkv_cmd_new(4); - crdt_meet_cmd->argv[0]=sdsnew("crdt"); - crdt_meet_cmd->argv[1]=sdsnew("meet"); - crdt_meet_cmd->argv[2]=sdsdup(ctr->obj.key); - crdt_meet_cmd->argv[3]=node_addr2sds(&store->self); - crdt_generic_call(store, CRDT_MEET, crdt_meet_cmd, replica_nodes+i); - swarmkv_cmd_free(crdt_meet_cmd); + + crdt_meet_argv[0]="crdt"; + crdt_meet_argv_len[0]=strlen(crdt_meet_argv[0]); + crdt_meet_argv[1]="meet"; + crdt_meet_argv_len[1]=strlen(crdt_meet_argv[1]); + crdt_meet_argv[2]=key; + crdt_meet_argv_len[2]=sdslen(key); + crdt_meet_argv[3]=node_addr2cstr(&store->self); + crdt_meet_argv_len[3]=strlen(crdt_meet_argv[3]); + crdt_generic_call(store, CRDT_MEET, replica_nodes+i, 4, crdt_meet_argv, crdt_meet_argv_len); } - swarmkv_cmd_free(crdt_get_cmd); - crdt_get_cmd=NULL; free(replica_nodes); *reply=swarmkv_reply_new_status("OK"); return FINISHED; diff --git a/src/swarmkv_sync.c b/src/swarmkv_sync.c index 17dffe9..a47f931 100644 --- a/src/swarmkv_sync.c +++ b/src/swarmkv_sync.c @@ -81,36 +81,58 @@ void sync_master_add_obj(struct sync_master *master, const sds key, char *blob, return; } -int sync_master_get_cmd(struct sync_master *master, node_t *peer, struct swarmkv_cmd **cmd) +struct sync_task *sync_master_get_task(struct sync_master *master) { struct sync_task *task=NULL, *tmp=NULL; - struct sync_data **p=NULL, *data=NULL; - struct swarmkv_cmd *c=NULL; - size_t n_data=0; - *cmd=NULL; + + //We are not iterating the whole table, just get the first one HASH_ITER(hh, master->task_table, task, tmp) { - n_data=utarray_len(task->sync_data_list); - c=swarmkv_cmd_new(2+n_data*2); - c->argv[0]=sdsnew("crdt"); - c->argv[1]=sdsnew("merge"); - for(size_t i=0; i<n_data; i++) - { - p=utarray_eltptr(task->sync_data_list, i); - data=*p; - c->argv[2+i*2]=sdsdup(data->key); - c->argv[2+i*2+1]=sdsnewlen(data->blob, data->blob_sz); - sync_data_free(data); - master->synced++; - } - node_copy(peer, &task->peer); HASH_DEL(master->task_table, task); - utarray_free(task->sync_data_list); - free(task); + master->synced += utarray_len(task->sync_data_list); break; } - *cmd=c; - return (c==NULL?0:1); + return task; +} +void sync_task_free(struct sync_task *task) +{ + struct sync_data **p=NULL, *data=NULL; + size_t n_data=utarray_len(task->sync_data_list); + for(size_t i=0; i<n_data; i++) + { + p=utarray_eltptr(task->sync_data_list, i); + data=*p; + sync_data_free(data); + } + utarray_free(task->sync_data_list); + free(task); + return; +} +const node_t *sync_task_peer(struct sync_task *task) +{ + return &task->peer; +} +size_t sync_task_key_count(struct sync_task *task) +{ + return utarray_len(task->sync_data_list); +} +void sync_task_read_key_blob(struct sync_task *task, const char *argv[], size_t *argv_len, int argc) +{ + struct sync_data **p=NULL, *data=NULL; + int n_data=utarray_len(task->sync_data_list); + assert(argc == 2*n_data); + for(int i=0, j=0; i<n_data && j<argc; i++, j+=2) + { + p=utarray_eltptr(task->sync_data_list, i); + data=*p; + assert(data->key); + assert(data->blob); + argv[j]=data->key; + argv_len[j]=sdslen(data->key); + argv[j+1]=data->blob; + argv_len[j+1]=data->blob_sz; + } + return; } void sync_master_free(struct sync_master *master) { diff --git a/src/swarmkv_sync.h b/src/swarmkv_sync.h index 36f96ff..84a2175 100644 --- a/src/swarmkv_sync.h +++ b/src/swarmkv_sync.h @@ -3,7 +3,12 @@ #include "sds.h" struct sync_master; +struct sync_task; struct sync_master *sync_master_new(void); void sync_master_add_obj(struct sync_master *mgr, const sds key, char *blob, size_t blob_sz, const node_t *peers, size_t n_peer); -int sync_master_get_cmd(struct sync_master *mgr, node_t *peer, struct swarmkv_cmd **cmd); +struct sync_task *sync_master_get_task(struct sync_master *master); +void sync_task_free(struct sync_task *task); +const node_t *sync_task_peer(struct sync_task *task); +size_t sync_task_key_count(struct sync_task *task); +void sync_task_read_key_blob(struct sync_task *task, const char *argv[], size_t *argv_len, int argc); void sync_master_free(struct sync_master *mgr);
\ No newline at end of file diff --git a/src/swarmkv_utils.c b/src/swarmkv_utils.c index ab0b920..adbd188 100644 --- a/src/swarmkv_utils.c +++ b/src/swarmkv_utils.c @@ -56,7 +56,6 @@ char* replace_char(char* str, char find, char replace){ return str; } -/* Glob-style pattern matching. */ //source https://github.com/redis/redis/blob/7.0/src/util.c int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase) diff --git a/src/swarmkv_utils.h b/src/swarmkv_utils.h index 55f5dbc..8951523 100644 --- a/src/swarmkv_utils.h +++ b/src/swarmkv_utils.h @@ -40,6 +40,8 @@ const char *module_name_str(const char *name); const char *swarmkv_util_pthread_cond_timedwait_error_to_string(const int timed_wait_rv); char *toLower(char* s); char *toUpper(char* s); +// Glob-style pattern matching. +// Return 1 if matched, 0 if not matched int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase); long long ustime(void); int is_number(const char *string, size_t sz, long long *number); diff --git a/src/t_cms.c b/src/t_cms.c index 5339cc9..587d3e2 100644 --- a/src/t_cms.c +++ b/src/t_cms.c @@ -156,7 +156,7 @@ enum cmd_exec_result cmsincrby_command(struct swarmkv_module *mod_store, const s } enum cmd_exec_result cmsmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { -/*CMSQUERY key item [item ...]*/ +/*CMSMQUERY key item [item ...]*/ struct sobj *obj=NULL; const sds key=cmd->argv[1]; diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp index 4af72dd..87920de 100644 --- a/test/swarmkv_gtest.cpp +++ b/test/swarmkv_gtest.cpp @@ -19,19 +19,6 @@ #define CMD_EXEC_TIMEOUT_MS 1000*2 -void generic_callback(const struct swarmkv_reply *reply, void * cb_arg) -{ - struct cmd_exec_arg *arg=(struct cmd_exec_arg*)cb_arg; - if(0==reply_compare(reply, &(arg->expected_reply))) - { - cmd_exec_arg_success(arg); - } - else - { - cmd_exec_arg_failed(arg); - } - return; -} static int g_current_thread_id=0; void copy_reply_callback(const struct swarmkv_reply *reply, void * cb_arg) { @@ -67,6 +54,7 @@ protected: free(err); } swarmkv_register_thread(db); + //swarmkv_command(db, "monreg %s", swarmkv_self_address(db)); } static void TearDownTestCase() { @@ -92,11 +80,30 @@ TEST_F(SwarmkvBasicTest, TypeString) EXPECT_STREQ(reply->str, "OK"); swarmkv_reply_free(reply); - reply=swarmkv_command(db, "GET %s", key); + reply=swarmkv_command(db, "type %s", key); ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); - EXPECT_STREQ(reply->str, val); + EXPECT_STREQ(reply->str, "string"); swarmkv_reply_free(reply); + const char *string_key="string-key"; + reply=swarmkv_command(db, "SET %s abc", string_key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); + + + key="name2"; + val="lisi"; + swarmkv_set(db, key, strlen(key), val, strlen(val), copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + swarmkv_reply_free(reply); + + swarmkv_get(db, key, strlen(key), copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, val); + swarmkv_reply_free(reply); } TEST_F(SwarmkvBasicTest, TypeInteger) { @@ -132,6 +139,18 @@ TEST_F(SwarmkvBasicTest, TypeInteger) swarmkv_reply_free(reply); } EXPECT_EQ(value, 100); + + reply=swarmkv_command(db, "type %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, "integer"); + swarmkv_reply_free(reply); + + key="int2"; + swarmkv_incrby(db, key, strlen(key), 100, copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 100); + swarmkv_reply_free(reply); } TEST_F(SwarmkvBasicTest, GenericDEL) { @@ -150,72 +169,86 @@ TEST_F(SwarmkvBasicTest, GenericDEL) ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); EXPECT_EQ(reply->integer, 1); swarmkv_reply_free(reply); + + reply=swarmkv_command(db, "SET %s %s", key, val); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + EXPECT_STREQ(reply->str, "OK"); + swarmkv_reply_free(reply); + + swarmkv_del(db, key, strlen(key), copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); } -TEST_F(SwarmkvBasicTest, GenericTYPE) +TEST_F(SwarmkvBasicTest, GenericTTL) { struct swarmkv *db=SwarmkvBasicTest::db; + const char *key="quarantine"; + const char *val="wuhan-江夏-如家"; + int seconds=3; struct swarmkv_reply *reply=NULL; - //TYPE string - const char *string_key="string-key"; - reply=swarmkv_command(db, "SET %s abc", string_key); + reply=swarmkv_command(db, "SET %s %s", key, val); ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); EXPECT_STREQ(reply->str, "OK"); swarmkv_reply_free(reply); - reply=swarmkv_command(db, "type %s", string_key); - ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); - EXPECT_STREQ(reply->str, "string"); + reply=swarmkv_command(db, "EXPIRE %s %d", key, seconds); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); swarmkv_reply_free(reply); - //type integer - const char *integer_key="integer-key"; - reply=swarmkv_command(db, "SET %s 123", integer_key); - ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); - EXPECT_STREQ(reply->str, "OK"); + reply=swarmkv_command(db, "TTL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, seconds); swarmkv_reply_free(reply); - reply=swarmkv_command(db, "type %s", integer_key); - ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); - EXPECT_STREQ(reply->str, "integer"); + sleep(seconds+1); + + reply=swarmkv_command(db, "GET %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL); swarmkv_reply_free(reply); - //type set - const char *set_key="set-key"; - reply=swarmkv_command(db, "SADD %s a b c d", set_key); + reply=swarmkv_command(db, "EXPIRE %s %d", key, seconds); ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); - EXPECT_EQ(reply->integer, 4); + EXPECT_EQ(reply->integer, 0); swarmkv_reply_free(reply); - reply=swarmkv_command(db, "type %s", set_key); - ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); - EXPECT_STREQ(reply->str, "set"); + reply=swarmkv_command(db, "TTL %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, -2); swarmkv_reply_free(reply); - //type token-bucket - const char *tb_key="tb-key"; - reply=swarmkv_command(db, "TCFG %s 4000 2000", tb_key); + //API test + reply=swarmkv_command(db, "SET %s %s", key, val); ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); EXPECT_STREQ(reply->str, "OK"); swarmkv_reply_free(reply); - reply=swarmkv_command(db, "type %s", tb_key); - ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); - EXPECT_STREQ(reply->str, "token-bucket"); + swarmkv_expire(db, key, strlen(key), seconds, copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); swarmkv_reply_free(reply); - //type hash - const char *hash_key="hash-key"; - reply=swarmkv_command(db, "HSET %s name zhangsan gender male age 18 gender male", hash_key); + swarmkv_ttl(db, key, strlen(key), copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); - EXPECT_EQ(reply->integer, 3); + EXPECT_EQ(reply->integer, seconds); swarmkv_reply_free(reply); - reply=swarmkv_command(db, "type %s", hash_key); - ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); - EXPECT_STREQ(reply->str, "hash"); + swarmkv_persist(db, key, strlen(key), copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); swarmkv_reply_free(reply); - +} +TEST_F(SwarmkvBasicTest, GenericTYPE) +{ + struct swarmkv *db=SwarmkvBasicTest::db; + struct swarmkv_reply *reply=NULL; + //type non-exist key const char *nonexist_key="non-exist-key"; reply=swarmkv_command(db, "type %s", nonexist_key); @@ -266,6 +299,11 @@ TEST_F(SwarmkvBasicTest, TypeSet) EXPECT_EQ(reply->integer, 3); swarmkv_reply_free(reply); + reply=swarmkv_command(db, "type %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, "set"); + swarmkv_reply_free(reply); + reply=swarmkv_command(db, "DEL %s", key); ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); EXPECT_EQ(reply->integer, 1); @@ -281,6 +319,48 @@ TEST_F(SwarmkvBasicTest, TypeSet) EXPECT_EQ(reply->integer, 0); swarmkv_reply_free(reply); + //API test + size_t member_len[4]={strlen(member[0]), strlen(member[1]), strlen(member[2]), strlen(member[3])}; + swarmkv_sadd(db, key, strlen(key), member, member_len, 4, copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 4); + swarmkv_reply_free(reply); + + swarmkv_srem(db, key, strlen(key), member, member_len, 2, copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 2); + swarmkv_reply_free(reply); + + swarmkv_sismember(db, key, strlen(key), member[0], strlen(member[0]), copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 0); + swarmkv_reply_free(reply); + + swarmkv_sismember(db, key, strlen(key), member[2], strlen(member[2]), copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 1); + swarmkv_reply_free(reply); + + swarmkv_smembers(db, key, strlen(key), copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY); + EXPECT_EQ(reply->n_element, 2); + for(i=0; i<reply->n_element; i++) + { + EXPECT_EQ(reply->elements[i]->type, SWARMKV_REPLY_STRING); + } + swarmkv_reply_free(reply); + + swarmkv_scard(db, key, strlen(key), copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 2); + swarmkv_reply_free(reply); + } TEST_F(SwarmkvBasicTest, TypeHash) { @@ -355,6 +435,11 @@ TEST_F(SwarmkvBasicTest, TypeHash) ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); EXPECT_EQ(reply->integer, 30-1); swarmkv_reply_free(reply); + + reply=swarmkv_command(db, "type %s", key2); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, "hash"); + swarmkv_reply_free(reply); } TEST_F(SwarmkvBasicTest, TypeTokenBucket) { @@ -428,6 +513,17 @@ TEST_F(SwarmkvBasicTest, TypeTokenBucket) ASSERT_EQ(reply->n_element, 12); EXPECT_EQ(reply->elements[7]->integer, allocated_tokens+inf_token); swarmkv_reply_free(reply); + + reply=swarmkv_command(db, "type %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, "token-bucket"); + swarmkv_reply_free(reply); + + swarmkv_tconsume(db, key, strlen(key), 100, copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + EXPECT_EQ(reply->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->integer, 100); + swarmkv_reply_free(reply); } TEST_F(SwarmkvBasicTest, TypeFairTokenBucket) { @@ -476,6 +572,12 @@ TEST_F(SwarmkvBasicTest, TypeFairTokenBucket) ASSERT_EQ(reply->n_element, 16); EXPECT_EQ(reply->elements[7]->integer, allocated_tokens+inf_token); swarmkv_reply_free(reply); + + reply=swarmkv_command(db, "type %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, "fair-token-bucket"); + swarmkv_reply_free(reply); + } TEST_F(SwarmkvBasicTest, TypeBulkTokenBucket) { @@ -527,6 +629,10 @@ TEST_F(SwarmkvBasicTest, TypeBulkTokenBucket) } EXPECT_EQ(t, 10000*i); + reply=swarmkv_command(db, "type %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, "bulk-token-bucket"); + swarmkv_reply_free(reply); } TEST_F(SwarmkvBasicTest, TypeBloomFilter) @@ -626,11 +732,37 @@ TEST_F(SwarmkvBasicTest, TypeBloomFilter) EXPECT_EQ(reply->integer, 1); swarmkv_reply_free(reply); + reply=swarmkv_command(db, "type %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, "bloom-filter"); + swarmkv_reply_free(reply); reply=swarmkv_command(db, "DEL %s", key); ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); ASSERT_EQ(reply->integer, 1); swarmkv_reply_free(reply); + + //API test + key="bf-api"; + reply=swarmkv_command(db, "BFINIT %s %f %lld TIME %lld 12", key, error_rate, capacity, time_window_ms); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + swarmkv_reply_free(reply); + + size_t item_len[4]={strlen(item[0]), strlen(item[1]), strlen(item[2]), strlen(item[3])}; + swarmkv_bfadd(db, key, strlen(key), item, item_len, 4, copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + swarmkv_reply_free(reply); + + swarmkv_bfmexists(db, key, strlen(key), item, item_len, 4, copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY); + ASSERT_EQ(reply->n_element, 4); + for(size_t i=0; i<reply->n_element; i++) + { + ASSERT_EQ(reply->elements[i]->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->elements[i]->integer, 1); + } } TEST_F(SwarmkvBasicTest, TypeCMS) { @@ -677,46 +809,48 @@ TEST_F(SwarmkvBasicTest, TypeCMS) ASSERT_EQ(reply->elements[3]->type, SWARMKV_REPLY_INTEGER); EXPECT_EQ(reply->elements[3]->integer, depth); swarmkv_reply_free(reply); -} -TEST_F(SwarmkvBasicTest, EXPIRE_TTL) -{ - struct swarmkv *db=SwarmkvBasicTest::db; - const char *key="quarantine"; - const char *val="wuhan-江夏-如家"; - int seconds=3; - struct swarmkv_reply *reply=NULL; - reply=swarmkv_command(db, "SET %s %s", key, val); - ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); - EXPECT_STREQ(reply->str, "OK"); + reply=swarmkv_command(db, "type %s", key); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING); + EXPECT_STREQ(reply->str, "count-min-sketch"); swarmkv_reply_free(reply); - reply=swarmkv_command(db, "EXPIRE %s %d", key, seconds); + reply=swarmkv_command(db, "DEL %s", key); ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); EXPECT_EQ(reply->integer, 1); swarmkv_reply_free(reply); - reply=swarmkv_command(db, "TTL %s", key); - ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); - EXPECT_EQ(reply->integer, seconds); - swarmkv_reply_free(reply); - - sleep(seconds+1); - - reply=swarmkv_command(db, "GET %s", key); - ASSERT_EQ(reply->type, SWARMKV_REPLY_NIL); + //API test + key="cms-api"; + size_t item_len[5]={strlen(item[0]), strlen(item[1]), strlen(item[2]), strlen(item[3]), strlen(item[4])}; + long long count[5]={1, 2, 3, 4, 5}; + reply=swarmkv_command(db, "CMSINITBYDIM %s %lld %lld", key, width, depth); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); swarmkv_reply_free(reply); - reply=swarmkv_command(db, "EXPIRE %s %d", key, seconds); - ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); - EXPECT_EQ(reply->integer, 0); + swarmkv_cmsincrby(db, key, strlen(key), item, item_len, count, 5, copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY); + ASSERT_EQ(reply->n_element, 5); + for(size_t i=0; i<reply->n_element; i++) + { + ASSERT_EQ(reply->elements[i]->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->elements[i]->integer, count[i]); + } swarmkv_reply_free(reply); - reply=swarmkv_command(db, "TTL %s", key); - ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER); - EXPECT_EQ(reply->integer, -2); + swarmkv_cmsmquery(db, key, strlen(key), item, item_len, 5, copy_reply_callback, &reply); + swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL); + ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY); + ASSERT_EQ(reply->n_element, 5); + for(size_t i=0; i<reply->n_element; i++) + { + ASSERT_EQ(reply->elements[i]->type, SWARMKV_REPLY_INTEGER); + EXPECT_EQ(reply->elements[i]->integer, count[i]); + } swarmkv_reply_free(reply); } + TEST_F(SwarmkvBasicTest, HashTags) { struct swarmkv *db=SwarmkvBasicTest::db; @@ -1593,9 +1727,7 @@ TEST_F(SwarmkvTwoNodes, TypeBulkTokenBucket) upper_limit=upper_limit*n_member; double accuracy=(double)allocated_tokens/(upper_limit<requested_tokens?upper_limit:requested_tokens); EXPECT_NEAR(accuracy, 1, 0.035); - - //wait_for_sync(); - sleep(100); + wait_for_sync(); reply=swarmkv_command(db[0], "BTINFO %s", key); ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY); ASSERT_EQ(reply->n_element, 16); @@ -1751,7 +1883,6 @@ TEST_F(SwarmkvTwoNodes, TypeCMS) reply=swarmkv_command(db[0], "CMSRCLEAR %s %s", key, uuid); ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); swarmkv_reply_free(reply); - } TEST_F(SwarmkvTwoNodes, Info) { @@ -1766,11 +1897,23 @@ TEST_F(SwarmkvTwoNodes, Info) swarmkv_reply_free(reply); } } - +TEST_F(SwarmkvTwoNodes, Monitor) +{ + struct swarmkv *db[2]; + db[0]=SwarmkvTwoNodes::db1; + db[1]=SwarmkvTwoNodes::db2; + struct swarmkv_reply *reply=NULL; + for(size_t i=0; i<2; i++) + { + reply=swarmkv_command(db[i%2], "MONREG %s g?t", swarmkv_self_address(db[i%2])); + ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS); + swarmkv_reply_free(reply); + } +} TEST_F(SwarmkvTwoNodes, Wait) { //return; - //sleep(3600*2); + sleep(3600*2); } TEST(CloudNative, AnnounceIPPort) { diff --git a/tools/swarmkv_cli.c b/tools/swarmkv_cli.c index 6907749..6ce49f4 100644 --- a/tools/swarmkv_cli.c +++ b/tools/swarmkv_cli.c @@ -14,8 +14,6 @@ #include "linenoise/linenoise.h" #include "swarmkv/swarmkv.h" -struct swarmkv_reply *swarmkv_command_on_argv(struct swarmkv *db, const char *target, int argc, sds *argv); - struct cluster_manager_command { char* cluster_name; @@ -373,7 +371,7 @@ struct swarmkv_reply *cluster_addslotowner_command(struct swarmkv *db, char *arg struct swarmkv_reply *setslot_reply=NULL, *getkeysinslot_reply=NULL, *addkeystoslot_reply=NULL; - sds *migrate_argv=NULL; + printf("%zu slots to be migrated\n", actual_rebalanced_slot_num); long long migrated_keys=0; node_t *new_node=NULL, *old_node=NULL; @@ -427,24 +425,24 @@ struct swarmkv_reply *cluster_addslotowner_command(struct swarmkv *db, char *arg /*STEP 2.4 Add those keys to NEW node's slot*/ - migrate_argv=ALLOC(sds, 4); - migrate_argv[0]=sdsnew("keyspace"); - migrate_argv[1]=sdsnew("addkeystoslot"); - migrate_argv[2]=sdsfromlonglong(slot_id); - migrate_argv[3]=sdsnewlen(getkeysinslot_reply->str, getkeysinslot_reply->len); + const char *migrate_argv[4]; + size_t migrate_argv_len[4]; + migrate_argv[0]="keyspace"; + migrate_argv_len[0]=strlen(migrate_argv[0]); + migrate_argv[1]="addkeystoslot"; + migrate_argv_len[1]=strlen(migrate_argv[1]); + char buf[32]; + snprintf(buf, sizeof(buf), "%d", slot_id); + migrate_argv[2]=buf; + migrate_argv_len[2]=strlen(migrate_argv[2]); + migrate_argv[3]=getkeysinslot_reply->str; + migrate_argv_len[3]=getkeysinslot_reply->len; + + addkeystoslot_reply=swarmkv_command_on_argv(db, new_node->addr, 4, migrate_argv, migrate_argv_len); swarmkv_reply_free(getkeysinslot_reply); getkeysinslot_reply=NULL; - addkeystoslot_reply=swarmkv_command_on_argv(db, new_node->addr, 4, migrate_argv); - for(j=0; j<4; j++) - { - sdsfree(migrate_argv[j]); - migrate_argv[j]=NULL; - } - free(migrate_argv); - migrate_argv=NULL; - if(addkeystoslot_reply->type!=SWARMKV_REPLY_INTEGER) { reply=addkeystoslot_reply; @@ -568,6 +566,27 @@ struct swarmkv_reply *cluster_info_command(struct swarmkv *db, char *argv[], siz } return reply; } +struct swarmkv_reply *monitor_command(struct swarmkv *db, char *argv[], size_t argc) +{ + struct swarmkv_reply *reply=NULL; + const char *target=get_attach_target(); + if(!target) + { + reply = swarmkv_reply_new_error("ERR `MONITOR` command requires attaching to a node."); + } + else + { + if(argc>1) + { + reply=swarmkv_command_on(db, target, "monreg %s %s", swarmkv_self_address(db), argv[1]); + } + else + { + reply=swarmkv_command_on(db, target, "monreg %s", swarmkv_self_address(db)); + } + } + return reply; +} struct replica_node { node_t node; @@ -741,7 +760,7 @@ struct swarmkv_reply *cluster_sanity_command(struct swarmkv *db, char *argv[], s size_t n_active_node=nodes_reply->n_element; for(size_t i=0; i<nodes_reply->n_element; i++) { - node_init_from_string(active_nodes+i, nodes_reply->elements[i]->str); + node_init_from_cstr(active_nodes+i, nodes_reply->elements[i]->str); } if(0==strcasecmp(argv[2], "heal")) { @@ -975,7 +994,8 @@ struct cluster_cmd_spec cluster_cmds[]={ {"CLUSTER CREATE", "cluster-name IP:port [IP:port ...]", cluster_create_command}, {"CLUSTER SANITY", "check | heal", cluster_sanity_command}, {"ATTACH", "IP:port", attach_command}, - {"DETACH", "", detach_command} + {"DETACH", "", detach_command}, + {"MONITOR", "[pattern]", monitor_command} }; int is_cluster_command(int argc, char *argv[]) @@ -1232,7 +1252,7 @@ int main(int argc, char * argv[]) exec_argv=ALLOC(sds, argc); while (++i < argc && argv[i][0] != '-') { - exec_argv[exec_argc]=sdsnew(argv[i]); + exec_argv[exec_argc]=sdsnew(argv[i]); exec_argc++; } } @@ -1286,12 +1306,17 @@ int main(int argc, char * argv[]) } else { - reply=swarmkv_command_on_argv(g_config.db, get_attach_target(), exec_argc, exec_argv); + size_t exec_argv_len[exec_argc]; + for(int i=0; i<exec_argc; i++) + { + exec_argv_len[i]=sdslen(exec_argv[i]); + } + reply=swarmkv_command_on_argv(g_config.db, get_attach_target(), exec_argc, (const char**)exec_argv, exec_argv_len); } - swarmkv_reply_print(reply, stdout); - swarmkv_reply_free(reply); for(i=0; i<exec_argc; i++) sdsfree(exec_argv[i]); free(exec_argv); + swarmkv_reply_print(reply, stdout); + swarmkv_reply_free(reply); goto clean; } @@ -1350,7 +1375,12 @@ int main(int argc, char * argv[]) } else { - reply=swarmkv_command_on_argv(g_config.db, get_attach_target(), exec_argc-offset, exec_argv+offset); + size_t exec_argv_len[exec_argc-offset]; + for(int i=0; i<exec_argc-offset; i++) + { + exec_argv_len[i]=sdslen(exec_argv[i+offset]); + } + reply=swarmkv_command_on_argv(g_config.db, get_attach_target(), exec_argc-offset, (const char**)exec_argv+offset, exec_argv_len); } swarmkv_reply_print(reply, stdout); if(run_interval_sec && reply->type != SWARMKV_REPLY_ERROR) |
