diff options
| author | 郑超 <[email protected]> | 2024-03-30 04:18:41 +0000 |
|---|---|---|
| committer | 郑超 <[email protected]> | 2024-03-30 04:18:41 +0000 |
| commit | a2d902c3434fe0edb3ecc9d70a95b9d6b790f379 (patch) | |
| tree | 6450b570c8510bf0a7127813ab5ec6aae593179e /src | |
| parent | af8bbfb568851b144a00622810efb6bbff628389 (diff) | |
Add new data type: Count-Min Sketch. Rename `BFRESERVE` to `BFINIT` for consistency.
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | src/swarmkv.c | 72 | ||||
| -rw-r--r-- | src/swarmkv_error.h | 1 | ||||
| -rw-r--r-- | src/swarmkv_monitor.c | 38 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 25 | ||||
| -rw-r--r-- | src/swarmkv_store.h | 5 | ||||
| -rw-r--r-- | src/t_bloom_filter.c | 7 | ||||
| -rw-r--r-- | src/t_bloom_filter.h | 2 | ||||
| -rw-r--r-- | src/t_cms.c | 322 | ||||
| -rw-r--r-- | src/t_cms.h | 10 | ||||
| -rw-r--r-- | src/t_hash.c | 6 | ||||
| -rw-r--r-- | src/t_set.c | 4 | ||||
| -rw-r--r-- | src/t_string.c | 6 | ||||
| -rw-r--r-- | src/t_token_bucket.c | 20 |
14 files changed, 460 insertions, 62 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5518414..6c8b3e5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,5 @@ set(SWARMKV_MAJOR_VERSION 4) -set(SWARMKV_MINOR_VERSION 1) +set(SWARMKV_MINOR_VERSION 2) set(SWARMKV_PATCH_VERSION 0) set(SWARMKV_VERSION ${SWARMKV_MAJOR_VERSION}.${SWARMKV_MINOR_VERSION}.${SWARMKV_PATCH_VERSION}) @@ -26,7 +26,7 @@ add_definitions(-fPIC) set(SWARMKV_SRC swarmkv.c swarmkv_api.c swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c swarmkv_store.c swarmkv_sync.c swarmkv_keyspace.c swarmkv_monitor.c - t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c + t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c) set(LIB_SOURCE_FILES diff --git a/src/swarmkv.c b/src/swarmkv.c index 5cc1356..382f56e 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -24,7 +24,7 @@ #include "t_hash.h" #include "t_token_bucket.h" #include "t_bloom_filter.h" - +#include "t_cms.h" #include "uthash.h" #include "sds.h" @@ -62,7 +62,6 @@ struct swarmkv { struct swarmkv_module module; char db_name[SWARMKV_SYMBOL_MAX]; - uuid_t bin_uuid; struct swarmkv_options *opts; int thread_counter; @@ -879,6 +878,20 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg) } } } +static const char *exec_ret2string(enum cmd_exec_result ret) +{ + switch(ret) + { + case NEED_KEY_ROUTE: + return "NEED_KEY_ROUTE"; + case REDIRECT: + return "REDIRECT"; + case FINISHED: + return "FINISHED"; + default: + assert(0); + } +} #define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000 void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller) { @@ -970,7 +983,18 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar clock_gettime(CLOCK_MONOTONIC_COARSE, &end); swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end)); - + //if(strcasestr(spec->name, "CRDT")) + if(0){ + struct timeval now; + gettimeofday(&now, NULL); + printf("%ld.%.6ld %s %d %s %s %s\n", + now.tv_sec, now.tv_usec, + db->self.addr, + db->threads[cur_tid].recusion_depth, + spec->name, + spec->key_offset<0?"NULL":cmd->argv[spec->key_offset], + exec_ret2string(exec_ret)); + } switch(exec_ret) { case FINISHED: @@ -1117,7 +1141,7 @@ void command_spec_init(struct swarmkv *db) 3, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE, hincrby_command, db->mod_store); - /* Token bucket commands */ + /* Token Buckets commands */ command_register(&(db->command_table), "TCFG", "key rate capacity [PD seconds]", 3, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE, tcfg_command, db->mod_store); @@ -1146,10 +1170,10 @@ void command_spec_init(struct swarmkv *db) 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, btinfo_command, db->mod_store); - /*Bloom filter commands*/ - command_register(&(db->command_table), "BFRESERVE", "key error_rate capacity [TIME window-milliseconds slice-number]", + /*Bloom Filter commands*/ + command_register(&(db->command_table), "BFINIT", "key error capacity [TIME window-milliseconds slice-number]", 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE, - bfreserve_command, db->mod_store); + bfinit_command, db->mod_store); command_register(&(db->command_table), "BFADD", "key item [item ...]", 2, 1, CMD_KEY_RW, REPLY_EMPTY_ARRAY, AUTO_ROUTE, bfadd_command, db->mod_store); @@ -1166,6 +1190,32 @@ void command_spec_init(struct swarmkv *db) 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, bfinfo_command, db->mod_store); + /*Count-min Sketch Commands*/ + command_register(&(db->command_table), "CMSINITBYDIM", "key width depth", + 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + cmsinitbydim_command, db->mod_store); + command_register(&(db->command_table), "CMSINITBYPROB", "key error probability", + 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + cmsinitbyprob_command, db->mod_store); + command_register(&(db->command_table), "CMSINCRBY", "key item increment [item increment ...]", + 3, 1, CMD_KEY_RW, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + cmsincrby_command, db->mod_store); + command_register(&(db->command_table), "CMSQUERY", "key item", + 2, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE, + cmsquery_command, db->mod_store); + command_register(&(db->command_table), "CMSMQUERY", "key item [item ...]", + 2, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + cmsmquery_command, db->mod_store); + command_register(&(db->command_table), "CMSINFO", "key", + 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + cmsinfo_command, db->mod_store); + command_register(&(db->command_table), "CMSRLIST", "key", + 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + cmsrlist_command, db->mod_store); + command_register(&(db->command_table), "CMSRCLEAR", "key uuid", + 2, 1, CMD_KEY_RW, REPLY_ERROR, AUTO_ROUTE, + cmsrclear_command, db->mod_store); + /* Debug Commands */ command_register(&(db->command_table), "INFO", "[section]", 0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, @@ -1420,9 +1470,6 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name, } db->logger=log_handle_create(log_path, opts->loglevel); } - - - uuid_copy(db->bin_uuid, opts->bin_uuid); if(opts->dryrun) { } @@ -1576,3 +1623,8 @@ const char *swarmkv_self_address(const struct swarmkv *db) { return db->self.addr; } +void swarmkv_self_uuid(const struct swarmkv *db, char buff[37]) +{ + uuid_unparse(db->opts->bin_uuid, buff); + return; +}
\ No newline at end of file diff --git a/src/swarmkv_error.h b/src/swarmkv_error.h index 4da62e0..221d107 100644 --- a/src/swarmkv_error.h +++ b/src/swarmkv_error.h @@ -8,6 +8,7 @@ #define error_arg_not_valid_integer "ERR arg `%s` is not an integer or out of range" #define error_arg_not_valid_address "ERR arg `%s` is not `IP:port` format" #define error_arg_not_valid_float "ERR arg `%s` is not a valid float or out of range" +#define error_arg_not_valid_uuid "ERR arg `%s` is not a valid UUID" #define error_arg_parse_failed "ERR arg `%s` parse failed" #define error_arg_string_should_be "ERR arg `%s` should be `%s`" #define error_need_additional_arg "ERR arg `%s` should be fllowed by more args" diff --git a/src/swarmkv_monitor.c b/src/swarmkv_monitor.c index 95c88d7..0856b1c 100644 --- a/src/swarmkv_monitor.c +++ b/src/swarmkv_monitor.c @@ -10,12 +10,12 @@ #include <hdr/hdr_interval_recorder.h> #include <pthread.h> -#define METRIC_KEY_MAX 64 +#define KEY_LEN_MAX 64 #define SIGNIFICANT_FIGURES 2 #define LOWEST_TRACKABLE_VALUE 1 struct recorder { - char key[METRIC_KEY_MAX]; + char key[KEY_LEN_MAX]; struct hdr_interval_recorder hdr_interval; struct hdr_histogram *hdr_previous; struct hdr_histogram *hdr_all_time; @@ -45,7 +45,7 @@ void recorder_free(struct recorder *recorder) } struct recorder_metric { - char key[METRIC_KEY_MAX]; + char key[KEY_LEN_MAX]; long long total_count; long long max; long long min; @@ -83,11 +83,11 @@ void hdr_to_metric(const struct hdr_histogram *hdr, const char *key, struct reco strncpy(metric->key, key, sizeof(metric->key)); metric->total_count=hdr->total_count; if(metric->total_count==0) return; - metric->p50=hdr_value_at_percentile(hdr, 0.5); - metric->p80=hdr_value_at_percentile(hdr, 0.8); - metric->p90=hdr_value_at_percentile(hdr, 0.9); - metric->p95=hdr_value_at_percentile(hdr, 0.95); - metric->p99=hdr_value_at_percentile(hdr, 0.99); + metric->p50=hdr_value_at_percentile(hdr, 50); + metric->p80=hdr_value_at_percentile(hdr, 80); + metric->p90=hdr_value_at_percentile(hdr, 90); + metric->p95=hdr_value_at_percentile(hdr, 95); + metric->p99=hdr_value_at_percentile(hdr, 99); metric->total_count=hdr->total_count; metric->max=hdr_max(hdr); metric->mean=hdr_mean(hdr); @@ -108,18 +108,7 @@ void recorder_sample(struct recorder *recorder, struct recorder_metric *metric) hdr_to_metric(recorder->hdr_all_time, recorder->key, metric); return; } -void recorder_sample_n(struct recorder **recorder, size_t n_recorder, long long max_latency_usec, struct recorder_metric *metric) -{ - struct hdr_histogram *hdr_merged; - hdr_init(1, max_latency_usec, 2, &hdr_merged); - for(size_t i=0; i<n_recorder; i++) - { - recorder_commit(recorder[i]); - hdr_add(hdr_merged, recorder[i]->hdr_all_time); - } - hdr_to_metric(hdr_merged, recorder[0]->key, metric); - hdr_close(hdr_merged); -} + void recorder_reset(struct recorder *recorder) { recorder->hdr_previous=hdr_interval_recorder_sample_and_recycle(&recorder->hdr_interval, recorder->hdr_previous); @@ -421,4 +410,13 @@ enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const s } pthread_mutex_unlock(&monitor->mutex); return FINISHED; +} +enum cmd_exec_result lastcmds_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/*LASTCMDS [offset]*/ + struct swarmkv_monitor *monitor=module2monitor(mod_monitor); + pthread_mutex_lock(&monitor->mutex); + + pthread_mutex_unlock(&monitor->mutex); + return FINISHED; }
\ No newline at end of file diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 0bc7514..189a561 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -111,11 +111,20 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] = .obj_size=(size_t (*)(const void *))AP_bloom_mem_size }, { + .type=OBJ_TYPE_CMS, + .type_name="count-min-sketch", + .obj_free=(void (*)(void *))CM_sketch_free, + .obj_serialize=(void (*)(const void *, char **, size_t *))CM_sketch_serialize, + .obj_merge_blob=(void (*)(void *, const char *, size_t))CM_sketch_merge_blob, + .obj_replicate=(void * (*)(uuid_t, const char *, size_t))CM_sketch_replicate, + .obj_size=(size_t (*)(const void *))CM_sketch_size + }, + { .type=OBJ_TYPE_UNDEFINED, .type_name="undefined", .obj_free=undefined_obj_free, .obj_serialize=NULL, - .obj_merge_blob=NULL, + .obj_merge_blob=NULL, .obj_replicate=NULL, .obj_size=(size_t (*)(const void *))undefined_obj_mem_size } @@ -331,7 +340,7 @@ void store_get_node_addr(struct swarmkv_module* mod_store, node_t *node) node_copy(node, &store->self); return; } -void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj) +void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj) { struct swarmkv_store *store=module2store(mod_store); struct scontainer *ctr=container_of(obj, struct scontainer, obj); @@ -423,7 +432,7 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t offset+=sizeof(size_t); assert(offset+value_blob_sz==blob_sz); const char *value_blob=blob+offset; - if(!obj->raw) + if(obj->raw == NULL) { obj->raw=sobj_specs[obj->type].obj_replicate(uuid, value_blob, value_blob_sz); } @@ -1008,17 +1017,17 @@ enum cmd_exec_result crdt_info_command(struct swarmkv_module *mod_store, const s size_t sz=0; sz+=sobj_specs[ctr->obj.type].obj_size(ctr->obj.raw); sz+=sizeof(struct scontainer)+sdslen(ctr->obj.key); - size_t n_replica=0; - n_replica=ctr->replica_node_list?utarray_len(ctr->replica_node_list):0; - sz+=n_replica*sizeof(node_t); + size_t n_peer=0; + n_peer=ctr->replica_node_list?utarray_len(ctr->replica_node_list):0; + sz+=n_peer*sizeof(node_t); int i=0; *reply=swarmkv_reply_new_array(8); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Type"); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt(sobj_specs[ctr->obj.type].type_name); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Size"); (*reply)->elements[i++]=swarmkv_reply_new_integer(sz); - (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Replicas"); - (*reply)->elements[i++]=swarmkv_reply_new_integer(n_replica); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Peers"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(n_peer); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("LastModified"); (*reply)->elements[i++]=swarmkv_reply_new_integer(ctr->op_timestamp.tv_sec); return FINISHED; diff --git a/src/swarmkv_store.h b/src/swarmkv_store.h index d1fc926..4e4e4cd 100644 --- a/src/swarmkv_store.h +++ b/src/swarmkv_store.h @@ -14,6 +14,7 @@ #include "fair_token_bucket.h" #include "bulk_token_bucket.h" #include "ap_bloom.h" +#include "cm_sketch.h" enum sobj_type { @@ -25,6 +26,7 @@ enum sobj_type OBJ_TYPE_FAIR_TOKEN_BUCKET, OBJ_TYPE_BULK_TOKEN_BUCKET, OBJ_TYPE_BLOOM_FILTER, + OBJ_TYPE_CMS, OBJ_TYPE_UNDEFINED, __SWARMKV_OBJ_TYPE_MAX }; @@ -43,6 +45,7 @@ struct sobj struct fair_token_bucket *ftb; struct bulk_token_bucket *btb; struct AP_bloom *bloom; + struct CM_sketch *cms; void *raw; }; }; @@ -62,7 +65,7 @@ struct store_info }; void swarmkv_store_info(struct swarmkv_module *module, struct store_info *info); -void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj); +void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj); int sobj_get_random_replica(struct sobj *obj, node_t *out); enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply); diff --git a/src/t_bloom_filter.c b/src/t_bloom_filter.c index fbaad2e..9ad3623 100644 --- a/src/t_bloom_filter.c +++ b/src/t_bloom_filter.c @@ -7,9 +7,9 @@ #include <stdlib.h> #include <assert.h> -enum cmd_exec_result bfreserve_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +enum cmd_exec_result bfinit_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { -/* BFRESERVE key error_rate capacity [TIME window-milliseconds slice-number] */ +/* BFINIT key error_rate capacity [TIME window-milliseconds slice-number] */ struct sobj *obj=NULL; const sds key=cmd->argv[1]; @@ -97,6 +97,7 @@ enum cmd_exec_result bfadd_command(struct swarmkv_module *mod_store, const struc { AP_bloom_add(obj->bloom, now, cmd->argv[i+2], sdslen(cmd->argv[i+2])); } + store_mark_object_as_modified(mod_store, obj); *reply=swarmkv_reply_new_status("OK"); return FINISHED; } @@ -202,7 +203,7 @@ enum cmd_exec_result bfinfo_command(struct swarmkv_module *mod_store, const stru AP_bloom_info(obj->bloom, &info); int i=0; *reply=swarmkv_reply_new_array(20); - (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ErrorRate"); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Error"); (*reply)->elements[i++]=swarmkv_reply_new_double(info.error); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity"); (*reply)->elements[i++]=swarmkv_reply_new_integer(info.capacity); diff --git a/src/t_bloom_filter.h b/src/t_bloom_filter.h index 0231b0c..3675e82 100644 --- a/src/t_bloom_filter.h +++ b/src/t_bloom_filter.h @@ -1,7 +1,7 @@ #pragma once #include "swarmkv_common.h" -enum cmd_exec_result bfreserve_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result bfinit_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); enum cmd_exec_result bfadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); enum cmd_exec_result bfexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); enum cmd_exec_result bfmexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); diff --git a/src/t_cms.c b/src/t_cms.c new file mode 100644 index 0000000..5339cc9 --- /dev/null +++ b/src/t_cms.c @@ -0,0 +1,322 @@ +#include "swarmkv_common.h" +#include "swarmkv_utils.h" +#include "swarmkv_store.h" +#include "swarmkv_error.h" +#include "cm_sketch.h" + +#include <stdlib.h> +#include <assert.h> + +enum cmd_exec_result cmsinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/* CMSINITBYDIM key width depth*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + + long long width=0, depth=0; + + width=strtol(cmd->argv[2], NULL, 10); + if(width<=0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]); + return FINISHED; + } + depth=strtol(cmd->argv[3], NULL, 10); + if(depth<=0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]); + return FINISHED; + } + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + + if(obj->type==OBJ_TYPE_UNDEFINED) + { + assert(obj->raw==NULL); + uuid_t uuid; + store_get_uuid(mod_store, uuid); + obj->cms=CM_sketch_new(uuid, width, depth); + obj->type=OBJ_TYPE_CMS; + *reply=swarmkv_reply_new_status("OK"); + } + else + { + *reply=swarmkv_reply_new_array(0); + } + return FINISHED; +} +enum cmd_exec_result cmsinitbyprob_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/* CMSINITBYPROB key error probability*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + + double error=0, probability=0; + error=strtod(cmd->argv[2], NULL); + if(error < 0 || error >= 1.0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_float, cmd->argv[2]); + return FINISHED; + } + probability=strtod(cmd->argv[3], NULL); + if(probability < 0 || probability >= 1.0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_float, cmd->argv[3]); + return FINISHED; + } + int width=0, depth=0; + CM_sketch_tool_dimension_by_prob(error, probability, &width, &depth); + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + + if(obj->type==OBJ_TYPE_UNDEFINED) + { + assert(obj->raw==NULL); + uuid_t uuid; + store_get_uuid(mod_store, uuid); + obj->cms=CM_sketch_new(uuid, width, depth); + obj->type=OBJ_TYPE_CMS; + *reply=swarmkv_reply_new_status("OK"); + } + else + { + *reply=swarmkv_reply_new_array(0); + } + return FINISHED; +} +static int parse_incrby_args(const struct swarmkv_cmd *cmd, long long *increments, size_t n_increment, int *invalid_idx) +{ + assert(n_increment == (cmd->argc-2)/2); + char *endptr=NULL; + for(int i=3, j=0; i<cmd->argc; i+=2, j++) + { + *invalid_idx=i; + increments[j]=strtol(cmd->argv[i], &endptr, 10); + if(*endptr != '\0') + { + return -1; + } + if(increments[j] > INT32_MAX || increments[j] < INT32_MIN) + { + return -1; + } + } + return 0; +} +enum cmd_exec_result cmsincrby_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/*CMSINCRBY key item increment [item increment ...]*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + + if(obj->type==OBJ_TYPE_UNDEFINED) + { + assert(obj->raw == NULL); + return handle_undefined_object(obj, reply); + } + else if(obj->type!=OBJ_TYPE_CMS) + { + *reply=swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + size_t n_increment=(cmd->argc-2)/2; + long long increments[n_increment], query[n_increment]; + int ret=0, invalid_idx=0; + ret = parse_incrby_args(cmd, increments, n_increment, &invalid_idx); + if(ret<0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[invalid_idx]); + return FINISHED; + } + *reply=swarmkv_reply_new_array(n_increment); + for(int i=2, j=0; i<cmd->argc; i+=2, j++) + { + query[j]=CM_sketch_incrby(obj->cms, cmd->argv[i], sdslen(cmd->argv[i]), (int) increments[j]); + (*reply)->elements[j]=swarmkv_reply_new_integer(query[j]); + } + store_mark_object_as_modified(mod_store, obj); + return FINISHED; +} +enum cmd_exec_result cmsmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/*CMSQUERY key item [item ...]*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + + if(obj->type==OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + else if(obj->type!=OBJ_TYPE_CMS) + { + *reply=swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + + *reply=swarmkv_reply_new_array(cmd->argc-2); + for(int i=0; i<cmd->argc-2; i++) + { + int query=0; + query=CM_sketch_query(obj->cms, cmd->argv[i+2], sdslen(cmd->argv[i+2])); + (*reply)->elements[i]=swarmkv_reply_new_integer(query); + } + return FINISHED; +} +enum cmd_exec_result cmsquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + enum cmd_exec_result ret; + struct swarmkv_reply *tmp_reply=NULL; + ret=cmsmquery_command(mod_store, cmd, &tmp_reply); + if(ret==FINISHED) + { + if(tmp_reply->type==SWARMKV_REPLY_ARRAY) + { + *reply=swarmkv_reply_dup(tmp_reply->elements[0]); + swarmkv_reply_free(tmp_reply); + } + else + { + *reply=tmp_reply; + } + } + return ret; +} +enum cmd_exec_result cmsinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/*CMSINFO key*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + if(obj->type==OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + else if(obj->type!=OBJ_TYPE_CMS) + { + *reply=swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + struct CM_sketch_info info; + CM_sketch_info(obj->cms, &info); + int i=0; + *reply=swarmkv_reply_new_array(12); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Width"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.width); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Depth"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.depth); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Error"); + (*reply)->elements[i++]=swarmkv_reply_new_double(info.error); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Probability"); + (*reply)->elements[i++]=swarmkv_reply_new_double(info.probability); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Count"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.count); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ReplicaNumber"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.n_replica); + assert(i==12); + return FINISHED; +} +enum cmd_exec_result cmsrlist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/*CMSRLIST key*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + if(obj->type==OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + else if(obj->type!=OBJ_TYPE_CMS) + { + *reply=swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + struct CM_sketch_info info; + CM_sketch_info(obj->cms, &info); + uuid_t replicas[info.n_replica]; + size_t n_nodes=0; + CM_sketch_list_replica(obj->cms, replicas, &n_nodes); + *reply=swarmkv_reply_new_array(n_nodes); + for(int i=0; i<n_nodes; i++) + { + char uuid_str[37]; + uuid_unparse(replicas[i], uuid_str); + (*reply)->elements[i]=swarmkv_reply_new_string(uuid_str, strlen(uuid_str)+1); + } + return FINISHED; +} +enum cmd_exec_result cmsrclear_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/*CMSRCLEAR key uuid*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + if(obj->type==OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + else if(obj->type!=OBJ_TYPE_CMS) + { + *reply=swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + uuid_t uuid; + if(0 > uuid_parse(cmd->argv[2], uuid)) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_uuid, cmd->argv[2]); + return FINISHED; + } + int ret=CM_sketch_clear_replica(obj->cms, uuid); + if(ret<0) + { + *reply=swarmkv_reply_new_error("replica uuid is not exist"); + } + else + { + *reply=swarmkv_reply_new_status("OK"); + store_mark_object_as_modified(mod_store, obj); + } + return FINISHED; +}
\ No newline at end of file diff --git a/src/t_cms.h b/src/t_cms.h new file mode 100644 index 0000000..b0348bd --- /dev/null +++ b/src/t_cms.h @@ -0,0 +1,10 @@ +#pragma once +#include "swarmkv_common.h" +enum cmd_exec_result cmsinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result cmsinitbyprob_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result cmsincrby_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result cmsquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result cmsmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result cmsinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result cmsrlist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result cmsrclear_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
\ No newline at end of file diff --git a/src/t_hash.c b/src/t_hash.c index a5a7b0b..120f1ed 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -70,7 +70,7 @@ enum cmd_exec_result hset_command(struct swarmkv_module *mod_store, const struct if(ret>0) n_added++; } *reply=swarmkv_reply_new_integer(n_added); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); } else { @@ -110,7 +110,7 @@ enum cmd_exec_result hdel_command(struct swarmkv_module *mod_store, const struct if(ret>0) n_deleted++; } *reply=swarmkv_reply_new_integer(n_deleted); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); return FINISHED; } @@ -246,7 +246,7 @@ enum cmd_exec_result hincrby_command(struct swarmkv_module *mod_store, const str else { *reply=swarmkv_reply_new_integer(result); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); } } else diff --git a/src/t_set.c b/src/t_set.c index 87d5504..e7d4d4a 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -37,7 +37,7 @@ enum cmd_exec_result sadd_command(struct swarmkv_module *mod_store, const struct if(ret>0) n_added++; } *reply=swarmkv_reply_new_integer(n_added); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); } else { @@ -76,7 +76,7 @@ enum cmd_exec_result srem_command(struct swarmkv_module *mod_store, const struct if(ret>0) n_removed++; } *reply=swarmkv_reply_new_integer(n_removed); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); return FINISHED; } enum cmd_exec_result smembers_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) diff --git a/src/t_string.c b/src/t_string.c index d5f2a59..1a5f5f5 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -72,7 +72,7 @@ enum cmd_exec_result set_command(struct swarmkv_module *mod_store, const struct { LWW_register_set(obj->string, value, sdslen(value)); *reply=swarmkv_reply_new_status("OK"); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); } else if(obj->type==OBJ_TYPE_INTEGER) { @@ -80,7 +80,7 @@ enum cmd_exec_result set_command(struct swarmkv_module *mod_store, const struct { PN_counter_set(obj->counter, int_value); *reply=swarmkv_reply_new_status("OK"); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); } else { @@ -113,7 +113,7 @@ enum cmd_exec_result integer_generic(struct swarmkv_module *mod_store, const sds } value=PN_counter_incrby(obj->counter, increment); *reply=swarmkv_reply_new_integer(value); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); } else { diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c index a16672f..d79f6b0 100644 --- a/src/t_token_bucket.c +++ b/src/t_token_bucket.c @@ -85,7 +85,7 @@ enum cmd_exec_result tcfg_command(struct swarmkv_module *mod_store, const struct else if(obj->type==OBJ_TYPE_TOKEN_BUCKET) { OC_token_bucket_configure(obj->bucket, now, rate, period, capacity); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); *reply=swarmkv_reply_new_status("OK"); } else @@ -182,7 +182,7 @@ enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const st gettimeofday(&now, NULL); allocated=OC_token_bucket_consume(obj->bucket, now, consume_type, request); *reply=swarmkv_reply_new_integer(allocated); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); return FINISHED; } bool is_power_of_2(long long num) @@ -254,7 +254,7 @@ enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struc else if(obj->type==OBJ_TYPE_FAIR_TOKEN_BUCKET) { fair_token_bucket_configure(obj->ftb, now, rate, period, capacity, divisor); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); *reply=swarmkv_reply_new_status("OK"); } else @@ -313,7 +313,7 @@ enum cmd_exec_result ftconsume_command(struct swarmkv_module *mod_store, const s allocated=fair_token_bucket_consume(obj->ftb, now, member, sdslen(member), weight, consume_type, request); *reply=swarmkv_reply_new_integer(allocated); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); return FINISHED; } enum cmd_exec_result ftinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) @@ -426,7 +426,7 @@ enum cmd_exec_result btcfg_command(struct swarmkv_module *mod_store, const struc else if(obj->type==OBJ_TYPE_BULK_TOKEN_BUCKET) { bulk_token_bucket_configure(obj->btb, now, rate, period, capacity, buckets); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); *reply=swarmkv_reply_new_status("OK"); } else @@ -480,7 +480,7 @@ enum cmd_exec_result btconsume_command(struct swarmkv_module *mod_store, const s allocated=bulk_token_bucket_consume(obj->btb, now, member, sdslen(member), consume_type, request); *reply=swarmkv_reply_new_integer(allocated); - sobj_need_sync(mod_store, obj); + store_mark_object_as_modified(mod_store, obj); return FINISHED; } enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) @@ -515,8 +515,8 @@ enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const stru available=bulk_token_bucket_read_available(obj->btb, now, cmd->argv[2], sdslen(cmd->argv[2])); } int i=0; - *reply=swarmkv_reply_new_array(14); - (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refill"); + *reply=swarmkv_reply_new_array(16); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Rate"); (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.rate); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Period"); (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.period); @@ -528,8 +528,10 @@ enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const stru (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.approximate_keys); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Collisions"); (*reply)->elements[i++]=swarmkv_reply_new_double(btb_info.collision_rate); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Replicas"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.replicas); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Query"); (*reply)->elements[i++]=swarmkv_reply_new_integer(available); - assert(i==14); + assert(i==16); return FINISHED; }
\ No newline at end of file |
