summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author郑超 <[email protected]>2024-02-29 06:43:54 +0000
committer郑超 <[email protected]>2024-02-29 06:43:54 +0000
commit6b49d228c93be4335aa95cd70a16c3517496554d (patch)
tree127bbddc3b74bf10e0dd1dfa00f588eba0e0f4af /src
parenta75b7b6fec27f5067e19b08810516f4db2c96ac8 (diff)
Bloom Filter and Token bucket refill period customization.v4.1.0
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt6
-rw-r--r--src/swarmkv.c43
-rw-r--r--src/swarmkv_api.c42
-rw-r--r--src/swarmkv_common.c48
-rw-r--r--src/swarmkv_common.h4
-rw-r--r--src/swarmkv_error.h3
-rw-r--r--src/swarmkv_keyspace.c5
-rw-r--r--src/swarmkv_store.c9
-rw-r--r--src/swarmkv_store.h3
-rw-r--r--src/t_bloom_filter.c227
-rw-r--r--src/t_bloom_filter.h9
-rw-r--r--src/t_token_bucket.c107
12 files changed, 450 insertions, 56 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 89b9f87..dbf5977 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,6 +1,6 @@
set(SWARMKV_MAJOR_VERSION 4)
-set(SWARMKV_MINOR_VERSION 0)
-set(SWARMKV_PATCH_VERSION 5)
+set(SWARMKV_MINOR_VERSION 1)
+set(SWARMKV_PATCH_VERSION 0)
set(SWARMKV_VERSION ${SWARMKV_MAJOR_VERSION}.${SWARMKV_MINOR_VERSION}.${SWARMKV_PATCH_VERSION})
message(STATUS "SwarmKV, Version: ${SWARMKV_VERSION}")
@@ -10,7 +10,7 @@ add_definitions(-fPIC)
set(SWARMKV_SRC swarmkv.c swarmkv_api.c
swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c
swarmkv_store.c swarmkv_sync.c swarmkv_keyspace.c swarmkv_monitor.c
- t_string.c t_set.c t_token_bucket.c t_hash.c
+ t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c
swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c)
set(LIB_SOURCE_FILES
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 20f5d3d..0d77238 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -21,9 +21,9 @@
//header of data types
#include "t_string.h"
#include "t_set.h"
-#include "t_token_bucket.h"
#include "t_hash.h"
-
+#include "t_token_bucket.h"
+#include "t_bloom_filter.h"
#include "uthash.h"
@@ -1053,8 +1053,8 @@ void command_spec_init(struct swarmkv *db)
command_register(&(db->command_table), "DECR", "key",
1, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE,
decr_command, db->mod_store);
+
/* Generic commands*/
-
command_register(&(db->command_table), "DEL", "key",
1, 1, CMD_KEY_RM, REPLY_INT_0, AUTO_ROUTE,
del_command, db->mod_keyspace);
@@ -1118,7 +1118,7 @@ void command_spec_init(struct swarmkv *db)
hincrby_command, db->mod_store);
/* Token bucket commands */
- command_register(&(db->command_table), "TCFG", "key rate capacity",
+ command_register(&(db->command_table), "TCFG", "key rate capacity [PD seconds]",
3, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE,
tcfg_command, db->mod_store);
command_register(&(db->command_table), "TCONSUME", "key tokens [NORMAL|FORCE|FLEXIBLE]",
@@ -1127,7 +1127,7 @@ void command_spec_init(struct swarmkv *db)
command_register(&(db->command_table), "TINFO", "key",
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
tinfo_command, db->mod_store);
- command_register(&(db->command_table), "FTCFG", "key rate capacity divisor",
+ command_register(&(db->command_table), "FTCFG", "key rate capacity divisor [PD seconds]",
4, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE,
ftcfg_command, db->mod_store);
command_register(&(db->command_table), "FTCONSUME", "key member weight tokens",
@@ -1136,7 +1136,7 @@ void command_spec_init(struct swarmkv *db)
command_register(&(db->command_table), "FTINFO", "key",
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
ftinfo_command, db->mod_store);
- command_register(&(db->command_table), "BTCFG", "key rate capacity number-of-buckets",
+ command_register(&(db->command_table), "BTCFG", "key rate capacity number-of-buckets [PD seconds]",
4, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE,
btcfg_command, db->mod_store);
command_register(&(db->command_table), "BTCONSUME", "key member tokens [NORMAL|FORCE|FLEXIBLE]",
@@ -1146,6 +1146,26 @@ void command_spec_init(struct swarmkv *db)
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
btinfo_command, db->mod_store);
+ /*Bloom filter commands*/
+ command_register(&(db->command_table), "BFRESERVE", "key error_rate capacity [TIME window-milliseconds slice-number]",
+ 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ bfreserve_command, db->mod_store);
+ command_register(&(db->command_table), "BFADD", "key item [item ...]",
+ 2, 1, CMD_KEY_RW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ bfadd_command, db->mod_store);
+ command_register(&(db->command_table), "BFEXISTS", "key item",
+ 2, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE,
+ bfexists_command, db->mod_store);
+ command_register(&(db->command_table), "BFMEXISTS", "key item [item ...]",
+ 2, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ bfmexists_command, db->mod_store);
+ command_register(&(db->command_table), "BFCARD", "key",
+ 1, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE,
+ bfcard_command, db->mod_store);
+ command_register(&(db->command_table), "BFINFO", "key",
+ 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ bfinfo_command, db->mod_store);
+
/* Debug Commands */
command_register(&(db->command_table), "INFO", "[section]",
0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
@@ -1159,7 +1179,6 @@ void command_spec_init(struct swarmkv *db)
command_register(&(db->command_table), "COMMAND LIST", "",
0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
command_list_command, &db->module);
-
command_register(&(db->command_table), "LATENCY", "<subcommand>",
1, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
latency_command, db->mod_monitor);
@@ -1197,17 +1216,17 @@ void command_spec_init(struct swarmkv *db)
command_register(&(db->command_table), "KEYSPACE RLIST", "key",
1, 2, CMD_KEY_RO, REPLY_NA, AUTO_ROUTE,
keyspace_rlist_command, db->mod_keyspace);
- command_register(&(db->command_table), "KEYSPACE RADD", "key IP:port",
+ command_register(&(db->command_table), "KEYSPACE RADD", "key [IP:port]",
1, 2, CMD_KEY_OW, REPLY_NA, AUTO_ROUTE,
keyspace_radd_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE XRADD", "key IP:port",
- 1, 2, CMD_KEY_OW, REPLY_NA, AUTO_ROUTE,
+ 2, 2, CMD_KEY_OW, REPLY_NA, AUTO_ROUTE,
keyspace_xradd_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE KEYS", "tid pattern",//worker-thread-id
- 1, KEY_OFFSET_TID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE,
+ 2, KEY_OFFSET_TID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE,
keyspace_keys_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE RDEL", "key IP:port",
- 1, 2, CMD_KEY_RW, REPLY_NA, AUTO_ROUTE,
+ 2, 2, CMD_KEY_RW, REPLY_NA, AUTO_ROUTE,
keyspace_rdel_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE EXISTS", "key",
1, 2, CMD_KEY_RO, REPLY_NA, AUTO_ROUTE,
@@ -1215,7 +1234,7 @@ void command_spec_init(struct swarmkv *db)
/* low-level keyspace reorgnization commands */
command_register(&(db->command_table), "KEYSPACE SETSLOT", "slot IMPORTING|MIGRATING|NODE|STABLE IP:port",
- 2, KEY_OFFSET_SLOTID, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
+ 3, KEY_OFFSET_SLOTID, CMD_KEY_NA, REPLY_NA, NOT_AUTO_ROUTE,
keyspace_setslot_command, db->mod_keyspace);
command_register(&(db->command_table), "KEYSPACE GETKEYSINSLOT", "slot",
1, KEY_OFFSET_SLOTID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE,
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index 97e0552..f408274 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -218,34 +218,60 @@ void swarmkv_persist(struct swarmkv *db, const char *key, size_t keylen, swarmkv
swarmkv_cmd_free(cmd);
return;
}
-void swarmkv_sadd(struct swarmkv *db, const char* key, size_t keylen, const char *member[], const size_t member_len[], size_t n_member, swarmkv_on_reply_callback_t *cb, void *cb_arg)
+void swarmkv_sadd(struct swarmkv *db, const char* key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2+n_member);
+ cmd=swarmkv_cmd_new(2+n_members);
cmd->argv[0]=sdsnew("sadd");
cmd->argv[1]=sdsnewlen(key, keylen);
- for(size_t i=0; i<n_member; i++)
+ for(size_t i=0; i<n_members; i++)
{
- cmd->argv[2+i]=sdsnewlen(member[i], member_len[i]);
+ cmd->argv[2+i]=sdsnewlen(members[i], members_len[i]);
}
exec_for_local(db, cmd, NULL, cb, cb_arg);
swarmkv_cmd_free(cmd);
}
-void swarmkv_srem(struct swarmkv *db, const char* key, size_t keylen, const char *member[], const size_t member_len[], size_t n_member, swarmkv_on_reply_callback_t *cb, void *cb_arg)
+void swarmkv_srem(struct swarmkv *db, const char* key, size_t keylen, const char *members[], const size_t members_len[], size_t n_members, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
struct swarmkv_cmd *cmd=NULL;
- cmd=swarmkv_cmd_new(2+n_member);
+ cmd=swarmkv_cmd_new(2+n_members);
cmd->argv[0]=sdsnew("srem");
cmd->argv[1]=sdsnewlen(key, keylen);
- for(size_t i=0; i<n_member; i++)
+ for(size_t i=0; i<n_members; i++)
{
- cmd->argv[2+i]=sdsnewlen(member[i], member_len[i]);
+ cmd->argv[2+i]=sdsnewlen(members[i], members_len[i]);
}
exec_for_local(db, cmd, NULL, cb, cb_arg);
swarmkv_cmd_free(cmd);
}
+void swarmkv_bfadd(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg)
+{
+ struct swarmkv_cmd *cmd=NULL;
+ cmd=swarmkv_cmd_new(2+n_items);
+ cmd->argv[0]=sdsnew("bfadd");
+ cmd->argv[1]=sdsnewlen(key, keylen);
+ for(size_t i=0; i<n_items; i++)
+ {
+ cmd->argv[2+i]=sdsnewlen(items[i], items_len[i]);
+ }
+ exec_for_local(db, cmd, NULL, cb, cb_arg);
+ swarmkv_cmd_free(cmd);
+}
+void swarmkv_bfexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg)
+{
+ struct swarmkv_cmd *cmd=NULL;
+ cmd=swarmkv_cmd_new(2+n_items);
+ cmd->argv[0]=sdsnew("bfexists");
+ cmd->argv[1]=sdsnewlen(key, keylen);
+ for(size_t i=0; i<n_items; i++)
+ {
+ cmd->argv[2+i]=sdsnewlen(items[i], items_len[i]);
+ }
+ exec_for_local(db, cmd, NULL, cb, cb_arg);
+ swarmkv_cmd_free(cmd);
+}
void swarmkv_sismember(struct swarmkv *db, const char* key, size_t keylen, const char *member, size_t member_len, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
struct swarmkv_cmd *cmd=NULL;
diff --git a/src/swarmkv_common.c b/src/swarmkv_common.c
index 649a5dd..faa1a04 100644
--- a/src/swarmkv_common.c
+++ b/src/swarmkv_common.c
@@ -454,6 +454,9 @@ struct swarmkv_reply *swarmkv_reply_dup(const struct swarmkv_reply *origin)
case SWARMKV_REPLY_INTEGER:
copy->integer=origin->integer;
break;
+ case SWARMKV_REPLY_DOUBLE:
+ copy->dval=origin->dval;
+ break;
case SWARMKV_REPLY_STRING:
case SWARMKV_REPLY_STATUS:
case SWARMKV_REPLY_ERROR:
@@ -799,3 +802,48 @@ char *str_replace(char *orig, char *rep, char *with)
strcpy(tmp, orig);
return result;
}
+int swarmkv_cmd_parse_integer(const struct swarmkv_cmd *cmd, const char *name, long long *integer)
+{
+ if(cmd->argc<2)
+ {
+ return -1;
+ }
+ for(int i=0; i<cmd->argc; i++)
+ {
+ if(0==strcasecmp(cmd->argv[i], name))
+ {
+ if(i+1<cmd->argc)
+ {
+ return str2integer(cmd->argv[i+1], integer);
+ }
+ else
+ {
+ return -1;
+ }
+ }
+ }
+ return -1;
+}
+int swarmkv_cmd_parse_double(const struct swarmkv_cmd *cmd, const char *name, double *dval)
+{
+ if(cmd->argc<2)
+ {
+ return -1;
+ }
+ for(int i=0; i<cmd->argc; i++)
+ {
+ if(0==strcasecmp(cmd->argv[i], name))
+ {
+ if(i+1<cmd->argc)
+ {
+ *dval=strtod(cmd->argv[i+1], NULL);
+ return 0;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+ }
+ return -1;
+} \ No newline at end of file
diff --git a/src/swarmkv_common.h b/src/swarmkv_common.h
index 1e3baae..f16edf5 100644
--- a/src/swarmkv_common.h
+++ b/src/swarmkv_common.h
@@ -14,7 +14,7 @@
#include <event2/buffer.h>
#include <event2/thread.h>
#include <event2/http.h>
-#define SWARMKV_VERSION "4.0.5"
+#define SWARMKV_VERSION "4.1.0"
enum cmd_exec_result
{
NEED_KEY_ROUTE,
@@ -93,6 +93,8 @@ struct swarmkv_cmd
struct swarmkv_cmd *swarmkv_cmd_new(size_t argc);
void swarmkv_cmd_free(struct swarmkv_cmd *p);
struct swarmkv_cmd *swarmkv_cmd_dup(const struct swarmkv_cmd *origin);
+int swarmkv_cmd_parse_integer(const struct swarmkv_cmd *cmd, const char *name, long long *ival);
+int swarmkv_cmd_parse_double(const struct swarmkv_cmd *cmd, const char *name, double *dval);
struct swarmkv_reply *swarmkv_reply_new_string(const char *str, size_t sz);
struct swarmkv_reply *swarmkv_reply_new_string_fmt(const char *format, ...);
diff --git a/src/swarmkv_error.h b/src/swarmkv_error.h
index 55eea7c..4da62e0 100644
--- a/src/swarmkv_error.h
+++ b/src/swarmkv_error.h
@@ -7,10 +7,11 @@
#define error_value_not_integer "ERR value is not an integer or out of range"
#define error_arg_not_valid_integer "ERR arg `%s` is not an integer or out of range"
#define error_arg_not_valid_address "ERR arg `%s` is not `IP:port` format"
+#define error_arg_not_valid_float "ERR arg `%s` is not a valid float or out of range"
+#define error_arg_parse_failed "ERR arg `%s` parse failed"
#define error_arg_string_should_be "ERR arg `%s` should be `%s`"
#define error_need_additional_arg "ERR arg `%s` should be fllowed by more args"
#define erorr_subcommand_syntax "ERR unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP."
-#define error_tunnel_busy "ERR node is busy tunneling for %s"
#define error_too_many_redirects "ERR too many redirects, the lastest is `%s`"
#define error_cluster_leader_not_found "ERR cluster leader not found"
diff --git a/src/swarmkv_keyspace.c b/src/swarmkv_keyspace.c
index b86d5c9..d7abec7 100644
--- a/src/swarmkv_keyspace.c
+++ b/src/swarmkv_keyspace.c
@@ -1658,7 +1658,10 @@ enum cmd_exec_result keyspace_keys_command(struct swarmkv_module *mod_keyspace,
{
continue;
}
- if(!slot_is_my_thread(i, thread_id, ks->opts->nr_worker_threads)) continue;
+ if(!slot_is_my_thread(i, thread_id, ks->opts->nr_worker_threads))
+ {
+ continue;
+ }
HASH_ITER(hh, slot_rt->keyroute_table, key_entry, tmp)
{
is_matched=stringmatchlen(pattern, sdslen(pattern), key_entry->key, sdslen(key_entry->key), 0);
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 48761be..0bc7514 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -102,6 +102,15 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] =
.obj_size=(size_t (*)(const void *))bulk_token_bucket_mem_size
},
{
+ .type=OBJ_TYPE_BLOOM_FILTER,
+ .type_name="bloom-filter",
+ .obj_free=(void (*)(void *))AP_bloom_free,
+ .obj_serialize=(void (*)(const void *, char **, size_t *))AP_bloom_serialize,
+ .obj_merge_blob=(void (*)(void *, const char *, size_t))AP_bloom_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))AP_bloom_replicate,
+ .obj_size=(size_t (*)(const void *))AP_bloom_mem_size
+ },
+ {
.type=OBJ_TYPE_UNDEFINED,
.type_name="undefined",
.obj_free=undefined_obj_free,
diff --git a/src/swarmkv_store.h b/src/swarmkv_store.h
index 29962e1..d1fc926 100644
--- a/src/swarmkv_store.h
+++ b/src/swarmkv_store.h
@@ -13,6 +13,7 @@
#include "oc_token_bucket.h"
#include "fair_token_bucket.h"
#include "bulk_token_bucket.h"
+#include "ap_bloom.h"
enum sobj_type
{
@@ -23,6 +24,7 @@ enum sobj_type
OBJ_TYPE_TOKEN_BUCKET,
OBJ_TYPE_FAIR_TOKEN_BUCKET,
OBJ_TYPE_BULK_TOKEN_BUCKET,
+ OBJ_TYPE_BLOOM_FILTER,
OBJ_TYPE_UNDEFINED,
__SWARMKV_OBJ_TYPE_MAX
};
@@ -40,6 +42,7 @@ struct sobj
struct OC_token_bucket *bucket;
struct fair_token_bucket *ftb;
struct bulk_token_bucket *btb;
+ struct AP_bloom *bloom;
void *raw;
};
};
diff --git a/src/t_bloom_filter.c b/src/t_bloom_filter.c
new file mode 100644
index 0000000..fbaad2e
--- /dev/null
+++ b/src/t_bloom_filter.c
@@ -0,0 +1,227 @@
+#include "swarmkv_common.h"
+#include "swarmkv_utils.h"
+#include "swarmkv_store.h"
+#include "swarmkv_error.h"
+#include "ap_bloom.h"
+
+#include <stdlib.h>
+#include <assert.h>
+
+enum cmd_exec_result bfreserve_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/* BFRESERVE key error_rate capacity [TIME window-milliseconds slice-number] */
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+
+ double error_rate=0;
+ long long capacity=0, time_window_ms=0, time_slice_num=0;
+
+ int ret=0;
+ error_rate=strtod(cmd->argv[2], NULL);
+ if(error_rate < 0 || error_rate >= 1.0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_float, cmd->argv[2]);
+ return FINISHED;
+ }
+ ret=str2integer(cmd->argv[3], &capacity);
+ if(ret<0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
+ }
+ if(cmd->argc==7)
+ {
+ if(strncasecmp(cmd->argv[4], "TIME", 4)!=0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[4], "TIME");
+ return FINISHED;
+ }
+ ret=str2integer(cmd->argv[5], &time_window_ms);
+ if(ret<0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[5]);
+ return FINISHED;
+ }
+ ret=str2integer(cmd->argv[6], &time_slice_num);
+ if(ret<0 || time_slice_num<0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[6]);
+ return FINISHED;
+ }
+ }
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ assert(obj->raw==NULL);
+ obj->bloom=AP_bloom_new(now, error_rate, capacity, time_window_ms, time_slice_num);
+ obj->type=OBJ_TYPE_BLOOM_FILTER;
+ *reply=swarmkv_reply_new_status("OK");
+ }
+ else
+ {
+ *reply=swarmkv_reply_new_array(0);
+ }
+ return FINISHED;
+}
+enum cmd_exec_result bfadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*BFADD key item [item ...]*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_BLOOM_FILTER)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+
+ for(int i=0; i<cmd->argc-2; i++)
+ {
+ AP_bloom_add(obj->bloom, now, cmd->argv[i+2], sdslen(cmd->argv[i+2]));
+ }
+ *reply=swarmkv_reply_new_status("OK");
+ return FINISHED;
+}
+enum cmd_exec_result bfmexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*BFMEXISTS key item [item ...]*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_BLOOM_FILTER)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+
+ long long exists=0;
+ *reply=swarmkv_reply_new_array(cmd->argc-2);
+ for(int i=0; i<cmd->argc-2; i++)
+ {
+ exists = AP_bloom_check(obj->bloom, now, cmd->argv[i+2], sdslen(cmd->argv[i+2]));
+ (*reply)->elements[i]=swarmkv_reply_new_integer(exists);
+ }
+ return FINISHED;
+}
+enum cmd_exec_result bfexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ enum cmd_exec_result ret;
+ struct swarmkv_reply *tmp_reply=NULL;
+ ret=bfmexists_command(mod_store, cmd, &tmp_reply);
+ if(ret==FINISHED)
+ {
+ if(tmp_reply->type==SWARMKV_REPLY_ARRAY)
+ {
+ *reply=swarmkv_reply_dup(tmp_reply->elements[0]);
+ swarmkv_reply_free(tmp_reply);
+ }
+ else
+ {
+ *reply=tmp_reply;
+ }
+ }
+ return ret;
+}
+enum cmd_exec_result bfcard_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*BFCARD key*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_BLOOM_FILTER)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+
+ long long cardinality=0;
+ cardinality=AP_bloom_cardinality(obj->bloom);
+ *reply=swarmkv_reply_new_integer(cardinality);
+ return FINISHED;
+}
+enum cmd_exec_result bfinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*BFINFO key*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_BLOOM_FILTER)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+ struct AP_bloom_info info;
+ AP_bloom_info(obj->bloom, &info);
+ int i=0;
+ *reply=swarmkv_reply_new_array(20);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ErrorRate");
+ (*reply)->elements[i++]=swarmkv_reply_new_double(info.error);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.capacity);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("TimeWindowMs");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.time_window_ms);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("TimeSlices");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.time_slice_num);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("HashNum");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.hash_num);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("TotalSlices");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.total_slice_number);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("MaxExpansionTimes");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.max_expand_times);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ApproximateItemNum");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.approximate_item_num);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("FillRatio");
+ (*reply)->elements[i++]=swarmkv_reply_new_double(info.fill_ratio);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("OldestItemTime");
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("%lld.%03lld", info.oldest_item_time.tv_sec, info.oldest_item_time.tv_usec/1000);
+ assert(i==20);
+ return FINISHED;
+} \ No newline at end of file
diff --git a/src/t_bloom_filter.h b/src/t_bloom_filter.h
new file mode 100644
index 0000000..0231b0c
--- /dev/null
+++ b/src/t_bloom_filter.h
@@ -0,0 +1,9 @@
+#pragma once
+#include "swarmkv_common.h"
+
+enum cmd_exec_result bfreserve_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result bfadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result bfexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result bfmexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result bfcard_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result bfinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c
index ab77653..eaedbf2 100644
--- a/src/t_token_bucket.c
+++ b/src/t_token_bucket.c
@@ -31,13 +31,13 @@ static int get_consume_type(sds s, enum tb_consume_type *consume_type)
}
enum cmd_exec_result tcfg_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
-/*TCFG key rate capacity*/
+/*TCFG key rate capacity [PD seconds]*/
struct sobj *obj=NULL;
const sds key=cmd->argv[1];
char *endptr=NULL;
- long long rate=0, capacity=0;
+ long long rate=0, capacity=0, period=1;
rate=strtol(cmd->argv[2], &endptr, 10);
if(*endptr!='\0' || rate<0)
{
@@ -51,7 +51,20 @@ enum cmd_exec_result tcfg_command(struct swarmkv_module *mod_store, const struct
*reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
return FINISHED;
}
-
+ if(cmd->argc==6)
+ {
+ if(strcasecmp(cmd->argv[4], "PD")!=0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[5], "PD");
+ return FINISHED;
+ }
+ period=strtol(cmd->argv[5], &endptr, 10);
+ if(*endptr!='\0' || period<0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[5]);
+ return FINISHED;
+ }
+ }
obj=store_lookup(mod_store, key);
if(!obj)
{
@@ -65,13 +78,13 @@ enum cmd_exec_result tcfg_command(struct swarmkv_module *mod_store, const struct
uuid_t uuid;
assert(obj->raw==NULL);
store_get_uuid(mod_store, uuid);
- obj->bucket=OC_token_bucket_new(uuid, now, rate, capacity);
+ obj->bucket=OC_token_bucket_new(uuid, now, rate, period, capacity);
obj->type=OBJ_TYPE_TOKEN_BUCKET;
*reply=swarmkv_reply_new_status("OK");
}
else if(obj->type==OBJ_TYPE_TOKEN_BUCKET)
{
- OC_token_bucket_configure(obj->bucket, now, rate, capacity);
+ OC_token_bucket_configure(obj->bucket, now, rate, period, capacity);
sobj_need_sync(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
}
@@ -110,18 +123,20 @@ enum cmd_exec_result tinfo_command(struct swarmkv_module *mod_store, const struc
memset(&oc_info, 0, sizeof(oc_info));
OC_token_bucket_info(obj->bucket, now, &oc_info);
int i=0;
- *reply=swarmkv_reply_new_array(10);
+ *reply=swarmkv_reply_new_array(12);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Rate");
- (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.CIR);
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.rate);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Period");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.period);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity");
- (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.CBS);
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.capacity);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Consumed");
(*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.consumed);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refilled");
(*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.refilled);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Available");
(*reply)->elements[i++]=swarmkv_reply_new_integer(oc_info.available);
- assert(i==10);
+ assert(i==12);
return FINISHED;
}
enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
@@ -160,7 +175,7 @@ enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const st
if(obj->type!=OBJ_TYPE_TOKEN_BUCKET)
{
- *reply=swarmkv_reply_new_error(error_wrong_type);
+ *reply=swarmkv_reply_new_error(error_wrong_type);
return FINISHED;
}
struct timeval now;
@@ -179,13 +194,13 @@ bool is_power_of_2(long long num)
}
enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
-/*FTCFG key rate capacity divisor*/
+/*FTCFG key rate capacity divisor [PERIOD seconds]*/
struct sobj *obj=NULL;
const sds key=cmd->argv[1];
char *endptr=NULL;
- long long rate=0, capacity=0, divisor=0;
+ long long rate=0, capacity=0, divisor=0, period=1;
rate=strtol(cmd->argv[2], &endptr, 10);
if(*endptr!='\0' || rate<0)
{
@@ -204,6 +219,20 @@ enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struc
*reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[4]);
return FINISHED;
}
+ if(cmd->argc==7)
+ {
+ if(strcasecmp(cmd->argv[5], "PD")!=0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[5], "PD");
+ return FINISHED;
+ }
+ period=strtol(cmd->argv[6], &endptr, 10);
+ if(*endptr!='\0' || period<0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[6]);
+ return FINISHED;
+ }
+ }
obj=store_lookup(mod_store, key);
if(!obj)
{
@@ -219,12 +248,12 @@ enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struc
store_get_uuid(mod_store, uuid);
obj->type=OBJ_TYPE_FAIR_TOKEN_BUCKET;
- obj->ftb=fair_token_bucket_new(uuid, now, rate, capacity, divisor);
+ obj->ftb=fair_token_bucket_new(uuid, now, rate, period, capacity, divisor);
*reply=swarmkv_reply_new_status("OK");
}
else if(obj->type==OBJ_TYPE_FAIR_TOKEN_BUCKET)
{
- fair_token_bucket_configure(obj->ftb, now, rate, capacity, divisor);
+ fair_token_bucket_configure(obj->ftb, now, rate, period, capacity, divisor);
sobj_need_sync(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
}
@@ -236,7 +265,7 @@ enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struc
}
enum cmd_exec_result ftconsume_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
-/*FTCONSUME key member weight tokens*/
+/*FTCONSUME key member weight tokens*/
struct sobj *obj=NULL;
const sds key=cmd->argv[1];
const sds member=cmd->argv[2];
@@ -306,11 +335,13 @@ enum cmd_exec_result ftinfo_command(struct swarmkv_module *mod_store, const stru
fair_token_bucket_info(obj->ftb, now, &ftb_info);
int i=0;
- *reply=swarmkv_reply_new_array(14);
- (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Rate");
- (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.CIR);
+ *reply=swarmkv_reply_new_array(16);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refill");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.rate);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Period");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.period);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity");
- (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.CBS);
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.capacity);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Consumed");
(*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.bucket_info.consumed);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refilled");
@@ -321,18 +352,18 @@ enum cmd_exec_result ftinfo_command(struct swarmkv_module *mod_store, const stru
(*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.divisor);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ActiveMembers");
(*reply)->elements[i++]=swarmkv_reply_new_integer(ftb_info.active_key_number);
- assert(i==14);
+ assert(i==16);
return FINISHED;
}
enum cmd_exec_result btcfg_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
-/*BTCFG key rate capacity buckets*/
+/*BTCFG key rate capacity buckets [PD seconds]*/
struct sobj *obj=NULL;
const sds key=cmd->argv[1];
char *endptr=NULL;
- long long rate=0, capacity=0, buckets=0;
+ long long rate=0, capacity=0, buckets=0, period=1;
rate=strtol(cmd->argv[2], &endptr, 10);
if(*endptr!='\0' || rate<0)
{
@@ -351,6 +382,20 @@ enum cmd_exec_result btcfg_command(struct swarmkv_module *mod_store, const struc
*reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[4]);
return FINISHED;
}
+ if(cmd->argc==7)
+ {
+ if(strcasecmp(cmd->argv[5], "PD")!=0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[5], "PD");
+ return FINISHED;
+ }
+ period=strtol(cmd->argv[6], &endptr, 10);
+ if(*endptr!='\0' || period<0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[6]);
+ return FINISHED;
+ }
+ }
obj=store_lookup(mod_store, key);
if(!obj)
{
@@ -366,12 +411,12 @@ enum cmd_exec_result btcfg_command(struct swarmkv_module *mod_store, const struc
store_get_uuid(mod_store, uuid);
obj->type=OBJ_TYPE_BULK_TOKEN_BUCKET;
- obj->btb=bulk_token_bucket_new(uuid, now, rate, capacity, buckets);
+ obj->btb=bulk_token_bucket_new(uuid, now, rate, period, capacity, buckets);
*reply=swarmkv_reply_new_status("OK");
}
else if(obj->type==OBJ_TYPE_BULK_TOKEN_BUCKET)
{
- bulk_token_bucket_configure(obj->btb, now, rate, capacity, buckets);
+ bulk_token_bucket_configure(obj->btb, now, rate, period, capacity, buckets);
sobj_need_sync(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
}
@@ -461,19 +506,21 @@ enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const stru
available=bulk_token_bucket_read_available(obj->btb, now, cmd->argv[2], sdslen(cmd->argv[2]));
}
int i=0;
- *reply=swarmkv_reply_new_array(12);
- (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Rate");
- (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.CIR);
+ *reply=swarmkv_reply_new_array(14);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refill");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.rate);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Period");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.period);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity");
- (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.CBS);
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.capacity);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Buckets");
(*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.bucket_number);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ActiveMembers");
- (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.estimate_keys);
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.approximate_keys);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Collisions");
(*reply)->elements[i++]=swarmkv_reply_new_double(btb_info.collision_rate);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Query");
(*reply)->elements[i++]=swarmkv_reply_new_integer(available);
- assert(i==12);
+ assert(i==14);
return FINISHED;
} \ No newline at end of file