diff options
| author | 郑超 <[email protected]> | 2024-02-29 06:43:54 +0000 |
|---|---|---|
| committer | 郑超 <[email protected]> | 2024-02-29 06:43:54 +0000 |
| commit | 6b49d228c93be4335aa95cd70a16c3517496554d (patch) | |
| tree | 127bbddc3b74bf10e0dd1dfa00f588eba0e0f4af /src | |
| parent | a75b7b6fec27f5067e19b08810516f4db2c96ac8 (diff) | |
Bloom Filter and Token bucket refill period customization.v4.1.0
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | src/swarmkv.c | 43 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 42 | ||||
| -rw-r--r-- | src/swarmkv_common.c | 48 | ||||
| -rw-r--r-- | src/swarmkv_common.h | 4 | ||||
| -rw-r--r-- | src/swarmkv_error.h | 3 | ||||
| -rw-r--r-- | src/swarmkv_keyspace.c | 5 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 9 | ||||
| -rw-r--r-- | src/swarmkv_store.h | 3 | ||||
| -rw-r--r-- | src/t_bloom_filter.c | 227 | ||||
| -rw-r--r-- | src/t_bloom_filter.h | 9 | ||||
| -rw-r--r-- | src/t_token_bucket.c | 107 |
12 files changed, 450 insertions, 56 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 89b9f87..dbf5977 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,6 +1,6 @@ set(SWARMKV_MAJOR_VERSION 4) -set(SWARMKV_MINOR_VERSION 0) -set(SWARMKV_PATCH_VERSION 5) +set(SWARMKV_MINOR_VERSION 1) +set(SWARMKV_PATCH_VERSION 0) set(SWARMKV_VERSION ${SWARMKV_MAJOR_VERSION}.${SWARMKV_MINOR_VERSION}.${SWARMKV_PATCH_VERSION}) message(STATUS "SwarmKV, Version: ${SWARMKV_VERSION}") @@ -10,7 +10,7 @@ add_definitions(-fPIC) set(SWARMKV_SRC swarmkv.c swarmkv_api.c swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c swarmkv_store.c swarmkv_sync.c swarmkv_keyspace.c swarmkv_monitor.c - t_string.c t_set.c t_token_bucket.c t_hash.c + t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c) set(LIB_SOURCE_FILES diff --git a/src/swarmkv.c b/src/swarmkv.c index 20f5d3d..0d77238 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -21,9 +21,9 @@ //header of data types #include "t_string.h" #include "t_set.h" -#include "t_token_bucket.h" #include "t_hash.h" - +#include "t_token_bucket.h" +#include "t_bloom_filter.h" #include "uthash.h" @@ -1053,8 +1053,8 @@ void command_spec_init(struct swarmkv *db) command_register(&(db->command_table), "DECR", "key", 1, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE, decr_command, db->mod_store); + /* Generic commands*/ - command_register(&(db->command_table), "DEL", "key", 1, 1, CMD_KEY_RM, REPLY_INT_0, AUTO_ROUTE, del_command, db->mod_keyspace); @@ -1118,7 +1118,7 @@ void command_spec_init(struct swarmkv *db) hincrby_command, db->mod_store); /* Token bucket commands */ - command_register(&(db->command_table), "TCFG", "key rate capacity", + command_register(&(db->command_table), "TCFG", "key rate capacity [PD seconds]", 3, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE, tcfg_command, db->mod_store); command_register(&(db->command_table), "TCONSUME", "key tokens [NORMAL|FORCE|FLEXIBLE]", @@ -1127,7 +1127,7 @@ void command_spec_init(struct swarmkv *db) command_register(&(db->command_table), "TINFO", "key", 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, tinfo_command, db->mod_store); - command_register(&(db->command_table), "FTCFG", "key rate capacity divisor", + command_register(&(db->command_table), "FTCFG", "key rate capacity divisor [PD seconds]", 4, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE, ftcfg_command, db->mod_store); command_register(&(db->command_table), "FTCONSUME", "key member weight tokens", @@ -1136,7 +1136,7 @@ void command_spec_init(struct swarmkv *db) command_register(&(db->command_table), "FTINFO", "key", 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, ftinfo_command, db->mod_store); - command_register(&(db->command_table), "BTCFG", "key rate capacity number-of-buckets", + command_register(&(db->command_table), "BTCFG", "key rate capacity number-of-buckets [PD seconds]", 4, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE, btcfg_command, db->mod_store); command_register(&(db->command_table), "BTCONSUME", "key member tokens [NORMAL|FORCE|FLEXIBLE]", @@ -1146,6 +1146,26 @@ void command_spec_init(struct swarmkv *db) 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, btinfo_command, db->mod_store); + /*Bloom filter commands*/ + command_register(&(db->command_table), "BFRESERVE", "key error_rate capacity [TIME window-milliseconds slice-number]", + 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + bfreserve_command, db->mod_store); + command_register(&(db->command_table), "BFADD", "key item [item ...]", + 2, 1, CMD_KEY_RW, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + bfadd_command, db->mod_store); + command_register(&(db->command_table), "BFEXISTS", "key item", + 2, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE, + bfexists_command, db->mod_store); + command_register(&(db->command_table), "BFMEXISTS", "key item [item ...]", + 2, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + bfmexists_command, db->mod_store); + command_register(&(db->command_table), "BFCARD", "key", + 1, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE, + bfcard_command, db->mod_store); + command_register(&(db->command_table), "BFINFO", "key", + 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + bfinfo_command, db->mod_store); + /* Debug Commands */ command_register(&(db->command_table), "INFO", "[section]", 0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, @@ -1159,7 +1179,6 @@ void command_spec_init(struct swarmkv *db) command_register(&(db->command_table), "COMMAND LIST", "", 0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, command_list_command, &db->module); - command_register(&(db->command_table), "LATENCY", "<subcommand>", 1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, latency_command, db->mod_monitor); @@ -1197,17 +1216,17 @@ void command_spec_init(struct swarmkv *db) command_register(&(db->command_table), "KEYSPACE RLIST", "key", 1, 2, CMD_KEY_RO, REPLY_NA, AUTO_ROUTE, keyspace_rlist_command, db->mod_keyspace); - command_register(&(db->command_table), "KEYSPACE RADD", "key IP:port", + command_register(&(db->command_table), "KEYSPACE RADD", "key [IP:port]", 1, 2, CMD_KEY_OW, REPLY_NA, AUTO_ROUTE, keyspace_radd_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE XRADD", "key IP:port", - 1, 2, CMD_KEY_OW, REPLY_NA, AUTO_ROUTE, + 2, 2, CMD_KEY_OW, REPLY_NA, AUTO_ROUTE, keyspace_xradd_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE KEYS", "tid pattern",//worker-thread-id - 1, KEY_OFFSET_TID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE, + 2, KEY_OFFSET_TID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE, keyspace_keys_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE RDEL", "key IP:port", - 1, 2, CMD_KEY_RW, REPLY_NA, AUTO_ROUTE, + 2, 2, CMD_KEY_RW, REPLY_NA, AUTO_ROUTE, keyspace_rdel_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE EXISTS", "key", 1, 2, CMD_KEY_RO, REPLY_NA, AUTO_ROUTE, @@ -1215,7 +1234,7 @@ void command_spec_init(struct swarmkv *db) /* low-level keyspace reorgnization commands */ command_register(&(db->command_table), "KEYSPACE SETSLOT", "slot IMPORTING|MIGRATING|NODE|STABLE IP:port", - 2, KEY_OFFSET_SLOTID, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, + 3, KEY_OFFSET_SLOTID, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE, keyspace_setslot_command, db->mod_keyspace); command_register(&(db->command_table), "KEYSPACE GETKEYSINSLOT", "slot", 1, KEY_OFFSET_SLOTID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE, diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index 97e0552..f408274 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -218,34 +218,60 @@ void swarmkv_persist(struct swarmkv *db, const char *key, size_t keylen, swarmkv swarmkv_cmd_free(cmd); return; } -void swarmkv_sadd(struct swarmkv *db, const char* key, size_t keylen, const char *member[], const size_t member_len[], size_t n_member, swarmkv_on_reply_callback_t *cb, void *cb_arg) +void swarmkv_sadd(struct swarmkv *db, const char* key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg) { struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2+n_member); + cmd=swarmkv_cmd_new(2+n_members); cmd->argv[0]=sdsnew("sadd"); cmd->argv[1]=sdsnewlen(key, keylen); - for(size_t i=0; i<n_member; i++) + for(size_t i=0; i<n_members; i++) { - cmd->argv[2+i]=sdsnewlen(member[i], member_len[i]); + cmd->argv[2+i]=sdsnewlen(members[i], members_len[i]); } exec_for_local(db, cmd, NULL, cb, cb_arg); swarmkv_cmd_free(cmd); } -void swarmkv_srem(struct swarmkv *db, const char* key, size_t keylen, const char *member[], const size_t member_len[], size_t n_member, swarmkv_on_reply_callback_t *cb, void *cb_arg) +void swarmkv_srem(struct swarmkv *db, const char* key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg) { struct swarmkv_cmd *cmd=NULL; - cmd=swarmkv_cmd_new(2+n_member); + cmd=swarmkv_cmd_new(2+n_members); cmd->argv[0]=sdsnew("srem"); cmd->argv[1]=sdsnewlen(key, keylen); - for(size_t i=0; i<n_member; i++) + for(size_t i=0; i<n_members; i++) { - cmd->argv[2+i]=sdsnewlen(member[i], member_len[i]); + cmd->argv[2+i]=sdsnewlen(members[i], members_len[i]); } exec_for_local(db, cmd, NULL, cb, cb_arg); swarmkv_cmd_free(cmd); } +void swarmkv_bfadd(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) +{ + struct swarmkv_cmd *cmd=NULL; + cmd=swarmkv_cmd_new(2+n_items); + cmd->argv[0]=sdsnew("bfadd"); + cmd->argv[1]=sdsnewlen(key, keylen); + for(size_t i=0; i<n_items; i++) + { + cmd->argv[2+i]=sdsnewlen(items[i], items_len[i]); + } + exec_for_local(db, cmd, NULL, cb, cb_arg); + swarmkv_cmd_free(cmd); +} +void swarmkv_bfexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) +{ + struct swarmkv_cmd *cmd=NULL; + cmd=swarmkv_cmd_new(2+n_items); + cmd->argv[0]=sdsnew("bfexists"); + cmd->argv[1]=sdsnewlen(key, keylen); + for(size_t i=0; i<n_items; i++) + { + cmd->argv[2+i]=sdsnewlen(items[i], items_len[i]); + } + exec_for_local(db, cmd, NULL, cb, cb_arg); + swarmkv_cmd_free(cmd); +} void swarmkv_sismember(struct swarmkv *db, const char* key, size_t keylen, const char *member, size_t member_len, swarmkv_on_reply_callback_t *cb, void *cb_arg) { struct swarmkv_cmd *cmd=NULL; diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c index 649a5dd..faa1a04 100644 --- a/src/swarmkv_common.c +++ b/src/swarmkv_common.c @@ -454,6 +454,9 @@ struct swarmkv_reply *swarmkv_reply_dup(const struct swarmkv_reply *origin) case SWARMKV_REPLY_INTEGER: copy->integer=origin->integer; break; + case SWARMKV_REPLY_DOUBLE: + copy->dval=origin->dval; + break; case SWARMKV_REPLY_STRING: case SWARMKV_REPLY_STATUS: case SWARMKV_REPLY_ERROR: @@ -799,3 +802,48 @@ char *str_replace(char *orig, char *rep, char *with) strcpy(tmp, orig); return result; } +int swarmkv_cmd_parse_integer(const struct swarmkv_cmd *cmd, const char *name, long long *integer) +{ + if(cmd->argc<2) + { + return -1; + } + for(int i=0; i<cmd->argc; i++) + { + if(0==strcasecmp(cmd->argv[i], name)) + { + if(i+1<cmd->argc) + { + return str2integer(cmd->argv[i+1], integer); + } + else + { + return -1; + } + } + } + return -1; +} +int swarmkv_cmd_parse_double(const struct swarmkv_cmd *cmd, const char *name, double *dval) +{ + if(cmd->argc<2) + { + return -1; + } + for(int i=0; i<cmd->argc; i++) + { + if(0==strcasecmp(cmd->argv[i], name)) + { + if(i+1<cmd->argc) + { + *dval=strtod(cmd->argv[i+1], NULL); + return 0; + } + else + { + return -1; + } + } + } + return -1; +}
\ No newline at end of file diff --git a/src/swarmkv_common.h b/src/swarmkv_common.h index 1e3baae..f16edf5 100644 --- a/src/swarmkv_common.h +++ b/src/swarmkv_common.h @@ -14,7 +14,7 @@ #include <event2/buffer.h> #include <event2/thread.h> #include <event2/http.h> -#define SWARMKV_VERSION "4.0.5" +#define SWARMKV_VERSION "4.1.0" enum cmd_exec_result { NEED_KEY_ROUTE, @@ -93,6 +93,8 @@ struct swarmkv_cmd struct swarmkv_cmd *swarmkv_cmd_new(size_t argc); void swarmkv_cmd_free(struct swarmkv_cmd *p); struct swarmkv_cmd *swarmkv_cmd_dup(const struct swarmkv_cmd *origin); +int swarmkv_cmd_parse_integer(const struct swarmkv_cmd *cmd, const char *name, long long *ival); +int swarmkv_cmd_parse_double(const struct swarmkv_cmd *cmd, const char *name, double *dval); struct swarmkv_reply *swarmkv_reply_new_string(const char *str, size_t sz); struct swarmkv_reply *swarmkv_reply_new_string_fmt(const char *format, ...); diff --git a/src/swarmkv_error.h b/src/swarmkv_error.h index 55eea7c..4da62e0 100644 --- a/src/swarmkv_error.h +++ b/src/swarmkv_error.h @@ -7,10 +7,11 @@ #define error_value_not_integer "ERR value is not an integer or out of range" #define error_arg_not_valid_integer "ERR arg `%s` is not an integer or out of range" #define error_arg_not_valid_address "ERR arg `%s` is not `IP:port` format" +#define error_arg_not_valid_float "ERR arg `%s` is not a valid float or out of range" +#define error_arg_parse_failed "ERR arg `%s` parse failed" #define error_arg_string_should_be "ERR arg `%s` should be `%s`" #define error_need_additional_arg "ERR arg `%s` should be fllowed by more args" #define erorr_subcommand_syntax "ERR unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP." -#define error_tunnel_busy "ERR node is busy tunneling for %s" #define error_too_many_redirects "ERR too many redirects, the lastest is `%s`" #define error_cluster_leader_not_found "ERR cluster leader not found" diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c index b86d5c9..d7abec7 100644 --- a/src/swarmkv_keyspace.c +++ b/src/swarmkv_keyspace.c @@ -1658,7 +1658,10 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace, { continue; } - if(!slot_is_my_thread(i, thread_id, ks->opts->nr_worker_threads)) continue; + if(!slot_is_my_thread(i, thread_id, ks->opts->nr_worker_threads)) + { + continue; + } HASH_ITER(hh, slot_rt->keyroute_table, key_entry, tmp) { is_matched=stringmatchlen(pattern, sdslen(pattern), key_entry->key, sdslen(key_entry->key), 0); diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 48761be..0bc7514 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -102,6 +102,15 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] = .obj_size=(size_t (*)(const void *))bulk_token_bucket_mem_size }, { + .type=OBJ_TYPE_BLOOM_FILTER, + .type_name="bloom-filter", + .obj_free=(void (*)(void *))AP_bloom_free, + .obj_serialize=(void (*)(const void *, char **, size_t *))AP_bloom_serialize, + .obj_merge_blob=(void (*)(void *, const char *, size_t))AP_bloom_merge_blob, + .obj_replicate=(void * (*)(uuid_t, const char *, size_t))AP_bloom_replicate, + .obj_size=(size_t (*)(const void *))AP_bloom_mem_size + }, + { .type=OBJ_TYPE_UNDEFINED, .type_name="undefined", .obj_free=undefined_obj_free, diff --git a/src/swarmkv_store.h b/src/swarmkv_store.h index 29962e1..d1fc926 100644 --- a/src/swarmkv_store.h +++ b/src/swarmkv_store.h @@ -13,6 +13,7 @@ #include "oc_token_bucket.h" #include "fair_token_bucket.h" #include "bulk_token_bucket.h" +#include "ap_bloom.h" enum sobj_type { @@ -23,6 +24,7 @@ enum sobj_type OBJ_TYPE_TOKEN_BUCKET, OBJ_TYPE_FAIR_TOKEN_BUCKET, OBJ_TYPE_BULK_TOKEN_BUCKET, + OBJ_TYPE_BLOOM_FILTER, OBJ_TYPE_UNDEFINED, __SWARMKV_OBJ_TYPE_MAX }; @@ -40,6 +42,7 @@ struct sobj struct OC_token_bucket *bucket; struct fair_token_bucket *ftb; struct bulk_token_bucket *btb; + struct AP_bloom *bloom; void *raw; }; }; diff --git a/src/t_bloom_filter.c b/src/t_bloom_filter.c new file mode 100644 index 0000000..fbaad2e --- /dev/null +++ b/src/t_bloom_filter.c @@ -0,0 +1,227 @@ +#include "swarmkv_common.h" +#include "swarmkv_utils.h" +#include "swarmkv_store.h" +#include "swarmkv_error.h" +#include "ap_bloom.h" + +#include <stdlib.h> +#include <assert.h> + +enum cmd_exec_result bfreserve_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/* BFRESERVE key error_rate capacity [TIME window-milliseconds slice-number] */ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + + double error_rate=0; + long long capacity=0, time_window_ms=0, time_slice_num=0; + + int ret=0; + error_rate=strtod(cmd->argv[2], NULL); + if(error_rate < 0 || error_rate >= 1.0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_float, cmd->argv[2]); + return FINISHED; + } + ret=str2integer(cmd->argv[3], &capacity); + if(ret<0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]); + } + if(cmd->argc==7) + { + if(strncasecmp(cmd->argv[4], "TIME", 4)!=0) + { + *reply=swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[4], "TIME"); + return FINISHED; + } + ret=str2integer(cmd->argv[5], &time_window_ms); + if(ret<0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[5]); + return FINISHED; + } + ret=str2integer(cmd->argv[6], &time_slice_num); + if(ret<0 || time_slice_num<0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[6]); + return FINISHED; + } + } + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + + if(obj->type==OBJ_TYPE_UNDEFINED) + { + assert(obj->raw==NULL); + obj->bloom=AP_bloom_new(now, error_rate, capacity, time_window_ms, time_slice_num); + obj->type=OBJ_TYPE_BLOOM_FILTER; + *reply=swarmkv_reply_new_status("OK"); + } + else + { + *reply=swarmkv_reply_new_array(0); + } + return FINISHED; +} +enum cmd_exec_result bfadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/*BFADD key item [item ...]*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + + if(obj->type==OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + else if(obj->type!=OBJ_TYPE_BLOOM_FILTER) + { + *reply=swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + + for(int i=0; i<cmd->argc-2; i++) + { + AP_bloom_add(obj->bloom, now, cmd->argv[i+2], sdslen(cmd->argv[i+2])); + } + *reply=swarmkv_reply_new_status("OK"); + return FINISHED; +} +enum cmd_exec_result bfmexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/*BFMEXISTS key item [item ...]*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + if(obj->type==OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + else if(obj->type!=OBJ_TYPE_BLOOM_FILTER) + { + *reply=swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + + long long exists=0; + *reply=swarmkv_reply_new_array(cmd->argc-2); + for(int i=0; i<cmd->argc-2; i++) + { + exists = AP_bloom_check(obj->bloom, now, cmd->argv[i+2], sdslen(cmd->argv[i+2])); + (*reply)->elements[i]=swarmkv_reply_new_integer(exists); + } + return FINISHED; +} +enum cmd_exec_result bfexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + enum cmd_exec_result ret; + struct swarmkv_reply *tmp_reply=NULL; + ret=bfmexists_command(mod_store, cmd, &tmp_reply); + if(ret==FINISHED) + { + if(tmp_reply->type==SWARMKV_REPLY_ARRAY) + { + *reply=swarmkv_reply_dup(tmp_reply->elements[0]); + swarmkv_reply_free(tmp_reply); + } + else + { + *reply=tmp_reply; + } + } + return ret; +} +enum cmd_exec_result bfcard_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/*BFCARD key*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + if(obj->type==OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + else if(obj->type!=OBJ_TYPE_BLOOM_FILTER) + { + *reply=swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + + long long cardinality=0; + cardinality=AP_bloom_cardinality(obj->bloom); + *reply=swarmkv_reply_new_integer(cardinality); + return FINISHED; +} +enum cmd_exec_result bfinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ +/*BFINFO key*/ + struct sobj *obj=NULL; + const sds key=cmd->argv[1]; + obj=store_lookup(mod_store, key); + if(!obj) + { + return NEED_KEY_ROUTE; + } + struct timeval now; + gettimeofday(&now, NULL); + if(obj->type==OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + else if(obj->type!=OBJ_TYPE_BLOOM_FILTER) + { + *reply=swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + struct AP_bloom_info info; + AP_bloom_info(obj->bloom, &info); + int i=0; + *reply=swarmkv_reply_new_array(20); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ErrorRate"); + (*reply)->elements[i++]=swarmkv_reply_new_double(info.error); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.capacity); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("TimeWindowMs"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.time_window_ms); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("TimeSlices"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.time_slice_num); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("HashNum"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.hash_num); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("TotalSlices"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.total_slice_number); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("MaxExpansionTimes"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.max_expand_times); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ApproximateItemNum"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(info.approximate_item_num); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("FillRatio"); + (*reply)->elements[i++]=swarmkv_reply_new_double(info.fill_ratio); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("OldestItemTime"); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("%lld.%03lld", info.oldest_item_time.tv_sec, info.oldest_item_time.tv_usec/1000); + assert(i==20); + return FINISHED; +}
\ No newline at end of file diff --git a/src/t_bloom_filter.h b/src/t_bloom_filter.h new file mode 100644 index 0000000..0231b0c --- /dev/null +++ b/src/t_bloom_filter.h @@ -0,0 +1,9 @@ +#pragma once +#include "swarmkv_common.h" + +enum cmd_exec_result bfreserve_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result bfadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result bfexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result bfmexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result bfcard_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result bfinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c index ab77653..eaedbf2 100644 --- a/src/t_token_bucket.c +++ b/src/t_token_bucket.c @@ -31,13 +31,13 @@ static int get_consume_type(sds s, enum tb_consume_type *consume_type) } enum cmd_exec_result tcfg_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { -/*TCFG key rate capacity*/ +/*TCFG key rate capacity [PD seconds]*/ struct sobj *obj=NULL; const sds key=cmd->argv[1]; char *endptr=NULL; - long long rate=0, capacity=0; + long long rate=0, capacity=0, period=1; rate=strtol(cmd->argv[2], &endptr, 10); if(*endptr!='\0' || rate<0) { @@ -51,7 +51,20 @@ enum cmd_exec_result tcfg_command(struct swarmkv_module *mod_store, const struct *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]); return FINISHED; } - + if(cmd->argc==6) + { + if(strcasecmp(cmd->argv[4], "PD")!=0) + { + *reply=swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[5], "PD"); + return FINISHED; + } + period=strtol(cmd->argv[5], &endptr, 10); + if(*endptr!='\0' || period<0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[5]); + return FINISHED; + } + } obj=store_lookup(mod_store, key); if(!obj) { @@ -65,13 +78,13 @@ enum cmd_exec_result tcfg_command(struct swarmkv_module *mod_store, const struct uuid_t uuid; assert(obj->raw==NULL); store_get_uuid(mod_store, uuid); - obj->bucket=OC_token_bucket_new(uuid, now, rate, capacity); + obj->bucket=OC_token_bucket_new(uuid, now, rate, period, capacity); obj->type=OBJ_TYPE_TOKEN_BUCKET; *reply=swarmkv_reply_new_status("OK"); } else if(obj->type==OBJ_TYPE_TOKEN_BUCKET) { - OC_token_bucket_configure(obj->bucket, now, rate, capacity); + OC_token_bucket_configure(obj->bucket, now, rate, period, capacity); sobj_need_sync(mod_store, obj); *reply=swarmkv_reply_new_status("OK"); } @@ -110,18 +123,20 @@ enum cmd_exec_result tinfo_command(struct swarmkv_module *mod_store, const struc memset(&oc_info, 0, sizeof(oc_info)); OC_token_bucket_info(obj->bucket, now, &oc_info); int i=0; - *reply=swarmkv_reply_new_array(10); + *reply=swarmkv_reply_new_array(12); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Rate"); - (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.CIR); + (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.rate); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Period"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.period); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity"); - (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.CBS); + (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.capacity); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Consumed"); (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.consumed); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refilled"); (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.refilled); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Available"); (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.available); - assert(i==10); + assert(i==12); return FINISHED; } enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) @@ -160,7 +175,7 @@ enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const st if(obj->type!=OBJ_TYPE_TOKEN_BUCKET) { - *reply=swarmkv_reply_new_error(error_wrong_type); + *reply=swarmkv_reply_new_error(error_wrong_type); return FINISHED; } struct timeval now; @@ -179,13 +194,13 @@ bool is_power_of_2(long long num) } enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { -/*FTCFG key rate capacity divisor*/ +/*FTCFG key rate capacity divisor [PERIOD seconds]*/ struct sobj *obj=NULL; const sds key=cmd->argv[1]; char *endptr=NULL; - long long rate=0, capacity=0, divisor=0; + long long rate=0, capacity=0, divisor=0, period=1; rate=strtol(cmd->argv[2], &endptr, 10); if(*endptr!='\0' || rate<0) { @@ -204,6 +219,20 @@ enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struc *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[4]); return FINISHED; } + if(cmd->argc==7) + { + if(strcasecmp(cmd->argv[5], "PD")!=0) + { + *reply=swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[5], "PD"); + return FINISHED; + } + period=strtol(cmd->argv[6], &endptr, 10); + if(*endptr!='\0' || period<0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[6]); + return FINISHED; + } + } obj=store_lookup(mod_store, key); if(!obj) { @@ -219,12 +248,12 @@ enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struc store_get_uuid(mod_store, uuid); obj->type=OBJ_TYPE_FAIR_TOKEN_BUCKET; - obj->ftb=fair_token_bucket_new(uuid, now, rate, capacity, divisor); + obj->ftb=fair_token_bucket_new(uuid, now, rate, period, capacity, divisor); *reply=swarmkv_reply_new_status("OK"); } else if(obj->type==OBJ_TYPE_FAIR_TOKEN_BUCKET) { - fair_token_bucket_configure(obj->ftb, now, rate, capacity, divisor); + fair_token_bucket_configure(obj->ftb, now, rate, period, capacity, divisor); sobj_need_sync(mod_store, obj); *reply=swarmkv_reply_new_status("OK"); } @@ -236,7 +265,7 @@ enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struc } enum cmd_exec_result ftconsume_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { -/*FTCONSUME key member weight tokens*/ +/*FTCONSUME key member weight tokens*/ struct sobj *obj=NULL; const sds key=cmd->argv[1]; const sds member=cmd->argv[2]; @@ -306,11 +335,13 @@ enum cmd_exec_result ftinfo_command(struct swarmkv_module *mod_store, const stru fair_token_bucket_info(obj->ftb, now, &ftb_info); int i=0; - *reply=swarmkv_reply_new_array(14); - (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Rate"); - (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.CIR); + *reply=swarmkv_reply_new_array(16); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refill"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.rate); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Period"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.period); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity"); - (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.CBS); + (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.capacity); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Consumed"); (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.consumed); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refilled"); @@ -321,18 +352,18 @@ enum cmd_exec_result ftinfo_command(struct swarmkv_module *mod_store, const stru (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.divisor); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ActiveMembers"); (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.active_key_number); - assert(i==14); + assert(i==16); return FINISHED; } enum cmd_exec_result btcfg_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) { -/*BTCFG key rate capacity buckets*/ +/*BTCFG key rate capacity buckets [PD seconds]*/ struct sobj *obj=NULL; const sds key=cmd->argv[1]; char *endptr=NULL; - long long rate=0, capacity=0, buckets=0; + long long rate=0, capacity=0, buckets=0, period=1; rate=strtol(cmd->argv[2], &endptr, 10); if(*endptr!='\0' || rate<0) { @@ -351,6 +382,20 @@ enum cmd_exec_result btcfg_command(struct swarmkv_module *mod_store, const struc *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[4]); return FINISHED; } + if(cmd->argc==7) + { + if(strcasecmp(cmd->argv[5], "PD")!=0) + { + *reply=swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[5], "PD"); + return FINISHED; + } + period=strtol(cmd->argv[6], &endptr, 10); + if(*endptr!='\0' || period<0) + { + *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[6]); + return FINISHED; + } + } obj=store_lookup(mod_store, key); if(!obj) { @@ -366,12 +411,12 @@ enum cmd_exec_result btcfg_command(struct swarmkv_module *mod_store, const struc store_get_uuid(mod_store, uuid); obj->type=OBJ_TYPE_BULK_TOKEN_BUCKET; - obj->btb=bulk_token_bucket_new(uuid, now, rate, capacity, buckets); + obj->btb=bulk_token_bucket_new(uuid, now, rate, period, capacity, buckets); *reply=swarmkv_reply_new_status("OK"); } else if(obj->type==OBJ_TYPE_BULK_TOKEN_BUCKET) { - bulk_token_bucket_configure(obj->btb, now, rate, capacity, buckets); + bulk_token_bucket_configure(obj->btb, now, rate, period, capacity, buckets); sobj_need_sync(mod_store, obj); *reply=swarmkv_reply_new_status("OK"); } @@ -461,19 +506,21 @@ enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const stru available=bulk_token_bucket_read_available(obj->btb, now, cmd->argv[2], sdslen(cmd->argv[2])); } int i=0; - *reply=swarmkv_reply_new_array(12); - (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Rate"); - (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.CIR); + *reply=swarmkv_reply_new_array(14); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refill"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.rate); + (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Period"); + (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.period); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity"); - (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.CBS); + (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.capacity); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Buckets"); (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.bucket_number); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ActiveMembers"); - (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.estimate_keys); + (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.approximate_keys); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Collisions"); (*reply)->elements[i++]=swarmkv_reply_new_double(btb_info.collision_rate); (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Query"); (*reply)->elements[i++]=swarmkv_reply_new_integer(available); - assert(i==12); + assert(i==14); return FINISHED; }
\ No newline at end of file |
