summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author郑超 <[email protected]>2024-03-30 04:18:41 +0000
committer郑超 <[email protected]>2024-03-30 04:18:41 +0000
commita2d902c3434fe0edb3ecc9d70a95b9d6b790f379 (patch)
tree6450b570c8510bf0a7127813ab5ec6aae593179e /src
parentaf8bbfb568851b144a00622810efb6bbff628389 (diff)
Add new data type: Count-Min Sketch. Rename `BFRESERVE` to `BFINIT` for consistency.
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt4
-rw-r--r--src/swarmkv.c72
-rw-r--r--src/swarmkv_error.h1
-rw-r--r--src/swarmkv_monitor.c38
-rw-r--r--src/swarmkv_store.c25
-rw-r--r--src/swarmkv_store.h5
-rw-r--r--src/t_bloom_filter.c7
-rw-r--r--src/t_bloom_filter.h2
-rw-r--r--src/t_cms.c322
-rw-r--r--src/t_cms.h10
-rw-r--r--src/t_hash.c6
-rw-r--r--src/t_set.c4
-rw-r--r--src/t_string.c6
-rw-r--r--src/t_token_bucket.c20
14 files changed, 460 insertions, 62 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5518414..6c8b3e5 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,5 +1,5 @@
set(SWARMKV_MAJOR_VERSION 4)
-set(SWARMKV_MINOR_VERSION 1)
+set(SWARMKV_MINOR_VERSION 2)
set(SWARMKV_PATCH_VERSION 0)
set(SWARMKV_VERSION ${SWARMKV_MAJOR_VERSION}.${SWARMKV_MINOR_VERSION}.${SWARMKV_PATCH_VERSION})
@@ -26,7 +26,7 @@ add_definitions(-fPIC)
set(SWARMKV_SRC swarmkv.c swarmkv_api.c
swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c
swarmkv_store.c swarmkv_sync.c swarmkv_keyspace.c swarmkv_monitor.c
- t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c
+ t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c
swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c)
set(LIB_SOURCE_FILES
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 5cc1356..382f56e 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -24,7 +24,7 @@
#include "t_hash.h"
#include "t_token_bucket.h"
#include "t_bloom_filter.h"
-
+#include "t_cms.h"
#include "uthash.h"
#include "sds.h"
@@ -62,7 +62,6 @@ struct swarmkv
{
struct swarmkv_module module;
char db_name[SWARMKV_SYMBOL_MAX];
- uuid_t bin_uuid;
struct swarmkv_options *opts;
int thread_counter;
@@ -879,6 +878,20 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
}
}
}
+static const char *exec_ret2string(enum cmd_exec_result ret)
+{
+ switch(ret)
+ {
+ case NEED_KEY_ROUTE:
+ return "NEED_KEY_ROUTE";
+ case REDIRECT:
+ return "REDIRECT";
+ case FINISHED:
+ return "FINISHED";
+ default:
+ assert(0);
+ }
+}
#define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000
void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller)
{
@@ -970,7 +983,18 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar
clock_gettime(CLOCK_MONOTONIC_COARSE, &end);
swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end));
-
+ //if(strcasestr(spec->name, "CRDT"))
+ if(0){
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ printf("%ld.%.6ld %s %d %s %s %s\n",
+ now.tv_sec, now.tv_usec,
+ db->self.addr,
+ db->threads[cur_tid].recusion_depth,
+ spec->name,
+ spec->key_offset<0?"NULL":cmd->argv[spec->key_offset],
+ exec_ret2string(exec_ret));
+ }
switch(exec_ret)
{
case FINISHED:
@@ -1117,7 +1141,7 @@ void command_spec_init(struct swarmkv *db)
3, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE,
hincrby_command, db->mod_store);
- /* Token bucket commands */
+ /* Token Buckets commands */
command_register(&(db->command_table), "TCFG", "key rate capacity [PD seconds]",
3, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE,
tcfg_command, db->mod_store);
@@ -1146,10 +1170,10 @@ void command_spec_init(struct swarmkv *db)
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
btinfo_command, db->mod_store);
- /*Bloom filter commands*/
- command_register(&(db->command_table), "BFRESERVE", "key error_rate capacity [TIME window-milliseconds slice-number]",
+ /*Bloom Filter commands*/
+ command_register(&(db->command_table), "BFINIT", "key error capacity [TIME window-milliseconds slice-number]",
3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
- bfreserve_command, db->mod_store);
+ bfinit_command, db->mod_store);
command_register(&(db->command_table), "BFADD", "key item [item ...]",
2, 1, CMD_KEY_RW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
bfadd_command, db->mod_store);
@@ -1166,6 +1190,32 @@ void command_spec_init(struct swarmkv *db)
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
bfinfo_command, db->mod_store);
+ /*Count-min Sketch Commands*/
+ command_register(&(db->command_table), "CMSINITBYDIM", "key width depth",
+ 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsinitbydim_command, db->mod_store);
+ command_register(&(db->command_table), "CMSINITBYPROB", "key error probability",
+ 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsinitbyprob_command, db->mod_store);
+ command_register(&(db->command_table), "CMSINCRBY", "key item increment [item increment ...]",
+ 3, 1, CMD_KEY_RW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsincrby_command, db->mod_store);
+ command_register(&(db->command_table), "CMSQUERY", "key item",
+ 2, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE,
+ cmsquery_command, db->mod_store);
+ command_register(&(db->command_table), "CMSMQUERY", "key item [item ...]",
+ 2, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsmquery_command, db->mod_store);
+ command_register(&(db->command_table), "CMSINFO", "key",
+ 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsinfo_command, db->mod_store);
+ command_register(&(db->command_table), "CMSRLIST", "key",
+ 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsrlist_command, db->mod_store);
+ command_register(&(db->command_table), "CMSRCLEAR", "key uuid",
+ 2, 1, CMD_KEY_RW, REPLY_ERROR, AUTO_ROUTE,
+ cmsrclear_command, db->mod_store);
+
/* Debug Commands */
command_register(&(db->command_table), "INFO", "[section]",
0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
@@ -1420,9 +1470,6 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
}
db->logger=log_handle_create(log_path, opts->loglevel);
}
-
-
- uuid_copy(db->bin_uuid, opts->bin_uuid);
if(opts->dryrun)
{
}
@@ -1576,3 +1623,8 @@ const char *swarmkv_self_address(const struct swarmkv *db)
{
return db->self.addr;
}
+void swarmkv_self_uuid(const struct swarmkv *db, char buff[37])
+{
+ uuid_unparse(db->opts->bin_uuid, buff);
+ return;
+} \ No newline at end of file
diff --git a/src/swarmkv_error.h b/src/swarmkv_error.h
index 4da62e0..221d107 100644
--- a/src/swarmkv_error.h
+++ b/src/swarmkv_error.h
@@ -8,6 +8,7 @@
#define error_arg_not_valid_integer "ERR arg `%s` is not an integer or out of range"
#define error_arg_not_valid_address "ERR arg `%s` is not `IP:port` format"
#define error_arg_not_valid_float "ERR arg `%s` is not a valid float or out of range"
+#define error_arg_not_valid_uuid "ERR arg `%s` is not a valid UUID"
#define error_arg_parse_failed "ERR arg `%s` parse failed"
#define error_arg_string_should_be "ERR arg `%s` should be `%s`"
#define error_need_additional_arg "ERR arg `%s` should be fllowed by more args"
diff --git a/src/swarmkv_monitor.c b/src/swarmkv_monitor.c
index 95c88d7..0856b1c 100644
--- a/src/swarmkv_monitor.c
+++ b/src/swarmkv_monitor.c
@@ -10,12 +10,12 @@
#include <hdr/hdr_interval_recorder.h>
#include <pthread.h>
-#define METRIC_KEY_MAX 64
+#define KEY_LEN_MAX 64
#define SIGNIFICANT_FIGURES 2
#define LOWEST_TRACKABLE_VALUE 1
struct recorder
{
- char key[METRIC_KEY_MAX];
+ char key[KEY_LEN_MAX];
struct hdr_interval_recorder hdr_interval;
struct hdr_histogram *hdr_previous;
struct hdr_histogram *hdr_all_time;
@@ -45,7 +45,7 @@ void recorder_free(struct recorder *recorder)
}
struct recorder_metric
{
- char key[METRIC_KEY_MAX];
+ char key[KEY_LEN_MAX];
long long total_count;
long long max;
long long min;
@@ -83,11 +83,11 @@ void hdr_to_metric(const struct hdr_histogram *hdr, const char *key, struct reco
strncpy(metric->key, key, sizeof(metric->key));
metric->total_count=hdr->total_count;
if(metric->total_count==0) return;
- metric->p50=hdr_value_at_percentile(hdr, 0.5);
- metric->p80=hdr_value_at_percentile(hdr, 0.8);
- metric->p90=hdr_value_at_percentile(hdr, 0.9);
- metric->p95=hdr_value_at_percentile(hdr, 0.95);
- metric->p99=hdr_value_at_percentile(hdr, 0.99);
+ metric->p50=hdr_value_at_percentile(hdr, 50);
+ metric->p80=hdr_value_at_percentile(hdr, 80);
+ metric->p90=hdr_value_at_percentile(hdr, 90);
+ metric->p95=hdr_value_at_percentile(hdr, 95);
+ metric->p99=hdr_value_at_percentile(hdr, 99);
metric->total_count=hdr->total_count;
metric->max=hdr_max(hdr);
metric->mean=hdr_mean(hdr);
@@ -108,18 +108,7 @@ void recorder_sample(struct recorder *recorder, struct recorder_metric *metric)
hdr_to_metric(recorder->hdr_all_time, recorder->key, metric);
return;
}
-void recorder_sample_n(struct recorder **recorder, size_t n_recorder, long long max_latency_usec, struct recorder_metric *metric)
-{
- struct hdr_histogram *hdr_merged;
- hdr_init(1, max_latency_usec, 2, &hdr_merged);
- for(size_t i=0; i<n_recorder; i++)
- {
- recorder_commit(recorder[i]);
- hdr_add(hdr_merged, recorder[i]->hdr_all_time);
- }
- hdr_to_metric(hdr_merged, recorder[0]->key, metric);
- hdr_close(hdr_merged);
-}
+
void recorder_reset(struct recorder *recorder)
{
recorder->hdr_previous=hdr_interval_recorder_sample_and_recycle(&recorder->hdr_interval, recorder->hdr_previous);
@@ -421,4 +410,13 @@ enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const s
}
pthread_mutex_unlock(&monitor->mutex);
return FINISHED;
+}
+enum cmd_exec_result lastcmds_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*LASTCMDS [offset]*/
+ struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
+ pthread_mutex_lock(&monitor->mutex);
+
+ pthread_mutex_unlock(&monitor->mutex);
+ return FINISHED;
} \ No newline at end of file
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 0bc7514..189a561 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -111,11 +111,20 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] =
.obj_size=(size_t (*)(const void *))AP_bloom_mem_size
},
{
+ .type=OBJ_TYPE_CMS,
+ .type_name="count-min-sketch",
+ .obj_free=(void (*)(void *))CM_sketch_free,
+ .obj_serialize=(void (*)(const void *, char **, size_t *))CM_sketch_serialize,
+ .obj_merge_blob=(void (*)(void *, const char *, size_t))CM_sketch_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))CM_sketch_replicate,
+ .obj_size=(size_t (*)(const void *))CM_sketch_size
+ },
+ {
.type=OBJ_TYPE_UNDEFINED,
.type_name="undefined",
.obj_free=undefined_obj_free,
.obj_serialize=NULL,
- .obj_merge_blob=NULL,
+ .obj_merge_blob=NULL,
.obj_replicate=NULL,
.obj_size=(size_t (*)(const void *))undefined_obj_mem_size
}
@@ -331,7 +340,7 @@ void store_get_node_addr(struct swarmkv_module* mod_store, node_t *node)
node_copy(node, &store->self);
return;
}
-void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj)
+void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj)
{
struct swarmkv_store *store=module2store(mod_store);
struct scontainer *ctr=container_of(obj, struct scontainer, obj);
@@ -423,7 +432,7 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t
offset+=sizeof(size_t);
assert(offset+value_blob_sz==blob_sz);
const char *value_blob=blob+offset;
- if(!obj->raw)
+ if(obj->raw == NULL)
{
obj->raw=sobj_specs[obj->type].obj_replicate(uuid, value_blob, value_blob_sz);
}
@@ -1008,17 +1017,17 @@ enum cmd_exec_result crdt_info_command(struct swarmkv_module *mod_store, const s
size_t sz=0;
sz+=sobj_specs[ctr->obj.type].obj_size(ctr->obj.raw);
sz+=sizeof(struct scontainer)+sdslen(ctr->obj.key);
- size_t n_replica=0;
- n_replica=ctr->replica_node_list?utarray_len(ctr->replica_node_list):0;
- sz+=n_replica*sizeof(node_t);
+ size_t n_peer=0;
+ n_peer=ctr->replica_node_list?utarray_len(ctr->replica_node_list):0;
+ sz+=n_peer*sizeof(node_t);
int i=0;
*reply=swarmkv_reply_new_array(8);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Type");
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt(sobj_specs[ctr->obj.type].type_name);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Size");
(*reply)->elements[i++]=swarmkv_reply_new_integer(sz);
- (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Replicas");
- (*reply)->elements[i++]=swarmkv_reply_new_integer(n_replica);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Peers");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(n_peer);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("LastModified");
(*reply)->elements[i++]=swarmkv_reply_new_integer(ctr->op_timestamp.tv_sec);
return FINISHED;
diff --git a/src/swarmkv_store.h b/src/swarmkv_store.h
index d1fc926..4e4e4cd 100644
--- a/src/swarmkv_store.h
+++ b/src/swarmkv_store.h
@@ -14,6 +14,7 @@
#include "fair_token_bucket.h"
#include "bulk_token_bucket.h"
#include "ap_bloom.h"
+#include "cm_sketch.h"
enum sobj_type
{
@@ -25,6 +26,7 @@ enum sobj_type
OBJ_TYPE_FAIR_TOKEN_BUCKET,
OBJ_TYPE_BULK_TOKEN_BUCKET,
OBJ_TYPE_BLOOM_FILTER,
+ OBJ_TYPE_CMS,
OBJ_TYPE_UNDEFINED,
__SWARMKV_OBJ_TYPE_MAX
};
@@ -43,6 +45,7 @@ struct sobj
struct fair_token_bucket *ftb;
struct bulk_token_bucket *btb;
struct AP_bloom *bloom;
+ struct CM_sketch *cms;
void *raw;
};
};
@@ -62,7 +65,7 @@ struct store_info
};
void swarmkv_store_info(struct swarmkv_module *module, struct store_info *info);
-void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj);
+void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj);
int sobj_get_random_replica(struct sobj *obj, node_t *out);
enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply);
diff --git a/src/t_bloom_filter.c b/src/t_bloom_filter.c
index fbaad2e..9ad3623 100644
--- a/src/t_bloom_filter.c
+++ b/src/t_bloom_filter.c
@@ -7,9 +7,9 @@
#include <stdlib.h>
#include <assert.h>
-enum cmd_exec_result bfreserve_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+enum cmd_exec_result bfinit_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
-/* BFRESERVE key error_rate capacity [TIME window-milliseconds slice-number] */
+/* BFINIT key error_rate capacity [TIME window-milliseconds slice-number] */
struct sobj *obj=NULL;
const sds key=cmd->argv[1];
@@ -97,6 +97,7 @@ enum cmd_exec_result bfadd_command(struct swarmkv_module *mod_store, const struc
{
AP_bloom_add(obj->bloom, now, cmd->argv[i+2], sdslen(cmd->argv[i+2]));
}
+ store_mark_object_as_modified(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
return FINISHED;
}
@@ -202,7 +203,7 @@ enum cmd_exec_result bfinfo_command(struct swarmkv_module *mod_store, const stru
AP_bloom_info(obj->bloom, &info);
int i=0;
*reply=swarmkv_reply_new_array(20);
- (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ErrorRate");
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Error");
(*reply)->elements[i++]=swarmkv_reply_new_double(info.error);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity");
(*reply)->elements[i++]=swarmkv_reply_new_integer(info.capacity);
diff --git a/src/t_bloom_filter.h b/src/t_bloom_filter.h
index 0231b0c..3675e82 100644
--- a/src/t_bloom_filter.h
+++ b/src/t_bloom_filter.h
@@ -1,7 +1,7 @@
#pragma once
#include "swarmkv_common.h"
-enum cmd_exec_result bfreserve_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result bfinit_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
enum cmd_exec_result bfadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
enum cmd_exec_result bfexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
enum cmd_exec_result bfmexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
diff --git a/src/t_cms.c b/src/t_cms.c
new file mode 100644
index 0000000..5339cc9
--- /dev/null
+++ b/src/t_cms.c
@@ -0,0 +1,322 @@
+#include "swarmkv_common.h"
+#include "swarmkv_utils.h"
+#include "swarmkv_store.h"
+#include "swarmkv_error.h"
+#include "cm_sketch.h"
+
+#include <stdlib.h>
+#include <assert.h>
+
+enum cmd_exec_result cmsinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/* CMSINITBYDIM key width depth*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+
+ long long width=0, depth=0;
+
+ width=strtol(cmd->argv[2], NULL, 10);
+ if(width<=0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]);
+ return FINISHED;
+ }
+ depth=strtol(cmd->argv[3], NULL, 10);
+ if(depth<=0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
+ return FINISHED;
+ }
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ assert(obj->raw==NULL);
+ uuid_t uuid;
+ store_get_uuid(mod_store, uuid);
+ obj->cms=CM_sketch_new(uuid, width, depth);
+ obj->type=OBJ_TYPE_CMS;
+ *reply=swarmkv_reply_new_status("OK");
+ }
+ else
+ {
+ *reply=swarmkv_reply_new_array(0);
+ }
+ return FINISHED;
+}
+enum cmd_exec_result cmsinitbyprob_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/* CMSINITBYPROB key error probability*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+
+ double error=0, probability=0;
+ error=strtod(cmd->argv[2], NULL);
+ if(error < 0 || error >= 1.0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_float, cmd->argv[2]);
+ return FINISHED;
+ }
+ probability=strtod(cmd->argv[3], NULL);
+ if(probability < 0 || probability >= 1.0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_float, cmd->argv[3]);
+ return FINISHED;
+ }
+ int width=0, depth=0;
+ CM_sketch_tool_dimension_by_prob(error, probability, &width, &depth);
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ assert(obj->raw==NULL);
+ uuid_t uuid;
+ store_get_uuid(mod_store, uuid);
+ obj->cms=CM_sketch_new(uuid, width, depth);
+ obj->type=OBJ_TYPE_CMS;
+ *reply=swarmkv_reply_new_status("OK");
+ }
+ else
+ {
+ *reply=swarmkv_reply_new_array(0);
+ }
+ return FINISHED;
+}
+static int parse_incrby_args(const struct swarmkv_cmd *cmd, long long *increments, size_t n_increment, int *invalid_idx)
+{
+ assert(n_increment == (cmd->argc-2)/2);
+ char *endptr=NULL;
+ for(int i=3, j=0; i<cmd->argc; i+=2, j++)
+ {
+ *invalid_idx=i;
+ increments[j]=strtol(cmd->argv[i], &endptr, 10);
+ if(*endptr != '\0')
+ {
+ return -1;
+ }
+ if(increments[j] > INT32_MAX || increments[j] < INT32_MIN)
+ {
+ return -1;
+ }
+ }
+ return 0;
+}
+enum cmd_exec_result cmsincrby_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*CMSINCRBY key item increment [item increment ...]*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ assert(obj->raw == NULL);
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_CMS)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+ size_t n_increment=(cmd->argc-2)/2;
+ long long increments[n_increment], query[n_increment];
+ int ret=0, invalid_idx=0;
+ ret = parse_incrby_args(cmd, increments, n_increment, &invalid_idx);
+ if(ret<0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[invalid_idx]);
+ return FINISHED;
+ }
+ *reply=swarmkv_reply_new_array(n_increment);
+ for(int i=2, j=0; i<cmd->argc; i+=2, j++)
+ {
+ query[j]=CM_sketch_incrby(obj->cms, cmd->argv[i], sdslen(cmd->argv[i]), (int) increments[j]);
+ (*reply)->elements[j]=swarmkv_reply_new_integer(query[j]);
+ }
+ store_mark_object_as_modified(mod_store, obj);
+ return FINISHED;
+}
+enum cmd_exec_result cmsmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*CMSQUERY key item [item ...]*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_CMS)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+
+ *reply=swarmkv_reply_new_array(cmd->argc-2);
+ for(int i=0; i<cmd->argc-2; i++)
+ {
+ int query=0;
+ query=CM_sketch_query(obj->cms, cmd->argv[i+2], sdslen(cmd->argv[i+2]));
+ (*reply)->elements[i]=swarmkv_reply_new_integer(query);
+ }
+ return FINISHED;
+}
+enum cmd_exec_result cmsquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ enum cmd_exec_result ret;
+ struct swarmkv_reply *tmp_reply=NULL;
+ ret=cmsmquery_command(mod_store, cmd, &tmp_reply);
+ if(ret==FINISHED)
+ {
+ if(tmp_reply->type==SWARMKV_REPLY_ARRAY)
+ {
+ *reply=swarmkv_reply_dup(tmp_reply->elements[0]);
+ swarmkv_reply_free(tmp_reply);
+ }
+ else
+ {
+ *reply=tmp_reply;
+ }
+ }
+ return ret;
+}
+enum cmd_exec_result cmsinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*CMSINFO key*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_CMS)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+ struct CM_sketch_info info;
+ CM_sketch_info(obj->cms, &info);
+ int i=0;
+ *reply=swarmkv_reply_new_array(12);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Width");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.width);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Depth");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.depth);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Error");
+ (*reply)->elements[i++]=swarmkv_reply_new_double(info.error);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Probability");
+ (*reply)->elements[i++]=swarmkv_reply_new_double(info.probability);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Count");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.count);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ReplicaNumber");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.n_replica);
+ assert(i==12);
+ return FINISHED;
+}
+enum cmd_exec_result cmsrlist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*CMSRLIST key*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_CMS)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+ struct CM_sketch_info info;
+ CM_sketch_info(obj->cms, &info);
+ uuid_t replicas[info.n_replica];
+ size_t n_nodes=0;
+ CM_sketch_list_replica(obj->cms, replicas, &n_nodes);
+ *reply=swarmkv_reply_new_array(n_nodes);
+ for(int i=0; i<n_nodes; i++)
+ {
+ char uuid_str[37];
+ uuid_unparse(replicas[i], uuid_str);
+ (*reply)->elements[i]=swarmkv_reply_new_string(uuid_str, strlen(uuid_str)+1);
+ }
+ return FINISHED;
+}
+enum cmd_exec_result cmsrclear_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*CMSRCLEAR key uuid*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_CMS)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+ uuid_t uuid;
+ if(0 > uuid_parse(cmd->argv[2], uuid))
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_uuid, cmd->argv[2]);
+ return FINISHED;
+ }
+ int ret=CM_sketch_clear_replica(obj->cms, uuid);
+ if(ret<0)
+ {
+ *reply=swarmkv_reply_new_error("replica uuid is not exist");
+ }
+ else
+ {
+ *reply=swarmkv_reply_new_status("OK");
+ store_mark_object_as_modified(mod_store, obj);
+ }
+ return FINISHED;
+} \ No newline at end of file
diff --git a/src/t_cms.h b/src/t_cms.h
new file mode 100644
index 0000000..b0348bd
--- /dev/null
+++ b/src/t_cms.h
@@ -0,0 +1,10 @@
+#pragma once
+#include "swarmkv_common.h"
+enum cmd_exec_result cmsinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsinitbyprob_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsincrby_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsrlist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsrclear_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); \ No newline at end of file
diff --git a/src/t_hash.c b/src/t_hash.c
index a5a7b0b..120f1ed 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -70,7 +70,7 @@ enum cmd_exec_result hset_command(struct swarmkv_module *mod_store, const struct
if(ret>0) n_added++;
}
*reply=swarmkv_reply_new_integer(n_added);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
else
{
@@ -110,7 +110,7 @@ enum cmd_exec_result hdel_command(struct swarmkv_module *mod_store, const struct
if(ret>0) n_deleted++;
}
*reply=swarmkv_reply_new_integer(n_deleted);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
return FINISHED;
}
@@ -246,7 +246,7 @@ enum cmd_exec_result hincrby_command(struct swarmkv_module *mod_store, const str
else
{
*reply=swarmkv_reply_new_integer(result);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
}
else
diff --git a/src/t_set.c b/src/t_set.c
index 87d5504..e7d4d4a 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -37,7 +37,7 @@ enum cmd_exec_result sadd_command(struct swarmkv_module *mod_store, const struct
if(ret>0) n_added++;
}
*reply=swarmkv_reply_new_integer(n_added);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
else
{
@@ -76,7 +76,7 @@ enum cmd_exec_result srem_command(struct swarmkv_module *mod_store, const struct
if(ret>0) n_removed++;
}
*reply=swarmkv_reply_new_integer(n_removed);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
return FINISHED;
}
enum cmd_exec_result smembers_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
diff --git a/src/t_string.c b/src/t_string.c
index d5f2a59..1a5f5f5 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -72,7 +72,7 @@ enum cmd_exec_result set_command(struct swarmkv_module *mod_store, const struct
{
LWW_register_set(obj->string, value, sdslen(value));
*reply=swarmkv_reply_new_status("OK");
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
else if(obj->type==OBJ_TYPE_INTEGER)
{
@@ -80,7 +80,7 @@ enum cmd_exec_result set_command(struct swarmkv_module *mod_store, const struct
{
PN_counter_set(obj->counter, int_value);
*reply=swarmkv_reply_new_status("OK");
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
else
{
@@ -113,7 +113,7 @@ enum cmd_exec_result integer_generic(struct swarmkv_module *mod_store, const sds
}
value=PN_counter_incrby(obj->counter, increment);
*reply=swarmkv_reply_new_integer(value);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
else
{
diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c
index a16672f..d79f6b0 100644
--- a/src/t_token_bucket.c
+++ b/src/t_token_bucket.c
@@ -85,7 +85,7 @@ enum cmd_exec_result tcfg_command(struct swarmkv_module *mod_store, const struct
else if(obj->type==OBJ_TYPE_TOKEN_BUCKET)
{
OC_token_bucket_configure(obj->bucket, now, rate, period, capacity);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
}
else
@@ -182,7 +182,7 @@ enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const st
gettimeofday(&now, NULL);
allocated=OC_token_bucket_consume(obj->bucket, now, consume_type, request);
*reply=swarmkv_reply_new_integer(allocated);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
return FINISHED;
}
bool is_power_of_2(long long num)
@@ -254,7 +254,7 @@ enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struc
else if(obj->type==OBJ_TYPE_FAIR_TOKEN_BUCKET)
{
fair_token_bucket_configure(obj->ftb, now, rate, period, capacity, divisor);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
}
else
@@ -313,7 +313,7 @@ enum cmd_exec_result ftconsume_command(struct swarmkv_module *mod_store, const s
allocated=fair_token_bucket_consume(obj->ftb, now, member, sdslen(member), weight, consume_type, request);
*reply=swarmkv_reply_new_integer(allocated);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
return FINISHED;
}
enum cmd_exec_result ftinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
@@ -426,7 +426,7 @@ enum cmd_exec_result btcfg_command(struct swarmkv_module *mod_store, const struc
else if(obj->type==OBJ_TYPE_BULK_TOKEN_BUCKET)
{
bulk_token_bucket_configure(obj->btb, now, rate, period, capacity, buckets);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
}
else
@@ -480,7 +480,7 @@ enum cmd_exec_result btconsume_command(struct swarmkv_module *mod_store, const s
allocated=bulk_token_bucket_consume(obj->btb, now, member, sdslen(member), consume_type, request);
*reply=swarmkv_reply_new_integer(allocated);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
return FINISHED;
}
enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
@@ -515,8 +515,8 @@ enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const stru
available=bulk_token_bucket_read_available(obj->btb, now, cmd->argv[2], sdslen(cmd->argv[2]));
}
int i=0;
- *reply=swarmkv_reply_new_array(14);
- (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refill");
+ *reply=swarmkv_reply_new_array(16);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Rate");
(*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.rate);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Period");
(*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.period);
@@ -528,8 +528,10 @@ enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const stru
(*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.approximate_keys);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Collisions");
(*reply)->elements[i++]=swarmkv_reply_new_double(btb_info.collision_rate);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Replicas");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.replicas);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Query");
(*reply)->elements[i++]=swarmkv_reply_new_integer(available);
- assert(i==14);
+ assert(i==16);
return FINISHED;
} \ No newline at end of file