summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2024-07-26 16:35:23 +0800
committerchenzizhan <[email protected]>2024-08-09 16:05:03 +0800
commit2dae3ddcaa3cd4fa40f93ef97bca9de135fbd596 (patch)
tree0b20ac52e80629c05a1d4ecb4d5c21439b17b83a
parentfd20b1143cbf0220b78d74a85eaf56d274ab33e0 (diff)
spread sketch crdt
-rw-r--r--CRDT/CMakeLists.txt4
-rw-r--r--CRDT/spread_sketch.c38
-rw-r--r--CRDT/spread_sketch.h28
-rw-r--r--CRDT/spread_sketch_gtest.cpp (renamed from CRDT/spreadsketch_crdt_test.cpp)34
-rw-r--r--CRDT/spread_sketch_sheet.c511
-rw-r--r--docs/command_toc.md1
-rw-r--r--docs/commands/spread_sketch.md155
-rw-r--r--include/swarmkv/swarmkv.h2
-rw-r--r--readme.md2
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/swarmkv.c26
-rw-r--r--src/swarmkv_api.c4
-rw-r--r--src/t_spread_sketch.c307
-rw-r--r--src/t_spread_sketch.h (renamed from src/t_spreadsketch.h)3
-rw-r--r--src/t_spreadsketch.c256
-rw-r--r--test/swarmkv_gtest.cpp82
16 files changed, 601 insertions, 854 deletions
diff --git a/CRDT/CMakeLists.txt b/CRDT/CMakeLists.txt
index 1c8b49a..0f58111 100644
--- a/CRDT/CMakeLists.txt
+++ b/CRDT/CMakeLists.txt
@@ -19,10 +19,10 @@ add_executable(tb_CRDT_gtest tb_crdt_gtest.cpp
${PROJECT_SOURCE_DIR}/deps/xxhash/xxhash.c)
target_link_libraries(tb_CRDT_gtest CRDT gtest-static uuid)
-add_executable(spreadsketch_crdt_test spreadsketch_crdt_test.cpp
+add_executable(spread_sketch_gtest spread_sketch_gtest.cpp
${PROJECT_SOURCE_DIR}/deps/mpack/mpack.c
${PROJECT_SOURCE_DIR}/deps/xxhash/xxhash.c)
-target_link_libraries(spreadsketch_crdt_test CRDT gtest-static uuid)
+target_link_libraries(spread_sketch_gtest CRDT gtest-static uuid)
add_executable(probabilistic_CRDT_gtest probabilistic_crdt_gtest.cpp
${PROJECT_SOURCE_DIR}/deps/mpack/mpack.c
diff --git a/CRDT/spread_sketch.c b/CRDT/spread_sketch.c
index 4a4f531..ab2d1ea 100644
--- a/CRDT/spread_sketch.c
+++ b/CRDT/spread_sketch.c
@@ -200,9 +200,8 @@ struct entry_table *smart_ptr_table_new() {
return table;
}
-
struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char precision, int time_window_ms, struct timeval now) {
- struct spread_sketch *pthis = malloc(sizeof(struct spread_sketch));
+ struct spread_sketch *pthis = calloc(1, sizeof(struct spread_sketch));
pthis->depth = depth;
pthis->width = width;
@@ -241,7 +240,7 @@ void move_registers_forward(struct spread_sketch *ss, const struct timeval *now)
}
// return 0 if not added, return 1 if added
-int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now) {
+int spread_sketch_add_hash(struct spread_sketch *ss, const char *entry, size_t entry_length, uint64_t item_hash, void *arg, struct timeval now) {
uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1;
long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000;
@@ -255,7 +254,7 @@ int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key
// https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
// A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x)
// Assuming that the 128-bit xxhash function is perfect, we can view it as a combination of two 64-bit hash functions.
- uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_length, 171);
+ uint64_t hash_x_tmp = XXH3_64bits_withSeed(entry, entry_length, 171);
uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
uint32_t hash_x2 = (uint32_t) hash_x_tmp;
@@ -266,7 +265,7 @@ int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key
int bucket_idx = (hash_x % ss->width) + i * ss->width;
struct bucket *bucket = &ss->buckets[bucket_idx];
- if (bucket->content != NULL && key_equal(bucket->content->key, bucket->content->key_len, key, key_length)) {
+ if (bucket->content != NULL && key_equal(bucket->content->key, bucket->content->key_len, entry, entry_length)) {
bucket->content->dying = false;
bucket->last_update_ms = now_ms;
@@ -290,7 +289,7 @@ int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key
if (content_old != NULL) {
smart_ptr_table_release(ss->table, content_old->key, content_old->key_len);
}
- struct entry *content_new = smart_ptr_table_get(ss->table, key, key_length, arg);
+ struct entry *content_new = smart_ptr_table_get(ss->table, entry, entry_length, arg);
bucket->content = content_new;
bucket->last_update_ms = now_ms;
@@ -305,13 +304,13 @@ int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key
return in_sketch ? 1 : 0;
}
-int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now) {
+int spread_sketch_add(struct spread_sketch *ss, const char *entry, size_t entry_length, const char* item, size_t item_len, void *arg, struct timeval now) {
uint64_t hash = XXH3_64bits(item, item_len);
- return spread_sketch_add_hash(ss, key, key_length, hash, arg, now);
+ return spread_sketch_add_hash(ss, entry, entry_length, hash, arg, now);
}
-double spread_sketch_point_query(const struct spread_sketch *ss, const char *key, size_t key_length) {
- uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_length, 171);
+double spread_sketch_point_query(const struct spread_sketch *ss, const char *entry, size_t entry_length) {
+ uint64_t hash_x_tmp = XXH3_64bits_withSeed(entry, entry_length, 171);
uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
uint32_t hash_x2 = (uint32_t) hash_x_tmp;
@@ -484,12 +483,12 @@ size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t
return count;
}
-void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys) {
+void spread_sketch_list_entries(const struct spread_sketch *ss, char ***entries, size_t **entry_lens, size_t *n_entry) {
size_t count_max = HASH_COUNT(ss->table->entry);
size_t count = 0;
- *keys = malloc(count_max * sizeof(char *));
- *key_lens = malloc(count_max * sizeof(size_t));
+ *entries = malloc(count_max * sizeof(char *));
+ *entry_lens = malloc(count_max * sizeof(size_t));
struct entry *content, *tmp;
HASH_ITER(hh, ss->table->entry, content, tmp) {
@@ -497,12 +496,12 @@ void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_
continue;
}
- (*keys)[count] = content->key;
- (*key_lens)[count] = content->key_len;
+ (*entries)[count] = content->key;
+ (*entry_lens)[count] = content->key_len;
count++;
}
- *n_keys = count;
+ *n_entry = count;
}
double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len) {
@@ -538,7 +537,7 @@ struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) {
return dst;
}
-void spread_sketch_get_parameter_recommendation(int expected_super_spreader_number, int *depth_out, int *width_out, unsigned char *precision_out)
+void spread_sketch_recommend_parameters(int expected_super_spreader_number, int *depth_out, int *width_out, unsigned char *precision_out)
{
int logk = expected_super_spreader_number >= 3200 ? 4 : 3; // lg3200 = 3.51,round up to 4
*depth_out = logk;
@@ -592,7 +591,7 @@ void spread_sketch_serialize(const struct spread_sketch *ss, char **blob, size_t
}
}
- char *buffer = malloc(sz);
+ char *buffer = calloc(sz, sizeof(char));
*blob = buffer;
*blob_sz = sz;
@@ -650,6 +649,7 @@ struct spread_sketch *spread_sketch_deserialize(const char *blob, size_t blob_sz
}
const char *key = blob;
blob += key_len;
+
struct entry *content = smart_ptr_table_get(ss->table, key, key_len, NULL);
ss->buckets[i].content = content;
}
@@ -670,10 +670,8 @@ size_t spread_sketch_calculate_memory_usage(const struct spread_sketch *ss)
ret += sizeof(struct spread_sketch);
size_t bucket_size = sizeof(struct bucket) + hll_size(ss->precision);
- printf("every bucket size: %zu\n", bucket_size);
ret += ss->depth * ss->width * bucket_size;
- printf("the number of content: %u\n", HASH_COUNT(ss->table->entry));
struct entry *content, *tmp;
HASH_ITER(hh, ss->table->entry, content, tmp) {
ret += sizeof(struct entry);
diff --git a/CRDT/spread_sketch.h b/CRDT/spread_sketch.h
index 9d6a257..80e6916 100644
--- a/CRDT/spread_sketch.h
+++ b/CRDT/spread_sketch.h
@@ -21,23 +21,23 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec
void spread_sketch_free(struct spread_sketch *ss);
void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn);
-int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now);
-int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now);
+int spread_sketch_add_hash(struct spread_sketch *ss, const char *entry, size_t entry_length, uint64_t item_hash, void *arg, struct timeval now);
+int spread_sketch_add(struct spread_sketch *ss, const char *entry, size_t entry_length, const char* item, size_t item_len, void *arg, struct timeval now);
-// get the number of keys stored in spread sketch
+// get the number of entries stored in spread sketch
int spread_sketch_get_count(const struct spread_sketch *ss);
-// list all the keys in spread sketch. User should free the arrays, but do not free the elements of strings in the array(because they are references to the internal data structure)
-// Example: char **key; size_t *key_len; size_t n_keys; spread_sketch_list_keys(&key, &key_len, &n_keys); free(key); free(key_len);
-void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys);
-// query the cardinality(or fanout) of a key in spread sketch.
-// Even thought spread sketch algorithm does not requires keys to exist innately, when querying a key that is not present in the spread sketch, `spread_sketch_get_cardinality` will return -1.
-double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len);
+// list all the entries in spread sketch. The caller should free the two returned arrays, but must not free the strings they point to (because they are references to the internal data structure)
+// Example: char **entries; size_t *entry_lens; size_t n_entry; spread_sketch_list_entries(ss, &entries, &entry_lens, &n_entry); free(entries); free(entry_lens);
+void spread_sketch_list_entries(const struct spread_sketch *ss, char ***entries, size_t **entry_lens, size_t *n_entry);
+// query the cardinality (or fanout) of an entry in spread sketch.
+// Even though the spread sketch algorithm does not inherently require entries to exist, when querying an entry that is not present in the spread sketch, `spread_sketch_get_cardinality` will return -1.
+double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *entry, size_t entry_len);
// query a hyperloglog 's base64 serialization. The serialization format is [1,precision,register...] and then encoded by base64
-char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *key, size_t key_len);
-void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len);
+char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *entry, size_t entry_len);
+void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *entry, size_t entry_len);
// in most cases, it has the same output as `spread_sketch_get_cardinality`, but it will perform more like an ordinary spread sketch query.
-// Will always return a value, even if the key is not present in the spread sketch. Must pass a `now` value required by Stagger hll query.
-double spread_sketch_query(const struct spread_sketch *ss, const char *key, size_t key_length, struct timeval now);
+// Will always return a value, even if the entry is not present in the spread sketch. Must pass a `now` value required by Stagger hll query.
+double spread_sketch_query(const struct spread_sketch *ss, const char *entry, size_t entry_length, struct timeval now);
void spread_sketch_merge(struct spread_sketch *dest, const struct spread_sketch *src);
struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src);
@@ -50,7 +50,7 @@ struct spread_sketch *spread_sketch_replicate(uuid_t uuid, const char *blob, siz
void spread_sketch_get_parameter(const struct spread_sketch *ss, int *depth_out, int *width_out, unsigned char *precision_out, int *time_window_ms_out);
// spread sketch alway store values more than expected_query_num,expected_query_num is a hint to set spread sketch parameters properly
-void spread_sketch_get_parameter_recommendation(int expected_super_spreader_number, int *depth_out, int *width_out, unsigned char *precision_out);
+void spread_sketch_recommend_parameters(int expected_super_spreader_number, int *depth_out, int *width_out, unsigned char *precision_out);
size_t spread_sketch_calculate_memory_usage(const struct spread_sketch *ss);
#ifdef __cplusplus
diff --git a/CRDT/spreadsketch_crdt_test.cpp b/CRDT/spread_sketch_gtest.cpp
index 6f728aa..55084b6 100644
--- a/CRDT/spreadsketch_crdt_test.cpp
+++ b/CRDT/spread_sketch_gtest.cpp
@@ -83,7 +83,7 @@ struct spread_sketch_list *spread_sketch_list(const struct spread_sketch *ss) {
size_t n_result;
size_t *key_lens;
char **keys;
- spread_sketch_list_keys(ss, &keys, &key_lens, &n_result);
+ spread_sketch_list_entries(ss, &keys, &key_lens, &n_result);
struct spread_sketch_list *ret = (struct spread_sketch_list *)malloc(sizeof(struct spread_sketch_list));
if (n_result == 0) {
@@ -238,7 +238,7 @@ public:
return mre / total_ss;
}
- bool assess_estimate(unsigned char hll_precision, double eplilon, double delta) {
+ bool check_result(unsigned char hll_precision, double eplilon, double delta) {
int good = 0;
int total_valid_key = 0;
double sigma = hll_error(hll_precision) * 2; // RSD -> max error estimation
@@ -360,7 +360,7 @@ TestRet *spread_sketch_test(struct spread_sketch *ss, long long window_s, long l
return test_ret;
}
-TestRet *spread_sketch_test_swarm(int precision, int depth, int width, long long window_s, long long duration_s, int n_super_spreader, int n_total, int fanout_superspreader, int fanout_others)
+TestRet *spread_sketch_generic_test(int precision, int depth, int width, long long window_s, long long duration_s, int n_super_spreader, int n_total, int fanout_superspreader, int fanout_others)
{
long long now = 0;
const int N_INSTANCES = 10;
@@ -416,40 +416,40 @@ double cal_CM_epsilon(int width) {
return 2.0 / width;
}
-TEST(SpreadSketch, test_hll_on_a_single_bucket)
+TEST(SpreadSketch, OneEntry)
{
- TestRet *test_ret = spread_sketch_test_swarm(
+ TestRet *test_ret = spread_sketch_generic_test(
4, // hll precision
4, 1024, // depth, width
10, 100, // window, duration
0, 1, // n_super_spreader, n_total. just one flow, so no error on CM sketch
10000, 10000); // fanout_superspreader, fanout_others
- bool good = test_ret->assess_estimate(4, 0, 0);
+ bool good = test_ret->check_result(4, 0, 0);
EXPECT_TRUE(good);
delete test_ret;
}
-TEST(SpreadSketch, test_hll_given_short_input)
+TEST(SpreadSketch, OneEntryNoSliding)
{
// use very little fp compared to big sketch so that CM sketch error are negligible
- TestRet *test_ret = spread_sketch_test_swarm(
+ TestRet *test_ret = spread_sketch_generic_test(
4, // hll precision
4, 1024, // depth, width
10, 1, // window, duration, window is larger than duration
0, 1, // n_super_spreader, n_total. just one flow, so no error on CM sketch
10000, 10000); // fanout_superspreader, fanout_others
- bool good = test_ret->assess_estimate(4, 0, 0);
+ bool good = test_ret->check_result(4, 0, 0);
EXPECT_TRUE(good);
delete test_ret;
}
-TEST(SpreadSketch, normal_flows)
+TEST(SpreadSketch, Precision)
{
- TestRet *test_ret = spread_sketch_test_swarm(
+ TestRet *test_ret = spread_sketch_generic_test(
6, // hll precision
4, 1024, // depth, width
100, 200, // window, duration
@@ -459,7 +459,7 @@ TEST(SpreadSketch, normal_flows)
double mre = test_ret->cal_mre();
EXPECT_LE(mre, 0.4);
- bool good = test_ret->assess_estimate(6, cal_CM_epsilon(1024), cal_CM_delta(4));
+ bool good = test_ret->check_result(6, cal_CM_epsilon(1024), cal_CM_delta(4));
EXPECT_TRUE(good);
double recall_rate = test_ret->cal_recall_rate(500); // 500: n_super_spreader
@@ -470,7 +470,7 @@ TEST(SpreadSketch, normal_flows)
delete test_ret;
}
-TEST(SpreadSketch, scarce_flow)
+TEST(SpreadSketch, PrecisionOfScarceEntry)
{
int window_s = 100;
int n_super_spreader = 100;
@@ -493,7 +493,7 @@ TEST(SpreadSketch, scarce_flow)
spread_sketch_free(ss);
}
-TEST(SpreadSketch, decay_old_key) {
+TEST(SpreadSketch, Expire) {
int n_super_spreader = 1000;
int window_s = 10;
@@ -524,7 +524,7 @@ TEST(SpreadSketch, decay_old_key) {
spread_sketch_free(ss);
}
-TEST(SpreadSketch, serialize_with_nonstring_keys) {
+TEST(SpreadSketch, Serialize) {
struct spread_sketch *ss = spread_sketch_new(4, 1024, 4, 10*1000, ms_to_timeval(0));
int key1 = 1;
int key2 = 2;
@@ -575,7 +575,7 @@ void spread_sketch_parameter_experiments(int depth, int width, int precision, lo
printf("===============================================\n");
}
-TEST(SpreadSketchExperiments, uniform_distribution) {
+TEST(SpreadSketchExperiments, UniformDistribution) {
long long window_s = 10;
// check CM sketch related parameters
int depth[] = {3, 4};
@@ -767,7 +767,7 @@ void parameter_experiments_given_zipf_experiment(int depth, int width, int preci
spread_sketch_free(ss);
}
-TEST(SpreadSketchExperiments, zipf_distribution) {
+TEST(SpreadSketchExperiments, ZipfDistribution) {
/*
test with flows in zipf distribution fanout.
Every time window, there are `n_flows_per_window` distinct flows generated. Cardinality({y|x=x_i}) ~ Zipf(N, skewness), where N = 10 * n_superspreader
diff --git a/CRDT/spread_sketch_sheet.c b/CRDT/spread_sketch_sheet.c
deleted file mode 100644
index ef69734..0000000
--- a/CRDT/spread_sketch_sheet.c
+++ /dev/null
@@ -1,511 +0,0 @@
-
-#include <stdbool.h>
-#include <stdlib.h>
-#include <string.h>
-#include <stdio.h>
-#include <math.h>
-#include <assert.h>
-
-#include "xxhash.h"
-#include "uthash.h"
-
-#include "spread_sketch.h"
-#include "hll_sheet.h"
-#include "crdt_utils.h"
-
-
-
-struct stagger_hll_group {
- uint32_t *registers;
-
- // shared configuration
- long long reset_idx;
- struct timeval reset_time;
-
- // configuration
- unsigned char precision;
- long long time_window_ms;
- int sheet_size;
-};
-
-struct spread_sketch {
- int depth;
- int width;
- long long time_window_ms;
-
- // buckets
- // TODO: 定义一个bucket 结构体
- unsigned char *max_levels;
- char **keys; // TODO: 不一定是字符串
- struct stagger_hll_group *counters; // TODO: 先改成简单版本
- long long *last_updates_ms; // linux timestamp since Jan 1st 1970
-};
-
-struct spread_sketch_list; // defined in spread_sketch.h
-
-struct stagger_hll_group *shllg_new(unsigned depth, unsigned width, unsigned char precision, int time_window_s, struct timeval now) {
- struct stagger_hll_group *pthis = malloc(sizeof(struct stagger_hll_group));
-
- pthis->sheet_size = depth * width;
-
- pthis->precision = precision;
- pthis->time_window_ms = time_window_s * 1000;
-
- pthis->reset_idx = 0;
- pthis->reset_time = now;
-
- pthis->registers = hlls_create_register(precision, pthis->sheet_size);
-
- return pthis;
-}
-
-void move_registers_forward(struct stagger_hll_group *pthis, const struct timeval *now) {
- long long reset_reg_count = hlls_get_reset_register_count(pthis->precision, pthis->time_window_ms, *now, &pthis->reset_time);
- if(reset_reg_count)
- {
- hlls_reset_registers(pthis->registers, pthis->precision, pthis->sheet_size, pthis->reset_idx, reset_reg_count);
- hlls_advance_reset_index(&pthis->reset_idx, reset_reg_count, pthis->precision);
- }
-}
-
-void shllg_update(struct stagger_hll_group *pthis, int bucket_idx, uint64_t key) {
- hlls_add_hash(pthis->registers, pthis->precision, pthis->sheet_size, key, bucket_idx);
-}
-
-double shllg_count(struct stagger_hll_group *pthis, int bucket_idx) {
- return hlls_count(pthis->registers, bucket_idx, pthis->precision, pthis->sheet_size, pthis->reset_idx, pthis->time_window_ms);
-}
-
-void shllg_free(struct stagger_hll_group *pthis) {
- hlls_free_register(pthis->registers);
- free(pthis);
-}
-
-size_t shllg_size(unsigned char precision, int sheet_size) {
- return hlls_size(precision, sheet_size) + sizeof(struct stagger_hll_group);
-}
-
-struct stagger_hll_group *shllg_duplicate(const struct stagger_hll_group *src) {
- struct stagger_hll_group *pthis = malloc(sizeof(struct stagger_hll_group));
- pthis->sheet_size = src->sheet_size;
-
- pthis->precision = src->precision;
- pthis->time_window_ms = src->time_window_ms;
-
- pthis->reset_idx = src->reset_idx;
- pthis->reset_time = src->reset_time;
-
- pthis->registers = hlls_duplicate(src->registers, src->precision, src->sheet_size);
-
- return pthis;
-}
-
-void shllg_merge(struct stagger_hll_group *dst, const struct stagger_hll_group *src) {
- hlls_merge(dst->registers, src->registers, dst->precision, dst->sheet_size);
-}
-
-void shllg_serialize(struct stagger_hll_group *pthis, char **blob, size_t *blob_sz) {
- struct serializer_reader *reader = serializer_reader_new();
- serializer_reader_read_uint(reader, pthis->precision);
- serializer_reader_read_uint(reader, pthis->sheet_size);
- serializer_reader_read_longlong(reader, pthis->time_window_ms);
-
- serializer_reader_read_longlong(reader, pthis->reset_idx);
- serializer_reader_read_longlong(reader, pthis->reset_time.tv_sec);
- serializer_reader_read_longlong(reader, pthis->reset_time.tv_usec);
-
- serializer_reader_read_bin(reader, (const char *)pthis->registers, hlls_size(pthis->precision, pthis->sheet_size));
- serializer_reader_finalize(reader, blob, blob_sz);
-}
-
-struct stagger_hll_group *shllg_deserialize(const char *blob, size_t blob_sz) {
- struct serializer_writer *writer = serializer_writer_new(blob, blob_sz);
- struct stagger_hll_group *pthis = malloc(sizeof(struct stagger_hll_group));
- int ret;
-
- uint32_t precision_tmp;
- ret = serializer_writer_write_uint(writer, &precision_tmp);
- assert(ret != -1);
- pthis->precision = (unsigned char)precision_tmp;
-
- uint32_t sheet_size_tmp;
- ret = serializer_writer_write_uint(writer, &sheet_size_tmp);
- assert(ret != -1);
- pthis->sheet_size = sheet_size_tmp;
- ret = serializer_writer_write_longlong(writer, &pthis->time_window_ms);
- assert(ret != -1);
-
- ret = serializer_writer_write_longlong(writer, &pthis->reset_idx);
- assert(ret != -1);
- long long tv_sec_tmp, tv_usec_tmp;
- ret = serializer_writer_write_longlong(writer, &tv_sec_tmp);
- assert(ret != -1);
- ret = serializer_writer_write_longlong(writer, &tv_usec_tmp);
- assert(ret != -1);
- pthis->reset_time.tv_sec = tv_sec_tmp;
- pthis->reset_time.tv_usec = tv_usec_tmp;
-
- char *registers = NULL;
- size_t registers_sz = 0;
- ret = serializer_writer_write_bin(writer, &registers, &registers_sz);
- assert(ret != -1);
- assert(registers_sz == hlls_size(pthis->precision, pthis->sheet_size));
-
- pthis->registers = (uint32_t *)registers;
-
- serializer_writer_free(writer);
-
- return pthis;
-}
-
-/* -------------------------------------------------------------------------- */
-/* spread_sketch */
-/* -------------------------------------------------------------------------- */
-
-unsigned char cal_true_level(const struct spread_sketch *ss, int bucket_idx, long long t) {
- /*
- return f(t), the actual level of bucket, which satisfy:
- 1. d 2^f(t)/dt is constant
- 2. f(t0 + T) = 1
- 3. f((t0) = L )
- */
- unsigned L = ss->max_levels[bucket_idx];
- long long T = ss->time_window_ms;
- long long t0 = ss->last_updates_ms[bucket_idx];
-
- assert(t >= t0);
- if (t - t0 >= T) {
- return 1;
- }
- if (L <= 1) {
- return L;
- }
-
- long long tmp_exp = 1 << L;
- double a = ((double)(2 - tmp_exp)) / ((double)T);
- double b = (double)(tmp_exp);
-
- return (unsigned char)(log2(a * ((double)(t-t0)) + b) + 0.5);
-}
-
-struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char precision, int time_window_s, struct timeval now) {
- struct spread_sketch *pthis = malloc(sizeof(struct spread_sketch));
-
- pthis->depth = depth;
- pthis->width = width;
- pthis->time_window_ms = time_window_s * 1000;
-
- pthis->max_levels = calloc(depth * width, sizeof(unsigned char));
- pthis->keys = calloc(depth * width, sizeof(char *));
- pthis->counters = shllg_new(depth, width, precision, time_window_s, now);
- pthis->last_updates_ms = calloc(depth * width, sizeof(long long));
-
- return pthis;
-}
-
-/*
-hash utils:
-
-xxhash 在输入较短的情况下比murmurhash 快(https://github.com/sebastian-software/node-hash-comparison),因此使用xxhash。
-
-需要把src和dst 哈希到一起,但是我严重觉得只哈希dst 就行,这里可以实验一下。
-anyway, 我觉得xxhash 就好
-*/
-
-void spread_sketch_add(struct spread_sketch *ss, const char *key_main, size_t key_main_length, const char *key_spread, size_t key_spread_length, struct timeval now) {
- uint64_t hash_y = XXH3_64bits_withSeed(key_spread, strlen(key_spread), 171);
- unsigned char level = (unsigned char)__builtin_clzll(hash_y) + 1;
- long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000;
-
- // https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
- // A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x)
- // Assuming that the 128-bit xxhash function is perfect, we can view it as a combination of two 64-bit hash functions.
- uint64_t hash_x_tmp = XXH3_64bits_withSeed(key_main, strlen(key_main), 171);
- uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
- uint32_t hash_x2 = (uint32_t) hash_x_tmp;
-
- move_registers_forward(ss->counters, &now);
-
- for (int i = 0; i < ss->depth; i++) {
- uint32_t hash_x = hash_x1 + i * hash_x2;
- int bucket_idx = (hash_x % ss->width) + i * ss->width;
-
-
- if ((ss->keys[bucket_idx] != NULL) && (strcmp(ss->keys[bucket_idx], key_main) == 0)) {
- ss->last_updates_ms[bucket_idx] = now_ms;
- if (ss->max_levels[bucket_idx] < level) {
- ss->max_levels[bucket_idx] = level;
- }
- } else {
- unsigned char true_level;
- if (ss->keys[bucket_idx] == NULL) {
- true_level = 0;
- } else {
- true_level = cal_true_level(ss, bucket_idx, now_ms);
- }
-
- if (true_level < level) {
- free(ss->keys[bucket_idx]);
- ss->keys[bucket_idx] = strdup(key_main);
- ss->last_updates_ms[bucket_idx] = now_ms;
- ss->max_levels[bucket_idx] = level;
- }
- }
-
- shllg_update(ss->counters, bucket_idx, hash_y);
- }
-}
-
-double spread_sketch_point_query(struct spread_sketch *ss, const char *key_main) {
- uint64_t hash_x_tmp = XXH3_64bits_withSeed(key_main, strlen(key_main), 171);
- uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
- uint32_t hash_x2 = (uint32_t) hash_x_tmp;
-
- double count_min = (double)INT32_MAX;
- for (int i = 0; i < ss->depth; i++) {
- uint32_t hash_x = hash_x1 + i * hash_x2;
- int bucket_idx = (hash_x % ss->width) + i * ss->width;
-
- double est = shllg_count(ss->counters, bucket_idx);
- if (est < count_min) {
- count_min = est;
- }
- }
- return count_min;
-}
-
-void ss_query_result_free(struct spread_sketch_list *result) {
- if (result == NULL) {
- return;
- }
-
- for(int i=0; i<result->n_results; i++){
- free(result->key[i]);
- }
- free(result->key);
- free(result->count);
- free(result);
-}
-
-
-struct spread_sketch_list *spread_sketch_list(const struct spread_sketch *ss, struct timeval now) {
- // gen a duplication, so that step won't change the original
- struct spread_sketch *duplicate = malloc(sizeof(struct spread_sketch));
- duplicate->depth = ss->depth;
- duplicate->width = ss->width;
- duplicate->time_window_ms = ss->time_window_ms;
- duplicate->max_levels = NULL; // `max level` is useless in query
- duplicate->keys = ss->keys; // keys are read only
- duplicate->counters = shllg_duplicate(ss->counters); // counters are deep copy
-
- struct stagger_hll_group *hlls = duplicate->counters;
- if (hlls_should_slide(hlls->precision, hlls->time_window_ms, now, hlls->reset_time)) {
- move_registers_forward(hlls, &now);
- }
-
- // find out all unique keys
- struct key_item {
- char *key;
- UT_hash_handle hh;
- } *key_counts = NULL;
-
- long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000;
- for (int i = 0; i < ss->depth; i++) {
- for (int j = 0; j < ss->width; j++) {
- char *key = ss->keys[i * ss->width + j];
- if (key == NULL) {
- continue;
- }
- if (now_ms - ss->last_updates_ms[i * ss->width + j] > ss->time_window_ms) {
- printf("skip because the key is not updated for a long time, key: %s, last update: %lld, now: %lld, time window: %lld\n", key, ss->last_updates_ms[i * ss->width + j], now_ms, ss->time_window_ms);
- continue;
- }
-
- struct key_item *kc;
- HASH_FIND_STR(key_counts, key, kc);
- if (kc == NULL) {
- kc = malloc(sizeof(struct key_item));
- kc->key = key;
- HASH_ADD_KEYPTR(hh, key_counts, kc->key, strlen(kc->key), kc);
- }
- }
- }
-
- printf("find number of keys: %d\n", HASH_COUNT(key_counts));
- // query
- int n_result = HASH_COUNT(key_counts);
- struct spread_sketch_list *ret = malloc(sizeof(struct spread_sketch_list));
- ret->key = malloc(sizeof(char *) * n_result);
- ret->count = malloc(sizeof(double) * n_result);
-
- int n_results_found = 0;
- struct key_item *kc, *tmp;
- HASH_ITER(hh, key_counts, kc, tmp) {
- double count = spread_sketch_point_query(duplicate, kc->key);
- if (count >= 1.0) {
- ret->key[n_results_found] = strdup(kc->key);
- ret->count[n_results_found] = count;
- n_results_found++;
- }
- }
- ret->n_results = n_results_found;
-
- // clean up
- HASH_ITER(hh, key_counts, kc, tmp) {
- HASH_DEL(key_counts, kc);
- free(kc);
- }
- shllg_free(duplicate->counters);
- free(duplicate);
-
- return ret;
-}
-
-void spread_sketch_free(struct spread_sketch *ss) {
- for (int i = 0; i < ss->depth * ss->width; i++) {
- free(ss->keys[i]);
- }
- free(ss->keys);
- free(ss->max_levels);
- shllg_free(ss->counters);
- free(ss->last_updates_ms);
- free(ss);
-}
-
-void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *src)
-{
- assert(dst->depth == src->depth && dst->width == src->width);
- assert(dst->time_window_ms == src->time_window_ms);
- assert(dst->counters->precision == src->counters->precision);
-
- for (int i = 0; i < dst->depth * dst->width; i++) {
- if (src->keys[i] == NULL) {
- continue;
- }
-
- if (dst->keys[i] == NULL) {
- dst->keys[i] = strdup(src->keys[i]);
- dst->max_levels[i] = src->max_levels[i];
- dst->last_updates_ms[i] = src->last_updates_ms[i];
- continue;
- }
-
- if (strcmp(dst->keys[i], src->keys[i]) == 0) {
- if (src->max_levels[i] > dst->max_levels[i]) {
- dst->max_levels[i] = src->max_levels[i];
- }
- if (src->last_updates_ms[i] > dst->last_updates_ms[i]) {
- dst->last_updates_ms[i] = src->last_updates_ms[i];
- }
- } else {
- unsigned char true_level_src = cal_true_level(src, i, src->last_updates_ms[i]);
- unsigned char true_level_dst = cal_true_level(dst, i, dst->last_updates_ms[i]);
-
- if (true_level_src > true_level_dst) {
- free(dst->keys[i]);
- dst->keys[i] = strdup(src->keys[i]);
- dst->max_levels[i] = src->max_levels[i];
- dst->last_updates_ms[i] = src->last_updates_ms[i];
- }
- }
- }
-
- shllg_merge(dst->counters, src->counters);
-}
-
-void spread_sketch_serialize(const struct spread_sketch *ss, char **blob, size_t *blob_sz)
-{
- struct serializer_reader *reader = serializer_reader_new();
- int bucket_length = ss->depth * ss->width;
- serializer_reader_read_uint(reader, ss->depth);
- serializer_reader_read_uint(reader, ss->width);
- serializer_reader_read_longlong(reader, ss->time_window_ms);
-
- for (int i = 0; i < bucket_length; i++) {
- if (ss->keys[i] == NULL) {
- serializer_reader_read_nil(reader);
- } else {
- serializer_reader_read_str(reader, ss->keys[i]);
- }
- }
- serializer_reader_read_bin(reader, (const char *)ss->max_levels, bucket_length);
- serializer_reader_read_bin(reader, (const char *)ss->last_updates_ms, bucket_length * sizeof(long long));
-
- char *shllg_blob = NULL;
- size_t shllg_blob_sz = 0;
- shllg_serialize(ss->counters, &shllg_blob, &shllg_blob_sz);
- serializer_reader_read_bin(reader, shllg_blob, shllg_blob_sz);
- free(shllg_blob);
-
- serializer_reader_finalize(reader, blob, blob_sz);
-}
-
-struct spread_sketch *spread_sketch_deserialize(const char *blob, size_t blob_sz)
-{
- struct serializer_writer *writer = serializer_writer_new(blob, blob_sz);
- struct spread_sketch *ss = malloc(sizeof(struct spread_sketch));
- int ret;
- ret = serializer_writer_write_uint(writer, (uint32_t*)&ss->depth);
- assert(ret != -1);
- ret = serializer_writer_write_uint(writer, (uint32_t*)&ss->width);
- assert(ret != -1);
- ret = serializer_writer_write_longlong(writer, &ss->time_window_ms);
- assert(ret != -1);
-
- int bucket_length = ss->depth * ss->width;
-
- ss->keys = calloc(bucket_length, sizeof(char *));
- for (int i = 0; i < bucket_length; i++) {
- ret = serializer_writer_expect_nil(writer);
- assert(ret != -1);
- if (ret == 0) {
- char *key = NULL;
- ret = serializer_writer_write_str(writer, &key);
- assert(ret != -1);
- ss->keys[i] = key;
- } else {
- ss->keys[i] = NULL;
- }
- }
- size_t size_dummy = 0;
- char *max_level_tmp = NULL;
- ret = serializer_writer_write_bin(writer, &max_level_tmp, &size_dummy);
- assert(ret != -1);
- assert(size_dummy == bucket_length * sizeof(unsigned char));
- ss->max_levels = (unsigned char *)max_level_tmp;
-
- char *last_update_tmp = NULL;
- ret = serializer_writer_write_bin(writer, &last_update_tmp, &size_dummy);
- assert(ret != -1);
- assert(size_dummy == bucket_length * sizeof(long long));
- ss->last_updates_ms = (long long *)last_update_tmp;
-
- char *shllg_blob = NULL;
- size_t shllg_blob_sz = 0;
- ret = serializer_writer_write_bin(writer, &shllg_blob, &shllg_blob_sz);
- assert(ret != -1);
- ss->counters = shllg_deserialize(shllg_blob, shllg_blob_sz);
- free(shllg_blob);
-
- serializer_writer_free(writer);
-
- return ss;
-}
-
-void spread_sketch_merge_blob(struct spread_sketch *dst, const char *blob, size_t blob_sz)
-{
-}
-
-size_t spread_sketch_calculate_memory_usage(const struct spread_sketch *ss)
-{
- size_t ret = 0;
- ret += sizeof(struct spread_sketch);
-
- size_t bucket_size = sizeof(unsigned char) + (sizeof(char *) + 5) + sizeof(long long);
- size_t average_sthllg = shllg_size(ss->counters->precision, ss->counters->sheet_size) / ss->width / ss->depth;
- bucket_size += average_sthllg;
- printf("every bucket size: %ld\n", bucket_size);
-
- ret += ss->depth * ss->width * bucket_size;
- return ret;
-}
diff --git a/docs/command_toc.md b/docs/command_toc.md
index a9c7de6..bb4a9a9 100644
--- a/docs/command_toc.md
+++ b/docs/command_toc.md
@@ -11,6 +11,7 @@ The supported command are category as follows:
* [Bloom Filter Type](./commands/bloom_filter.md)
* [Count-Min Sketch Type](./commands/count_min_sketch.md)
* [HyperLogLog Type](./commands/hyperloglog.md)
+* [Spread Sketch Type](./commands/spread_sketch.md)
* [Cluster Management](./commands/cluster.md)
* [Trouble Shooting](./commands/trouble_shooting.md)
## COMMAND LIST
diff --git a/docs/commands/spread_sketch.md b/docs/commands/spread_sketch.md
new file mode 100644
index 0000000..9175e11
--- /dev/null
+++ b/docs/commands/spread_sketch.md
@@ -0,0 +1,155 @@
+## Spread Sketch
+
+Spread Sketch is a probabilistic data structure that estimates the cardinality of each of a huge number of entries. It is based on [A High-Performance Invertible Sketch for Network-Wide Superspreader Detection](https://ieeexplore.ieee.org/abstract/document/9858870/) and extended to support a near-continuous time window by means of the [Staggered HyperLogLog (STHLL)](https://www.sciencedirect.com/science/article/abs/pii/S0140366422002407) and a level decay mechanism. The distinct counter in the sketch is implemented with STHLL instead of the Multiple Resolution Bitmap suggested in the original paper.
+
+
+### SSINITBYDIM
+
+Syntax
+
+```
+SSINITBYDIM key width depth precision [TIME window-milliseconds]
+```
+
+ Initialize a Spread Sketch that can estimate the cardinality of entries within a sliding time window, with the following arguments:
+ - width -- Number of counters in each array. Reduces the error size.
+ - depth -- Number of counter-arrays. Reduces the probability for an error of a certain size.
+ - precision -- Used by the HyperLogLog; controls the number of registers in the HyperLogLog. The precision should be between 4 and 16.
+ - TIME -- Optional. Creates a time-limited Spread Sketch. 'window-milliseconds' specifies the duration of the time window in milliseconds. This mode allows estimating the cardinality of items within the range (now - window-milliseconds, now).
+
+The error of Spread Sketch is determined by combining the error of STHLL and the error of CM Sketch. The memory usage of Spread Sketch can be estimated as width * depth * (4 + m), where m = 2^precision is the number of HyperLogLog registers; it grows slightly as more entries are added.
+
+Return
+- Simple String Reply: OK if the Spread Sketch was created successfully.
+- Empty Array if key already exists.
+
+
+### SSINITBYCAPACITY
+
+Syntax
+
+```
+SSINITBYCAPACITY key capacity precision [TIME window-milliseconds]
+```
+
+ Initialize a Spread Sketch that can estimate the cardinality of entries within a sliding time window, with the following arguments:
+ - capacity -- The expected number of entries which will be inserted. It determines the width and depth of the sketch by `width = capacity` and `depth = lg(capacity)`. A width less than 40 will be set to 40, and a depth less than 3 will be set to 3.
+ - precision -- The data of the HyperLogLog is stored in an array of m registers. m is 2^precision, e.g. precision=4, m=16. The precision should be between 4 and 16.
+ - TIME -- Optional. Creates a time-limited Spread Sketch. 'window-milliseconds' specifies the duration of the time window in milliseconds. This mode allows estimating the cardinality of items within the range (now - window-milliseconds, now).
+
+Return
+- Simple String Reply: OK if the Spread Sketch was created successfully.
+- Empty Array if key already exists.
+
+
+### SSADD
+
+Syntax
+
+```
+SSADD key entry item [item ...]
+```
+
+Adds one or more `item`s under `entry` to the Spread Sketch stored at `key`. For each `entry`, the Spread Sketch estimates the cardinality (number of distinct values) of its `item`s.
+
+Return
+- Integer reply: the updated cardinality of the `entry` in the sketch.
+- Integer reply -1 if the key does not exist.
+
+### SSLIST
+
+Syntax
+
+```
+SSLIST key
+```
+
+Returns all the recorded entries (with high probability, those with high cardinality) together with the cardinality of each entry.
+
+Return
+- Array of key-cardinality pairs if the key exists.
+- Empty Array if the key does not exist.
+
+Examples
+```
+swarmkv-basic-test> SSINITBYCAPACITY dos_port_scan 100 6
+OK
+swarmkv-basic-test> SSADD dos_port_scan 1.1.1.1 1.1.1.1:22-244.10.1.1:8888
+(integer) 1
+swarmkv-basic-test> SSADD dos_port_scan 10.10.10.10 any_string_can_be_item item2 just_add_many
+(integer) 3
+swarmkv-basic-test> SSLIST dos_port_scan
+1) "1.1.1.1"
+2) (integer) 1
+3) "10.10.10.10"
+4) (integer) 3
+```
+
+### SSQUERY
+
+Syntax
+
+```
+SSQUERY key entry
+```
+
+Returns the cardinality for an entry in a sketch.
+
+Return
+
+- Integer reply: The cardinality for an entry in a sketch.
+- Integer 0 if key does not exist.
+
+
+### SSMQUERY
+
+Syntax
+
+```
+SSMQUERY key entry [entry ...]
+```
+
+Returns the cardinality for one or more entries. It's the batch query operation version of `SSQUERY`.
+
+Return
+- Array reply of Integer replies with the cardinality of each of the entries in the sketch.
+- Empty Array if key does not exist.
+
+
+
+### SSINFO
+
+Syntax
+```
+SSINFO key
+```
+Returns the information of the sketch.
+- Width: The parameter specified in `SSINITBYDIM`; if the sketch was initialized with `SSINITBYCAPACITY`, the width is approximately equal to the capacity, and slightly larger, since it is tuned by the results of experiments.
+- Depth: The parameter specified in `SSINITBYDIM`; if the sketch was initialized with `SSINITBYCAPACITY`, the depth equals lg(capacity).
+- Precision: The precision of the HyperLogLog.
+- TimeWindowMs: The time window in milliseconds. 0 if the SS is not time-limited.
+- ErrorCMS: The error rate of the sketch. Contributes to the error in correlation with total cardinality.
+- ErrorHLL: The error rate of the HyperLogLog. Contributes to the error in correlation with each entry.
+- Probability: The probability of inflated cardinality.
+
+Return
+- Array reply with information of the Spread Sketch.
+
+Examples
+```
+swarmkv-basic-test> SSINFO dos_port_scan
+ 1) "Width"
+ 2) (integer) 150
+ 3) "Depth"
+ 4) (integer) 3
+ 5) "Precision"
+ 6) (integer) 6
+ 7) "TimeWindowMs"
+ 8) (integer) 0
+ 9) "ErrorCMS"
+10) (double) 0.013333
+11) "ErrorHLL"
+12) (double) 0.130000
+13) "Probability"
+14) (double) 0.125000
+```
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h
index 7fe3e39..92e5079 100644
--- a/include/swarmkv/swarmkv.h
+++ b/include/swarmkv/swarmkv.h
@@ -170,7 +170,7 @@ void swarmkv_bfmexists(struct swarmkv * db, const char * key, size_t keylen, con
void swarmkv_cmsincrby(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], long long increments[], size_t n_increment, swarmkv_on_reply_callback_t * cb, void * cb_arg);
void swarmkv_ssadd(struct swarmkv * db, const char * key, size_t keylen, const char * item, const size_t item_len, const char *members[], size_t members_len[], size_t n_member, swarmkv_on_reply_callback_t * cb, void * cb_arg);
void swarmkv_cmsmquery(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t * cb, void * cb_arg);
-void swarmkv_ssquery(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t * cb, void * cb_arg);
+void swarmkv_ssmquery(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t * cb, void * cb_arg);
//Used by swarmkv-cli
size_t swarmkv_get_possible_command_name(struct swarmkv *db, const char *prefix, const char *cmd_names[], size_t sz);
diff --git a/readme.md b/readme.md
index e236525..4e103b2 100644
--- a/readme.md
+++ b/readme.md
@@ -33,7 +33,7 @@ SwarmKV Data Types
- [Bloom Filter](./docs/commands/bloom_filter.md) by age-partitioned bloom filter with the ability to expire.
- [Count-Min Sketch](./docs/commands/count_min_sketch.md).
- [HyperLogLog](./docs/commands/hyperloglog.md) by staggered HyperLogLog with the ability to expire.
-- [spreadsketch]() TODO
+- [Spread Sketch](./docs/commands/spread_sketch.md) by Spread Sketch with the ability to expire.
# Getting started
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 032c3b2..84fe726 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -26,7 +26,7 @@ add_definitions(-fPIC)
set(SWARMKV_SRC swarmkv_cmd_spec.c swarmkv.c swarmkv_api.c
swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c
swarmkv_sync.c swarmkv_store.c swarmkv_keyspace.c swarmkv_monitor.c
- t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c t_hyperloglog.c t_spreadsketch.c
+ t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c t_hyperloglog.c t_spread_sketch.c
swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c)
set(LIB_SOURCE_FILES
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 5a017a9..61e347b 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -26,7 +26,7 @@
#include "t_bloom_filter.h"
#include "t_cms.h"
#include "t_hyperloglog.h"
-#include "t_spreadsketch.h"
+#include "t_spread_sketch.h"
#include "uthash.h"
#include "sds.h"
@@ -1178,20 +1178,26 @@ void command_spec_init(struct swarmkv *db)
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
pfinfo_command, db->mod_store);
- /*spread sketch commands*/
- command_register(&(db->command_table), "SSINITBYDIM", "key width depth precision [TIME window-milliseconds]",
+ /*Spread sketch commands*/
+ swarmkv_command_table_register(db->mod_command_table, "SSINITBYDIM", "key width depth precision [TIME window-milliseconds]",
4, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
ssinitbydim_command, db->mod_store);
- command_register(&(db->command_table), "SSINITBYENTRYNUMBER", "key expected_superspreader_count [TIME window-milliseconds]*/",
- 2, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
- ssinitbyentrynumber_command, db->mod_store);
- command_register(&(db->command_table), "SSADD", "key entry item [item ...] ",
+ swarmkv_command_table_register(db->mod_command_table, "SSINITBYCAPACITY", "key capacity precsion [TIME window-milliseconds]*/",
+ 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ ssinitbycapacity_command, db->mod_store);
+ swarmkv_command_table_register(db->mod_command_table, "SSADD", "key entry item [item ...] ",
3, 1, CMD_KEY_RW, REPLY_INT_MINORS1, AUTO_ROUTE,
ssadd_command, db->mod_store);
- command_register(&(db->command_table), "SSLIST", "key",
+ swarmkv_command_table_register(db->mod_command_table, "SSLIST", "key",
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
sslist_command, db->mod_store);
- command_register(&(db->command_table), "SSINFO", "key",
+ swarmkv_command_table_register(db->mod_command_table, "SSQUERY", "key entry",
+ 2, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE,
+ ssquery_command, db->mod_store);
+ swarmkv_command_table_register(db->mod_command_table, "SSMQUERY", "key entry [entry ...]",
+ 2, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ ssmquery_command, db->mod_store);
+ swarmkv_command_table_register(db->mod_command_table, "SSINFO", "key",
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
ssinfo_command, db->mod_store);
@@ -1284,8 +1290,6 @@ void command_spec_init(struct swarmkv *db)
1, KEY_OFFSET_SLOTID, CMD_KEY_RO, REPLY_NA, NOT_AUTO_ROUTE,
keyspace_countkeysinslot_command, db->mod_keyspace);
- // TODO: 添加 SPREAD SKETCH
-
/* cluster commands are defined in swarmkv-cli.c */
return;
}
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index 7062404..00147e8 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -599,11 +599,11 @@ void swarmkv_ssadd(struct swarmkv *db, const char *key, size_t keylen, const cha
swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3 + n_member, argv, argv_len);
}
-void swarmkv_ssquery(struct swarmkv *db, const char *key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg)
+void swarmkv_ssmquery(struct swarmkv *db, const char *key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
const char *argv[2 + n_items];
size_t argv_len[2 + n_items];
- argv[0] = "SSQUERY";
+ argv[0] = "SSMQUERY";
argv_len[0] = strlen(argv[0]);
argv[1] = key;
argv_len[1] = keylen;
diff --git a/src/t_spread_sketch.c b/src/t_spread_sketch.c
new file mode 100644
index 0000000..d6a7131
--- /dev/null
+++ b/src/t_spread_sketch.c
@@ -0,0 +1,307 @@
+#include <stdlib.h>
+#include <assert.h>
+#include <math.h>
+
+#include "swarmkv_common.h"
+#include "swarmkv_utils.h"
+#include "swarmkv_store.h"
+#include "swarmkv_error.h"
+#include "spread_sketch.h"
+
+/*
+ * SSINITBYDIM key width depth precision [TIME window-milliseconds]
+ *
+ * Creates a spread sketch with explicit dimensions at `key`.
+ *
+ * Replies:
+ *  - status "OK" when the object at `key` was still undefined and the sketch was created;
+ *  - an empty array when the object at `key` already has a type;
+ *  - an error when width or depth is not positive, when precision is outside
+ *    [HLL_MIN_PRECISION, HLL_MAX_PRECISION], when the option keyword is not TIME,
+ *    or when the window is not a non-negative integer.
+ *
+ * Returns NEED_KEY_ROUTE when store_lookup() yields no object for `key`,
+ * FINISHED in every other case.
+ */
+enum cmd_exec_result ssinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    const sds key = cmd->argv[1];
+
+    long width = strtol(cmd->argv[2], NULL, 10);
+    if (width <= 0)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]);
+        return FINISHED;
+    }
+    long depth = strtol(cmd->argv[3], NULL, 10);
+    if (depth <= 0)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
+        return FINISHED;
+    }
+    long precision = strtol(cmd->argv[4], NULL, 10);
+    if (precision < HLL_MIN_PRECISION || precision > HLL_MAX_PRECISION)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[4]);
+        return FINISHED;
+    }
+    long long time_window_ms = 0; // 0 means the sketch is not time-limited
+    if (cmd->argc == 7)
+    {
+        if (strncasecmp(cmd->argv[5], "TIME", 4) != 0)
+        {
+            *reply = swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[5], "TIME");
+            return FINISHED;
+        }
+        // A negative window has no meaning, so it is rejected like unparsable input.
+        if (str2integer(cmd->argv[6], &time_window_ms) < 0 || time_window_ms < 0)
+        {
+            *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[6]);
+            return FINISHED;
+        }
+    }
+
+    struct sobj *obj = store_lookup(mod_store, key);
+    if (!obj)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    if (obj->type == OBJ_TYPE_UNDEFINED)
+    {
+        assert(obj->raw == NULL);
+
+        struct timeval now;
+        gettimeofday(&now, NULL);
+        obj->spread_sketch = spread_sketch_new(width, depth, precision, time_window_ms, now);
+
+        obj->type = OBJ_TYPE_SPREAD_SKETCH;
+        *reply = swarmkv_reply_new_status("OK");
+    }
+    else
+    {
+        // An object of some type already lives at this key: it is left untouched.
+        *reply = swarmkv_reply_new_array(0);
+    }
+    return FINISHED;
+}
+
+/*
+ * SSINITBYCAPACITY key capacity precision [TIME window-milliseconds]
+ *
+ * Creates a spread sketch at `key` whose width and depth are taken from
+ * spread_sketch_recommend_parameters() for the given capacity. The precision
+ * comes from the command argument; the recommended precision is discarded.
+ *
+ * Replies:
+ *  - status "OK" when the object at `key` was still undefined and the sketch was created;
+ *  - an empty array when the object at `key` already has a type;
+ *  - an error when capacity is not positive, when precision is outside
+ *    [HLL_MIN_PRECISION, HLL_MAX_PRECISION], when the option keyword is not TIME,
+ *    or when the window is not a non-negative integer.
+ *
+ * Returns NEED_KEY_ROUTE when store_lookup() yields no object for `key`,
+ * FINISHED in every other case.
+ */
+enum cmd_exec_result ssinitbycapacity_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    const sds key = cmd->argv[1];
+    long expected_superspreader_count = strtol(cmd->argv[2], NULL, 10);
+    if (expected_superspreader_count <= 0)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]);
+        return FINISHED;
+    }
+    long precision = strtol(cmd->argv[3], NULL, 10);
+    if (precision < HLL_MIN_PRECISION || precision > HLL_MAX_PRECISION)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
+        return FINISHED;
+    }
+    long long time_window_ms = 0; // when set to 0, it means no time decay is used
+    if (cmd->argc == 6)
+    {
+        if (strncasecmp(cmd->argv[4], "TIME", 4) != 0)
+        {
+            *reply = swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[4], "TIME");
+            return FINISHED;
+        }
+        // A negative window has no meaning, so it is rejected like unparsable input.
+        if (str2integer(cmd->argv[5], &time_window_ms) < 0 || time_window_ms < 0)
+        {
+            *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[5]);
+            return FINISHED;
+        }
+    }
+
+    struct sobj *obj = store_lookup(mod_store, key);
+    if (!obj)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    if (obj->type == OBJ_TYPE_UNDEFINED)
+    {
+        assert(obj->raw == NULL);
+
+        struct timeval now;
+        gettimeofday(&now, NULL);
+
+        int width, depth;
+        // Only width and depth are taken from the recommendation; the caller's precision wins.
+        unsigned char precision_dummy;
+        spread_sketch_recommend_parameters(expected_superspreader_count, &width, &depth, &precision_dummy);
+        obj->spread_sketch = spread_sketch_new(width, depth, precision, time_window_ms, now);
+
+        obj->type = OBJ_TYPE_SPREAD_SKETCH;
+        *reply = swarmkv_reply_new_status("OK");
+    }
+    else
+    {
+        // An object of some type already lives at this key: it is left untouched.
+        *reply = swarmkv_reply_new_array(0);
+    }
+    return FINISHED;
+}
+
+/*
+ * SSADD key entry item [item ...]
+ *
+ * Feeds every item into the sketch under `entry`, then replies with the
+ * entry's estimated cardinality rounded to the nearest integer.
+ * Returns NEED_KEY_ROUTE when store_lookup() yields no object for `key`.
+ */
+enum cmd_exec_result ssadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    struct sobj *obj = store_lookup(mod_store, cmd->argv[1]);
+    if (obj == NULL)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    switch (obj->type)
+    {
+    case OBJ_TYPE_UNDEFINED:
+        return handle_undefined_object(obj, reply);
+    case OBJ_TYPE_SPREAD_SKETCH:
+        break;
+    default:
+        *reply = swarmkv_reply_new_error(error_wrong_type);
+        return FINISHED;
+    }
+
+    const sds entry = cmd->argv[2];
+    const size_t entry_len = sdslen(entry);
+
+    // One timestamp is shared by all insertions and by the final query.
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+
+    size_t idx = 3;
+    while (idx < cmd->argc)
+    {
+        const sds item = cmd->argv[idx];
+        spread_sketch_add(obj->spread_sketch, entry, entry_len, item, sdslen(item), NULL, tv);
+        idx++;
+    }
+
+    double cardinality = spread_sketch_query(obj->spread_sketch, entry, entry_len, tv);
+    *reply = swarmkv_reply_new_integer((int)(cardinality + 0.5));
+
+    store_mark_object_as_modified(mod_store, obj);
+    return FINISHED;
+}
+
+/*
+ * SSLIST key
+ *
+ * Replies with a flat array of (entry, cardinality) pairs for every entry
+ * reported by spread_sketch_list_entries(); cardinalities are rounded to the
+ * nearest integer.
+ * Returns NEED_KEY_ROUTE when store_lookup() yields no object for `key`.
+ */
+enum cmd_exec_result sslist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    struct sobj *obj = store_lookup(mod_store, cmd->argv[1]);
+    if (obj == NULL)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    switch (obj->type)
+    {
+    case OBJ_TYPE_UNDEFINED:
+        return handle_undefined_object(obj, reply);
+    case OBJ_TYPE_SPREAD_SKETCH:
+        break;
+    default:
+        *reply = swarmkv_reply_new_error(error_wrong_type);
+        return FINISHED;
+    }
+
+    struct spread_sketch *sketch = obj->spread_sketch;
+    char **names = NULL;
+    size_t *name_lens = NULL;
+    size_t count;
+    spread_sketch_list_entries(sketch, &names, &name_lens, &count);
+
+    *reply = swarmkv_reply_new_array(count * 2);
+    struct swarmkv_reply **slot = (*reply)->elements;
+    for (size_t k = 0; k < count; k++)
+    {
+        int rounded = (int)(spread_sketch_get_cardinality(sketch, names[k], name_lens[k]) + 0.5);
+
+        // Each entry occupies two consecutive slots: its name, then its estimate.
+        *slot++ = swarmkv_reply_new_string(names[k], name_lens[k]);
+        *slot++ = swarmkv_reply_new_integer(rounded);
+    }
+
+    // Only the two arrays are released here, not the strings they point to.
+    free(names);
+    free(name_lens);
+    return FINISHED;
+}
+
+/*
+ * SSMQUERY key entry [entry ...]
+ *
+ * Replies with an array holding, for each requested entry in order, its
+ * estimated cardinality rounded to the nearest integer.
+ * Returns NEED_KEY_ROUTE when store_lookup() yields no object for `key`.
+ */
+enum cmd_exec_result ssmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    struct sobj *obj = store_lookup(mod_store, cmd->argv[1]);
+    if (obj == NULL)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    switch (obj->type)
+    {
+    case OBJ_TYPE_UNDEFINED:
+        return handle_undefined_object(obj, reply);
+    case OBJ_TYPE_SPREAD_SKETCH:
+        break;
+    default:
+        *reply = swarmkv_reply_new_error(error_wrong_type);
+        return FINISHED;
+    }
+
+    // All entries are queried against the same point in time.
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+
+    *reply = swarmkv_reply_new_array(cmd->argc - 2);
+    // The first two argv slots are the command name and the key.
+    for (size_t pos = 0; pos + 2 < cmd->argc; pos++)
+    {
+        const sds entry = cmd->argv[pos + 2];
+        double cardinality = spread_sketch_query(obj->spread_sketch, entry, sdslen(entry), tv);
+        (*reply)->elements[pos] = swarmkv_reply_new_integer((int)(cardinality + 0.5));
+    }
+    return FINISHED;
+}
+/*
+ * SSQUERY key entry
+ *
+ * Single-entry form of SSMQUERY: runs ssmquery_command() and, when it
+ * produced an array, replies with a copy of the array's first element.
+ * Any other reply is passed through unchanged.
+ */
+enum cmd_exec_result ssquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    struct swarmkv_reply *batch = NULL;
+    enum cmd_exec_result status = ssmquery_command(mod_store, cmd, &batch);
+
+    // Anything but FINISHED leaves *reply untouched.
+    if (status != FINISHED)
+    {
+        return status;
+    }
+
+    if (batch->type != SWARMKV_REPLY_ARRAY)
+    {
+        *reply = batch;
+        return status;
+    }
+
+    // Copy the first element, then release the array that owned it.
+    *reply = swarmkv_reply_dup(batch->elements[0]);
+    swarmkv_reply_free(batch);
+    return status;
+}
+/*
+ * SSINFO key
+ *
+ * Replies with 14 elements, i.e. seven label/value pairs: Width, Depth,
+ * Precision, TimeWindowMs, ErrorCMS (2 / width), ErrorHLL (from
+ * ST_hyperloglog_error_for_precision) and Probability (1 / 2^depth).
+ * Returns NEED_KEY_ROUTE when store_lookup() yields no object for `key`.
+ */
+enum cmd_exec_result ssinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    struct sobj *obj = store_lookup(mod_store, cmd->argv[1]);
+    if (obj == NULL)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    switch (obj->type)
+    {
+    case OBJ_TYPE_UNDEFINED:
+        return handle_undefined_object(obj, reply);
+    case OBJ_TYPE_SPREAD_SKETCH:
+        break;
+    default:
+        *reply = swarmkv_reply_new_error(error_wrong_type);
+        return FINISHED;
+    }
+
+    int depth, width, time_window_ms;
+    unsigned char precision;
+    spread_sketch_get_parameter(obj->spread_sketch, &depth, &width, &precision, &time_window_ms);
+
+    const double error_cms = 2.0 / (double)width;
+    const double error_hll = ST_hyperloglog_error_for_precision(precision);
+    const double probability = 1 / pow(2, depth);
+
+    *reply = swarmkv_reply_new_array(14);
+    struct swarmkv_reply **out = (*reply)->elements;
+    int filled = 0;
+
+    out[filled++] = swarmkv_reply_new_string_fmt("Width");
+    out[filled++] = swarmkv_reply_new_integer(width);
+    out[filled++] = swarmkv_reply_new_string_fmt("Depth");
+    out[filled++] = swarmkv_reply_new_integer(depth);
+    out[filled++] = swarmkv_reply_new_string_fmt("Precision");
+    out[filled++] = swarmkv_reply_new_integer(precision);
+    out[filled++] = swarmkv_reply_new_string_fmt("TimeWindowMs");
+    out[filled++] = swarmkv_reply_new_integer(time_window_ms);
+    out[filled++] = swarmkv_reply_new_string_fmt("ErrorCMS");
+    out[filled++] = swarmkv_reply_new_double(error_cms);
+    out[filled++] = swarmkv_reply_new_string_fmt("ErrorHLL");
+    out[filled++] = swarmkv_reply_new_double(error_hll);
+    out[filled++] = swarmkv_reply_new_string_fmt("Probability");
+    out[filled++] = swarmkv_reply_new_double(probability);
+    // Guards against the array size and the number of fields drifting apart.
+    assert(filled == 14);
+
+    return FINISHED;
+} \ No newline at end of file
diff --git a/src/t_spreadsketch.h b/src/t_spread_sketch.h
index 93b1ee5..e8f6180 100644
--- a/src/t_spreadsketch.h
+++ b/src/t_spread_sketch.h
@@ -7,4 +7,5 @@ enum cmd_exec_result ssinitbycapacity_command(struct swarmkv_module *mod_store,
enum cmd_exec_result ssadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
enum cmd_exec_result sslist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
enum cmd_exec_result ssinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
-enum cmd_exec_result ssquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); \ No newline at end of file
+enum cmd_exec_result ssquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result ssmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); \ No newline at end of file
diff --git a/src/t_spreadsketch.c b/src/t_spreadsketch.c
deleted file mode 100644
index 3d02191..0000000
--- a/src/t_spreadsketch.c
+++ /dev/null
@@ -1,256 +0,0 @@
-#include <stdlib.h>
-#include <assert.h>
-#include <math.h>
-
-#include "t_spreadsketch.h"
-
-#include "swarmkv_common.h"
-#include "swarmkv_utils.h"
-#include "swarmkv_store.h"
-#include "swarmkv_error.h"
-#include "spread_sketch.h"
-
-
-enum cmd_exec_result ssinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) {
- /* SSINITBYDIM key width depth precision [TIME window-milliseconds]*/
- const sds key=cmd->argv[1];
-
- long width = strtol(cmd->argv[2], NULL, 10);
- if(width<=0) {
- *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]);
- return FINISHED;
- }
- long depth = strtol(cmd->argv[3], NULL, 10);
- if(depth<=0) {
- *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
- return FINISHED;
- }
- unsigned char precision = (unsigned char)strtol(cmd->argv[4], NULL, 10);
- if(precision<=0) {
- *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[4]);
- return FINISHED;
- }
- long long time_window_ms = 0;
- if (cmd->argc == 6) {
- if(strncasecmp(cmd->argv[5], "TIME", 4)!=0) {
- *reply=swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[5], "TIME");
- return FINISHED;
- }
- if(str2integer(cmd->argv[6], &time_window_ms) < 0) {
- *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[6]);
- return FINISHED;
- }
- }
-
- struct sobj *obj = store_lookup(mod_store, key);
- if (!obj) {
- return NEED_KEY_ROUTE;
- }
-
- if(obj->type==OBJ_TYPE_UNDEFINED) {
- assert(obj->raw==NULL);
-
- struct timeval now;
- gettimeofday(&now, NULL);
- obj->spread_sketch = spread_sketch_new(width, depth, precision, time_window_ms, now);
-
- obj->type=OBJ_TYPE_SPREAD_SKETCH;
- *reply=swarmkv_reply_new_status("OK");
- } else {
- *reply=swarmkv_reply_new_array(0);
- }
- return FINISHED;
-}
-
-enum cmd_exec_result ssinitbyentrynumber_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) {
- /* SSINITBYENTRYNUMBER key capacity [TIME window-milliseconds]*/
- const sds key=cmd->argv[1];
- long expected_superspreader_count = strtol(cmd->argv[2], NULL, 10);
- if(expected_superspreader_count<=0) {
- *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]);
- return FINISHED;
- }
- long long time_window_ms = 0; // when set to 0, it means no time decay is used
- if (cmd->argc == 4) {
- if(strncasecmp(cmd->argv[3], "TIME", 4)!=0) {
- *reply=swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[3], "TIME");
- return FINISHED;
- }
- if(str2integer(cmd->argv[4], &time_window_ms) < 0) {
- *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[4]);
- return FINISHED;
- }
- }
-
- struct sobj *obj = store_lookup(mod_store, key);
- if (!obj) {
- return NEED_KEY_ROUTE;
- }
-
- if(obj->type==OBJ_TYPE_UNDEFINED) {
- assert(obj->raw==NULL);
-
- struct timeval now;
- gettimeofday(&now, NULL);
-
- int width, depth;
- unsigned char precision;
- spread_sketch_get_parameter_recommendation(expected_superspreader_count, &width, &depth, &precision);
- obj->spread_sketch = spread_sketch_new(width, depth, precision, time_window_ms, now);
-
- obj->type=OBJ_TYPE_SPREAD_SKETCH;
- *reply=swarmkv_reply_new_status("OK");
- } else {
- *reply=swarmkv_reply_new_array(0);
- }
- return FINISHED;
-}
-
-enum cmd_exec_result ssadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) {
- // TODO: 还是说SSADD key entry item [entry item ...]
- // SSADD key entry item [item ...]
- const sds key=cmd->argv[1];
- struct sobj *obj = store_lookup(mod_store, key);
- if (!obj) {
- return NEED_KEY_ROUTE;
- }
- if(obj->type==OBJ_TYPE_UNDEFINED)
- {
- return handle_undefined_object(obj, reply);
- }
- if(obj->type!=OBJ_TYPE_SPREAD_SKETCH) {
- *reply=swarmkv_reply_new_error(error_wrong_type);
- return FINISHED;
- }
-
- // todo: 返回什么?确定是返回一个添加后它的计数,根据cmsketch的实现,这个计数应该是必定返回的。但是spread sketch 里有key,这个key 可能会实际上不存储在spread sketch 里。
- // 或者这样说,spread sketch 本质是TOP cardinality,当一个key 不在top 时,是不是返回失败好一点。
- // 添加的返回值不存在于list 中,感觉有点不合理?
- // 不对,这里真的不应该纠结,哪有OK 和 double 混合返回的逻辑,肯定都是double,或者一个类似于opset 的返回,表明是否记录了。
- // 选择方案1:都返回double
-
- const sds entry_key = cmd->argv[2];
-
- struct timeval now;
- gettimeofday(&now, NULL);
- for (size_t i = 3; i < cmd->argc; i++) {
- spread_sketch_add(obj->spread_sketch, entry_key, sdslen(entry_key), cmd->argv[i], sdslen(cmd->argv[i]), NULL, now);
- }
-
- double est = spread_sketch_query(obj->spread_sketch, entry_key, sdslen(entry_key), now);
- *reply=swarmkv_reply_new_integer((int) (est + 0.5));
-
- store_mark_object_as_modified(mod_store, obj);
- return FINISHED;
-}
-
-enum cmd_exec_result sslist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) {
- // SSLIST key
- const sds key=cmd->argv[1];
-
- struct sobj *obj = store_lookup(mod_store, key);
- if (!obj) {
- return NEED_KEY_ROUTE;
- }
- if(obj->type==OBJ_TYPE_UNDEFINED) {
- return handle_undefined_object(obj, reply);
- }
- if(obj->type!=OBJ_TYPE_SPREAD_SKETCH) {
- *reply=swarmkv_reply_new_error(error_wrong_type);
- return FINISHED;
- }
-
- struct spread_sketch *ss = obj->spread_sketch;
- size_t n_entry;
- char **entry_keys=NULL;
- size_t *entry_keys_len=NULL;
- spread_sketch_list_keys(ss, &entry_keys, &entry_keys_len, &n_entry);
-
- *reply=swarmkv_reply_new_array(n_entry * 2);
- for (size_t i = 0; i < n_entry; i++) {
- const char *entry_tmp = entry_keys[i];
- size_t len_tmp = entry_keys_len[i];
- int est_tmp = (int) (spread_sketch_get_cardinality(ss, entry_tmp, len_tmp) + 0.5);
-
- (*reply)->elements[i*2] = swarmkv_reply_new_string(entry_tmp, len_tmp);
- (*reply)->elements[i*2+1] = swarmkv_reply_new_integer(est_tmp);
- }
-
- free(entry_keys);
- free(entry_keys_len);
- return FINISHED;
-}
-
-enum cmd_exec_result ssquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) {
- // SSQUERY key entry [entry ...]
- const sds key=cmd->argv[1];
- struct sobj *obj = store_lookup(mod_store, key);
- if (!obj) {
- return NEED_KEY_ROUTE;
- }
- if(obj->type==OBJ_TYPE_UNDEFINED) {
- return handle_undefined_object(obj, reply);
- }
- if(obj->type!=OBJ_TYPE_SPREAD_SKETCH) {
- *reply=swarmkv_reply_new_error(error_wrong_type);
- return FINISHED;
- }
-
- struct spread_sketch *ss = obj->spread_sketch;
- struct timeval now;
- gettimeofday(&now, NULL);
- *reply=swarmkv_reply_new_array(cmd->argc-2);
- for (size_t i = 2; i < cmd->argc; i++) {
- const sds entry_key = cmd->argv[i];
- double est = spread_sketch_query(ss, entry_key, sdslen(entry_key), now);
- (*reply)->elements[i-2] = swarmkv_reply_new_integer((int) (est + 0.5));
- }
- return FINISHED;
-}
-
-
-enum cmd_exec_result ssinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) {
- // SSINFO key
- const sds key=cmd->argv[1];
-
- struct sobj *obj = store_lookup(mod_store, key);
- if (!obj) {
- return NEED_KEY_ROUTE;
- }
- if(obj->type==OBJ_TYPE_UNDEFINED)
- {
- return handle_undefined_object(obj, reply);
- }
- if(obj->type!=OBJ_TYPE_SPREAD_SKETCH) {
- *reply=swarmkv_reply_new_error(error_wrong_type);
- return FINISHED;
- }
-
- struct spread_sketch *ss = obj->spread_sketch;
- int depth, width, time_window_ms;
- unsigned char precision;
- spread_sketch_get_parameter(ss, &depth, &width, &precision, &time_window_ms);
- double error_cms = 2.0 / (double) width;
- double error_hll = ST_hyperloglog_error_for_precision(precision);
- double probability = 1 / pow(2, depth);
-
- int i = 0;
- *reply = swarmkv_reply_new_array(14);
- (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Width");
- (*reply)->elements[i++] = swarmkv_reply_new_integer(width);
- (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Depth");
- (*reply)->elements[i++] = swarmkv_reply_new_integer(depth);
- (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Precision");
- (*reply)->elements[i++] = swarmkv_reply_new_integer(precision);
- (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("TimeWindowMs");
- (*reply)->elements[i++]=swarmkv_reply_new_integer(time_window_ms);
- (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("ErrorCMS");
- (*reply)->elements[i++] = swarmkv_reply_new_double(error_cms);
- (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("ErrorHLL");
- (*reply)->elements[i++] = swarmkv_reply_new_double(error_hll);
- (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Probability");
- (*reply)->elements[i++] = swarmkv_reply_new_double(probability);
- assert(i==14);
-
- return FINISHED;
-} \ No newline at end of file
diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp
index ba634ac..b583d00 100644
--- a/test/swarmkv_gtest.cpp
+++ b/test/swarmkv_gtest.cpp
@@ -784,7 +784,7 @@ TEST_F(SwarmkvBasicTest, TypeBloomFilter)
EXPECT_EQ(reply->elements[i]->integer, 1);
}
}
-TEST_F(SwarmkvBasicTest, TypeCMS)
+TEST_F(SwarmkvBasicTest, TypeCountMinSketch)
{
struct swarmkv *db=SwarmkvBasicTest::db;
const char *key="cms-001";
@@ -976,7 +976,7 @@ TEST_F(SwarmkvBasicTest, HashTags)
}
}
-TEST_F(SwarmkvBasicTest, TypeSS)
+TEST_F(SwarmkvBasicTest, TypeSpreadSketch)
{
struct swarmkv *db=SwarmkvBasicTest::db;
const char *key="ss-001";
@@ -992,17 +992,17 @@ TEST_F(SwarmkvBasicTest, TypeSS)
const char *item_2[]={"item2_1"};
const int n_item[2] = {3, 1};
reply=swarmkv_command(db, "SSADD %s %s %s %s %s", key, entries[0], item_1[0], item_1[1], item_1[2]);
- ASSERT_EQ(reply->type, SWARMKV_REPLY_DOUBLE);
- ASSERT_NEAR(reply->dval, 3.0, 0.4);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 3);
swarmkv_reply_free(reply);
reply=swarmkv_command(db, "SSADD %s %s %s", key, entries[1], item_2[0]);
- ASSERT_EQ(reply->type, SWARMKV_REPLY_DOUBLE);
- ASSERT_NEAR(reply->dval, 1.0, 0.4);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
swarmkv_reply_free(reply);
reply=swarmkv_command(db, "SSLIST %s", key);
ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
- ASSERT_EQ(reply->n_element, sizeof(entries)/sizeof(entries[0]) * 2);
+ EXPECT_EQ(reply->n_element, sizeof(entries)/sizeof(entries[0]) * 2);
for(size_t i=0; i<reply->n_element / 2; i++)
{
EXPECT_EQ(reply->elements[i*2]->type, SWARMKV_REPLY_STRING);
@@ -1036,10 +1036,59 @@ TEST_F(SwarmkvBasicTest, TypeSS)
swarmkv_reply_free(reply);
// time-decay spread sketch
- // 设置一个time window。
- // 添加很多个,保证大小。
- // sleep 时间窗口长度
- // 重新查询,list 结果应当为空。// todo: last oper time 来filter 。
+ long long time_window_ms=1000;
+ reply=swarmkv_command(db, "SSINITBYCAPACITY %s %d %d TIME %lld", key, 10, 12, time_window_ms);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+
+ // Add many entries, each with 3 items.
+ for (int i=0; i<1000; i++) {
+ reply=swarmkv_command(db, "SSADD %s old%d %d %d %d", key, i, 1, 2, 3);
+ swarmkv_reply_free(reply);
+ }
+
+ usleep(time_window_ms*1000 * 2);
+
+ // Add other entries after sleeping past the time window; they should replace the old entries.
+ for (int i=0; i<1000; i++) {
+ reply=swarmkv_command(db, "SSADD %s new%d %d %d %d", key, i, 1, 2, 3);
+ swarmkv_reply_free(reply);
+ }
+ reply=swarmkv_command(db, "SSLIST %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ int error_cnt = 0;
+ for (size_t i=0; i<reply->n_element / 2; i++) {
+ EXPECT_EQ(reply->elements[2 * i]->type, SWARMKV_REPLY_STRING);
+ EXPECT_TRUE(strncmp(reply->elements[2 * i]->str, "new", 3) == 0);
+ EXPECT_EQ(reply->elements[2 * i + 1]->type, SWARMKV_REPLY_INTEGER);
+ if (reply->elements[2 * i + 1]->integer != 3) {
+ error_cnt++;
+ }
+ }
+ EXPECT_LE(error_cnt, 1); // Allow at most one estimate to be wrong, since the sketch is probabilistic.
+ swarmkv_reply_free(reply);
+
+ // API test: exercise swarmkv_ssadd() and swarmkv_ssmquery() directly.
+ key="ss-api";
+ reply=swarmkv_command(db, "SSINITBYDIM %s %lld %lld %d TIME %lld", key, width, depth,precision, time_window_ms);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+ size_t item_2_len[] = {strlen(item_2[0])};
+
+ swarmkv_ssadd(db, key, strlen(key), entries[0], strlen(entries[0]), item_2, item_2_len, 1, copy_reply_callback, &reply);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
+
+ size_t entries_len[] = {strlen(entries[0]), strlen(entries[1])};
+ swarmkv_ssmquery(db, key, strlen(key), entries, entries_len, 2, copy_reply_callback, &reply);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ ASSERT_EQ(reply->n_element, 2);
+ EXPECT_EQ(reply->elements[0]->integer, 1);
+ EXPECT_EQ(reply->elements[1]->integer, 0);
+ swarmkv_reply_free(reply);
}
class SwarmkvTwoNodes : public testing::Test
@@ -2004,7 +2053,7 @@ TEST_F(SwarmkvTwoNodes, TypeBloomFilter)
EXPECT_EQ(reply->integer, 1);
swarmkv_reply_free(reply);
}
-TEST_F(SwarmkvTwoNodes, TypeCMS)
+TEST_F(SwarmkvTwoNodes, TypeCountMinSketch)
{
struct swarmkv *db[2];
db[0]=SwarmkvTwoNodes::db1;
@@ -2091,7 +2140,7 @@ TEST_F(SwarmkvTwoNodes, TypeHyperLogLog)
}
}
}
-TEST_F(SwarmkvTwoNodes, TypeSS)
+TEST_F(SwarmkvTwoNodes, TypeSpreadSketch)
{
struct swarmkv *db[2];
db[0]=SwarmkvTwoNodes::db1;
@@ -2103,7 +2152,7 @@ TEST_F(SwarmkvTwoNodes, TypeSS)
int n_entry = sizeof(entries)/sizeof(entries[0]);
int n_loop = 10000;
- reply=swarmkv_command(db[0], "SSINITBYENTRYNUMBER %s 5", key);
+ reply=swarmkv_command(db[0], "SSINITBYCAPACITY %s 5", key);
ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
EXPECT_STREQ(reply->str, "OK");
swarmkv_reply_free(reply);
@@ -2126,7 +2175,6 @@ TEST_F(SwarmkvTwoNodes, TypeSS)
swarmkv_reply_free(reply);
}
}
-
}
TEST_F(SwarmkvTwoNodes, Info)
{
@@ -2253,7 +2301,6 @@ TEST_F(SwarmkvTwoNodes, Latency)
reply=swarmkv_command(db1, "latency peer");
EXPECT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
swarmkv_reply_free(reply);
- sleep(3500);
}
TEST_F(SwarmkvTwoNodes, Wait)
{
@@ -2492,7 +2539,8 @@ int main(int argc, char ** argv)
int ret=0;
g_current_thread_id=syscall(SYS_gettid);
::testing::InitGoogleTest(&argc, argv);
- // ::testing::GTEST_FLAG(filter) = "*TypeSS";
+ // ::testing::GTEST_FLAG(filter) = "-*.TypeSS";
+ // ::testing::GTEST_FLAG(filter) = "SwarmkvTwoNodes.TypeSS";
ret=RUN_ALL_TESTS();
return ret;
}