summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2024-08-09 17:10:27 +0800
committerchenzizhan <[email protected]>2024-08-09 17:10:27 +0800
commitfb406eaf6166125575c1e957b2fbeaa881766851 (patch)
treeee70a29624ac50cd92f020e97a447f68070510e5
parentab340ae375f5dde4bc7a85c86855e545de10abca (diff)
TSG-21840v4.4.1
-rw-r--r--CRDT/CMakeLists.txt7
-rw-r--r--CRDT/exdata.h7
-rw-r--r--CRDT/hll_common.c2
-rw-r--r--CRDT/hll_common.h13
-rw-r--r--CRDT/spread_sketch.c697
-rw-r--r--CRDT/spread_sketch.h58
-rw-r--r--CRDT/spread_sketch_gtest.cpp866
-rw-r--r--docs/command_toc.md1
-rw-r--r--docs/commands/spread_sketch.md155
-rw-r--r--include/swarmkv/swarmkv.h2
-rw-r--r--readme.md1
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/swarmkv.c24
-rw-r--r--src/swarmkv_api.c36
-rw-r--r--src/swarmkv_store.c7
-rw-r--r--src/swarmkv_store.h3
-rw-r--r--src/t_spread_sketch.c307
-rw-r--r--src/t_spread_sketch.h11
-rw-r--r--test/swarmkv_gtest.cpp158
19 files changed, 2349 insertions, 8 deletions
diff --git a/CRDT/CMakeLists.txt b/CRDT/CMakeLists.txt
index 76a236f..6dacb42 100644
--- a/CRDT/CMakeLists.txt
+++ b/CRDT/CMakeLists.txt
@@ -3,7 +3,7 @@ add_definitions(-fPIC)
add_library(CRDT lww_register.c pn_counter.c or_map.c or_set.c hll_common.c st_hyperloglog.c ap_bloom.c cm_sketch.c
g_array.c token_bucket_common.c oc_token_bucket.c fair_token_bucket.c bulk_token_bucket.c
- crdt_utils.c)
+ crdt_utils.c spread_sketch.c)
include_directories(${PROJECT_SOURCE_DIR}/deps/mpack
${PROJECT_SOURCE_DIR}/deps/uthash
@@ -19,6 +19,11 @@ add_executable(tb_CRDT_gtest tb_crdt_gtest.cpp
${PROJECT_SOURCE_DIR}/deps/xxhash/xxhash.c)
target_link_libraries(tb_CRDT_gtest CRDT gtest-static uuid)
+add_executable(spread_sketch_gtest spread_sketch_gtest.cpp
+ ${PROJECT_SOURCE_DIR}/deps/mpack/mpack.c
+ ${PROJECT_SOURCE_DIR}/deps/xxhash/xxhash.c)
+target_link_libraries(spread_sketch_gtest CRDT gtest-static uuid)
+
add_executable(probabilistic_CRDT_gtest probabilistic_crdt_gtest.cpp
${PROJECT_SOURCE_DIR}/deps/mpack/mpack.c
${PROJECT_SOURCE_DIR}/deps/xxhash/xxhash.c)
diff --git a/CRDT/exdata.h b/CRDT/exdata.h
new file mode 100644
index 0000000..a9c28de
--- /dev/null
+++ b/CRDT/exdata.h
@@ -0,0 +1,7 @@
+#pragma once
+
+typedef void * (*exdata_new_cb)(void *arg);
+typedef void (*exdata_free_cb)(void *exdata);
+typedef void (*exdata_merge_cb)(void *dest, void *src);
+typedef void (*exdata_reset_cb)(void *exdata);
+typedef void * (*exdata_copy_cb)(void *exdata);
diff --git a/CRDT/hll_common.c b/CRDT/hll_common.c
index 7017242..6ec3807 100644
--- a/CRDT/hll_common.c
+++ b/CRDT/hll_common.c
@@ -298,7 +298,7 @@ void hll_advance_reset_index(long long *reset_idx, long long reset_reg_count, un
long long hll_get_reset_register_count(unsigned char precision, long long time_window_ms, struct timeval now, struct timeval *reset_time)
{
long long reset_time_slot_us=RESET_TIME_SLOT_US(time_window_ms, precision);
- long long delta_us=timeval_delta_us(*reset_time, now);
+ long long delta_us=timeval_delta_us((*reset_time), now);
if(delta_us < reset_time_slot_us)
return 0;
diff --git a/CRDT/hll_common.h b/CRDT/hll_common.h
index b9fc670..a110153 100644
--- a/CRDT/hll_common.h
+++ b/CRDT/hll_common.h
@@ -1,3 +1,10 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
#include <stdint.h>
#include <sys/time.h>//gettimeofday
#include <stddef.h>
@@ -13,4 +20,8 @@ int hll_should_slide(unsigned char precision, long long time_window_ms, struct t
void hll_reset_registers(uint32_t *registers, unsigned char precision, long long offset, long long count);
long long hll_get_reset_register_count(unsigned char precision, long long time_window_ms, struct timeval now, struct timeval *reset_time);
void hll_advance_reset_index(long long *reset_idx, long long reset_reg_count, unsigned char precision);
-void hll_merge(uint32_t *dst_registers, const uint32_t *src_registers, unsigned char precision); \ No newline at end of file
+void hll_merge(uint32_t *dst_registers, const uint32_t *src_registers, unsigned char precision);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/CRDT/spread_sketch.c b/CRDT/spread_sketch.c
new file mode 100644
index 0000000..ab2d1ea
--- /dev/null
+++ b/CRDT/spread_sketch.c
@@ -0,0 +1,697 @@
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <math.h>
+#include <assert.h>
+
+#include "xxhash.h"
+#include "uthash.h"
+
+#include "spread_sketch.h"
+#include "hll_common.h"
+#include "exdata.h"
+
+struct entry {
+ int ref_count;
+ void *exdata;
+ bool dying;
+ char *key;
+ size_t key_len;
+ UT_hash_handle hh;
+};
+
+struct spread_sketch_scheme {
+ exdata_new_cb new_fn;
+ exdata_free_cb free_fn;
+ exdata_merge_cb merge_fn;
+ exdata_reset_cb reset_fn;
+ exdata_copy_cb copy_fn;
+};
+
+struct entry_table {
+ struct entry *entry;
+
+ struct spread_sketch_scheme scheme;
+};
+
+struct bucket {
+ struct entry *content;
+ uint32_t level;
+ long long last_update_ms; // linux timestamp since Jan 1st 1970
+ uint32_t *sthll_register;
+};
+
+struct spread_sketch {
+ int depth;
+ int width;
+ long long time_window_ms;
+ unsigned char precision;
+
+ // shared states of all sthlls
+ long long reset_idx;
+ struct timeval reset_time;
+
+ struct spread_sketch_scheme scheme;
+
+ struct bucket *buckets;
+ struct entry_table *table;
+
+ int level0_cnt; // used to filter out dummy adding
+};
+
+static void *default_new_fn(void *arg) {
+ return NULL;
+}
+static void default_free_fn(void *exdata) {
+ return;
+}
+static void default_merge_fn(void *dest, void *src) {
+ return;
+}
+static void default_reset_fn(void *exdata) {
+ return;
+}
+static void *default_copy_fn(void *exdata) {
+ return exdata;
+}
+
+struct spread_sketch_scheme DEFAULT_SCHEME = {
+ .new_fn = default_new_fn,
+ .free_fn = default_free_fn,
+ .merge_fn = default_merge_fn,
+ .reset_fn = default_reset_fn,
+ .copy_fn = default_copy_fn
+};
+
+uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long long t) {
+ /*
+ return f(t), the actual level of bucket, which satisfy:
+ 1. d 2^f(t)/dt is constants
+ 2. f(t0 + 2T) = 0
+ 3. f((t0) = L )
+ */
+ struct bucket *bucket = &ss->buckets[bucket_idx];
+ if (ss->time_window_ms == 0) {
+ return bucket->level;
+ }
+
+ unsigned L = bucket->level;
+ long long T = ss->time_window_ms;
+ long long t0 = bucket->last_update_ms;
+
+ assert(t >= t0);
+ if (t - t0 >= 2 * T) {
+ return 0;
+ }
+ if (L <= 1) {
+ return t - t0 >= T ? 0 : L;
+ }
+
+ long long tmp_exp = 1 << L;
+ double a = ((double)(1 - tmp_exp)) / (2.0 * T);
+ double b = (double)(tmp_exp);
+
+ return (uint32_t)(log2(a * ((double)(t-t0)) + b) + 0.5);
+}
+
+static inline bool key_equal(const char *key1, size_t key1_len, const char *key2, size_t key2_len) {
+ if (key1_len != key2_len) {
+ return false;
+ }
+ return memcmp(key1, key2, key1_len) == 0;
+}
+static inline char *key_dup(const char *key, size_t key_len) {
+ char *ret = malloc(key_len+1);
+ memcpy(ret, key, key_len);
+ ret[key_len] = '\0';
+ return ret;
+}
+
+struct entry *smart_ptr_table_find(struct entry_table *table, const char *key, size_t key_len) {
+ struct entry *ret;
+ HASH_FIND(hh, table->entry, key, key_len, ret);
+ if (ret == NULL || ret->dying) {
+ return NULL;
+ }
+ return ret;
+}
+
+struct entry *smart_ptr_table_get(struct entry_table *table, const char *key, size_t key_len, void *arg) {
+ struct entry *ret;
+ HASH_FIND(hh, table->entry, key, key_len, ret);
+
+ if (ret != NULL) {
+ ret->dying = false;
+ ret->ref_count++;
+ } else {
+ ret = malloc(sizeof(struct entry));
+ ret->dying = false;
+ ret->ref_count = 1;
+ ret->key = key_dup(key, key_len);
+ ret->key_len = key_len;
+ if (arg == NULL) {
+ ret->exdata = NULL;
+ } else {
+ ret->exdata = table->scheme.new_fn(arg);
+ }
+ HASH_ADD_KEYPTR(hh, table->entry, ret->key, ret->key_len, ret);
+ }
+
+ return ret;
+}
+
+int smart_ptr_table_release(struct entry_table *table, const char *key, size_t key_len) {
+ struct entry *ret;
+ HASH_FIND(hh, table->entry, key, key_len, ret);
+ if (ret == NULL) {
+ return -1;
+ }
+
+ ret->ref_count--;
+ if (ret->ref_count == 0) {
+ // printf("release %s\n", key);
+ HASH_DEL(table->entry, ret);
+ table->scheme.free_fn(ret->exdata);
+ free(ret->key);
+ free(ret);
+ }
+
+ return 0;
+}
+
+void smart_ptr_table_free(struct entry_table *table) {
+ struct entry *current, *tmp;
+ HASH_ITER(hh, table->entry, current, tmp) {
+ HASH_DEL(table->entry, current);
+ table->scheme.free_fn(current->exdata);
+ free(current->key);
+ free(current);
+ }
+ free(table);
+}
+
+struct entry_table *smart_ptr_table_new() {
+ struct entry_table *table = malloc(sizeof(struct entry_table));
+ table->entry = NULL;
+
+ table->scheme = DEFAULT_SCHEME;
+
+ return table;
+}
+
+struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char precision, int time_window_ms, struct timeval now) {
+ struct spread_sketch *pthis = calloc(1, sizeof(struct spread_sketch));
+
+ pthis->depth = depth;
+ pthis->width = width;
+ pthis->time_window_ms = time_window_ms;
+ pthis->precision = precision;
+
+ pthis->reset_idx = 0;
+ pthis->reset_time = now;
+
+ pthis->buckets = calloc(depth * width, sizeof(struct bucket));
+ pthis->scheme = DEFAULT_SCHEME;
+ pthis->table = smart_ptr_table_new();
+ pthis->table->scheme = pthis->scheme;
+
+ for (int i = 0; i < depth * width; i++) {
+ pthis->buckets[i].sthll_register = hll_create_register(precision);
+ }
+
+ pthis->level0_cnt = pthis->depth * pthis->width;
+
+ return pthis;
+}
+
+void move_registers_forward(struct spread_sketch *ss, const struct timeval *now) {
+ if (ss->time_window_ms == 0) {
+ return;
+ }
+
+ long long reset_reg_count = hll_get_reset_register_count(ss->precision, ss->time_window_ms, *now, &ss->reset_time);
+ if (reset_reg_count > 0) {
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ hll_reset_registers(ss->buckets[i].sthll_register, ss->precision, ss->reset_idx, reset_reg_count);
+ }
+ hll_advance_reset_index(&ss->reset_idx, reset_reg_count, ss->precision);
+ }
+}
+
+// return 0 if not added, return 1 if added
+int spread_sketch_add_hash(struct spread_sketch *ss, const char *entry, size_t entry_length, uint64_t item_hash, void *arg, struct timeval now) {
+ uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1;
+ long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000;
+
+ if (item_hash == DUMMY_ITEM_HASH) {
+ if (ss->level0_cnt == 0) {
+ return 0;
+ }
+ assert(ss->level0_cnt>0);
+ }
+
+ // https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
+ // A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x)
+ // Assuming that the 128-bit xxhash function is perfect, we can view it as a combination of two 64-bit hash functions.
+ uint64_t hash_x_tmp = XXH3_64bits_withSeed(entry, entry_length, 171);
+ uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
+ uint32_t hash_x2 = (uint32_t) hash_x_tmp;
+
+ bool in_sketch = false;
+ move_registers_forward(ss, &now);
+ for (int i = 0; i < ss->depth; i++) {
+ uint32_t hash_x = hash_x1 + i * hash_x2;
+ int bucket_idx = (hash_x % ss->width) + i * ss->width;
+ struct bucket *bucket = &ss->buckets[bucket_idx];
+
+ if (bucket->content != NULL && key_equal(bucket->content->key, bucket->content->key_len, entry, entry_length)) {
+ bucket->content->dying = false;
+ bucket->last_update_ms = now_ms;
+
+ if (bucket->level < level) {
+ if (bucket->level == 0) {
+ ss->level0_cnt--;
+ }
+ bucket->level = level;
+ }
+ in_sketch = true;
+ } else {
+ uint32_t true_level = bucket->content == NULL ? 0: cal_true_level(ss, bucket_idx, now_ms);
+
+ if (level > true_level) {
+ if (bucket->level == 0) {
+ ss->level0_cnt--;
+ }
+
+ // printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, true_level, level, i, hash_x % ss->width);
+ const struct entry *content_old = bucket->content;
+ if (content_old != NULL) {
+ smart_ptr_table_release(ss->table, content_old->key, content_old->key_len);
+ }
+ struct entry *content_new = smart_ptr_table_get(ss->table, entry, entry_length, arg);
+ bucket->content = content_new;
+
+ bucket->last_update_ms = now_ms;
+ bucket->level = level;
+
+ in_sketch = true;
+ }
+ }
+ hll_add_hash(bucket->sthll_register, ss->precision, item_hash);
+ }
+
+ return in_sketch ? 1 : 0;
+}
+
+int spread_sketch_add(struct spread_sketch *ss, const char *entry, size_t entry_length, const char* item, size_t item_len, void *arg, struct timeval now) {
+ uint64_t hash = XXH3_64bits(item, item_len);
+ return spread_sketch_add_hash(ss, entry, entry_length, hash, arg, now);
+}
+
+double spread_sketch_point_query(const struct spread_sketch *ss, const char *entry, size_t entry_length) {
+ uint64_t hash_x_tmp = XXH3_64bits_withSeed(entry, entry_length, 171);
+ uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
+ uint32_t hash_x2 = (uint32_t) hash_x_tmp;
+
+ double count_min = (double)INT32_MAX;
+ for (int i = 0; i < ss->depth; i++) {
+ uint32_t hash_x = hash_x1 + i * hash_x2;
+ int bucket_idx = (hash_x % ss->width) + i * ss->width;
+
+ double est = hll_count(ss->buckets[bucket_idx].sthll_register, ss->precision, ss->reset_idx, ss->time_window_ms);
+ if (est < count_min) {
+ count_min = est;
+ }
+ }
+ return count_min;
+}
+
+struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, const struct timeval *now) {
+ struct spread_sketch *duplicate = malloc(sizeof(struct spread_sketch));
+ duplicate->depth = ss->depth;
+ duplicate->width = ss->width;
+ duplicate->precision = ss->precision;
+ duplicate->reset_idx = ss->reset_idx;
+ duplicate->reset_time = ss->reset_time;
+ duplicate->time_window_ms = ss->time_window_ms;
+ duplicate->buckets = calloc(ss->depth * ss->width, sizeof(struct bucket));
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ duplicate->buckets[i].sthll_register = hll_duplicate(ss->buckets[i].sthll_register, ss->precision);
+ }
+
+ move_registers_forward(duplicate, now);
+ return duplicate;
+}
+
+void spread_sketch_free(struct spread_sketch *ss) {
+ if (ss == NULL) {
+ return;
+ }
+ smart_ptr_table_free(ss->table);
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ hll_free_register(ss->buckets[i].sthll_register);
+ }
+ free(ss->buckets);
+ free(ss);
+}
+
+void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *src)
+{
+ assert(dst->depth == src->depth && dst->width == src->width);
+ assert(dst->time_window_ms == src->time_window_ms);
+ assert(dst->precision == src->precision);
+
+ for (int i = 0; i < dst->depth * dst->width; i++) {
+ const struct bucket *bucket_src = &src->buckets[i];
+ struct bucket *bucket_dst = &dst->buckets[i];
+
+ if (bucket_src->content == NULL || bucket_src->content->dying) {
+ continue;
+ }
+
+ hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision);
+ if (bucket_dst->level == 0 && bucket_src->level > 0) {
+ dst->level0_cnt--;
+ }
+
+ if (bucket_dst->content == NULL) {
+ bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
+ bucket_dst->level = bucket_src->level;
+ bucket_dst->last_update_ms = bucket_src->last_update_ms;
+
+ continue;
+ }
+ bucket_dst->content->dying = false;
+
+ if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) {
+ if (bucket_src->level > bucket_dst->level) {
+ bucket_dst->level = bucket_src->level;
+ }
+ if (bucket_src->last_update_ms > bucket_dst->last_update_ms) {
+ bucket_dst->last_update_ms = bucket_src->last_update_ms;
+ }
+ } else {
+ uint32_t true_level_src = cal_true_level(src, i, bucket_src->last_update_ms);
+ uint32_t true_level_dst = cal_true_level(dst, i, bucket_dst->last_update_ms);
+
+ if (true_level_src > true_level_dst) {
+ smart_ptr_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len);
+ bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
+ bucket_dst->last_update_ms = bucket_src->last_update_ms;
+ }
+ }
+ }
+
+ // for exdata
+ const struct spread_sketch_scheme *scheme = &dst->table->scheme;
+
+ struct entry *content_dest, *content_src, *tmp;
+ HASH_ITER(hh, dst->table->entry, content_dest, tmp) {
+ HASH_FIND(hh, src->table->entry, content_dest->key, content_dest->key_len, content_src);
+ if (content_src == NULL || content_src->dying) {
+ continue;
+ }
+
+ if (content_dest->exdata == NULL) {
+ content_dest->exdata = scheme->copy_fn(content_src->exdata);
+ } else {
+ scheme->merge_fn(content_dest->exdata, content_src->exdata);
+ }
+ }
+}
+
+void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) {
+ const struct entry *content = smart_ptr_table_find(ss->table, key, key_len);
+ if (content == NULL) {
+ return NULL;
+ }
+ return content->exdata;
+}
+
+void spread_sketch_reset(struct spread_sketch *ss) {
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ struct bucket *bucket = &ss->buckets[i];
+ bucket->level = 0;
+ hll_reset_registers(bucket->sthll_register, ss->precision, ss->reset_idx, 100000); // count is big enough
+ }
+
+ struct entry *content, *tmp;
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
+ ss->scheme.reset_fn(content->exdata);
+ content->dying = true;
+ }
+
+ ss->level0_cnt = ss->depth * ss->width;
+}
+
+void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) {
+ ss->scheme.new_fn = new_fn;
+ ss->scheme.free_fn = free_fn;
+ ss->scheme.merge_fn = merge_fn;
+ ss->scheme.reset_fn = reset_fn;
+ ss->scheme.copy_fn = copy_fn;
+
+ ss->table->scheme = ss->scheme;
+}
+
+int spread_sketch_get_count(const struct spread_sketch *ss) {
+ int cnt = 0;
+ struct entry *content, *tmp;
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
+ if (!content->dying) {
+ cnt++;
+ }
+ }
+
+ return cnt;
+}
+
+size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas) {
+ size_t count = 0;
+ struct entry *content, *tmp;
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
+ if (content->dying) {
+ continue;
+ }
+ if (count >= n_exdatas) {
+ break;
+ }
+ exdatas[count] = content->exdata;
+ count++;
+ }
+ return count;
+}
+
+void spread_sketch_list_entries(const struct spread_sketch *ss, char ***entries, size_t **entry_lens, size_t *n_entry) {
+ size_t count_max = HASH_COUNT(ss->table->entry);
+ size_t count = 0;
+
+ *entries = malloc(count_max * sizeof(char *));
+ *entry_lens = malloc(count_max * sizeof(size_t));
+
+ struct entry *content, *tmp;
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
+ if (content->dying) {
+ continue;
+ }
+
+ (*entries)[count] = content->key;
+ (*entry_lens)[count] = content->key_len;
+ count++;
+ }
+
+ *n_entry = count;
+}
+
+double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len) {
+ if (smart_ptr_table_find(ss->table, key, key_len) == NULL) {
+ return -1;
+ }
+
+ double est = spread_sketch_point_query(ss, key, key_len);
+ return est;
+}
+
+struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) {
+ struct spread_sketch *dst = malloc(sizeof(struct spread_sketch));
+ memcpy(dst, src, sizeof(struct spread_sketch));
+
+ dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket));
+ dst->table = smart_ptr_table_new();
+ spread_sketch_set_exdata_schema(dst, src->scheme.new_fn, src->scheme.free_fn, src->scheme.merge_fn, src->scheme.reset_fn, src->scheme.copy_fn);
+ for (int i = 0; i < dst->depth * dst->width; i++) {
+ dst->buckets[i].sthll_register = hll_duplicate(src->buckets[i].sthll_register, src->precision);
+ }
+
+ for (int i = 0; i < dst->depth * dst->width; i++) {
+ if (src->buckets[i].content == NULL || src->buckets[i].content->dying) {
+ continue;
+ }
+ dst->buckets[i].content = smart_ptr_table_get(dst->table, src->buckets[i].content->key, src->buckets[i].content->key_len, NULL);
+ dst->buckets[i].level = src->buckets[i].level;
+ if (dst->buckets[i].content->exdata == NULL) {
+ dst->buckets[i].content->exdata = src->scheme.copy_fn(src->buckets[i].content->exdata);
+ }
+ }
+ return dst;
+}
+
+void spread_sketch_recommend_parameters(int expected_super_spreader_number, int *depth_out, int *width_out, unsigned char *precision_out)
+{
+ int logk = expected_super_spreader_number >= 3200 ? 4 : 3; // lg3200 = 3.51,round up to 4
+ *depth_out = logk;
+
+ int w;
+ if (expected_super_spreader_number <= 100) {
+ w = expected_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width
+ } else {
+ w = expected_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous
+ }
+ if (w < 40) {
+ w = 40;
+ }
+ *width_out = w;
+
+ *precision_out = 6;
+}
+
+void spread_sketch_get_parameter(const struct spread_sketch *ss, int *depth_out, int *width_out, unsigned char *precision_out, int *time_window_ms_out)
+{
+ *depth_out = ss->depth;
+ *width_out = ss->width;
+ *precision_out = ss->precision;
+ *time_window_ms_out = ss->time_window_ms;
+}
+
+void spread_sketch_change_precision(struct spread_sketch *ss, unsigned char precision) {
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ hll_free_register(ss->buckets[i].sthll_register);
+ ss->buckets[i].sthll_register = hll_create_register(precision);
+ }
+ ss->precision = precision;
+}
+
+void spread_sketch_serialize(const struct spread_sketch *ss, char **blob, size_t *blob_sz)
+{
+ /*
+ format:
+ struct spread_sketch(including useless pointers)
+ struct bucket * depth * width
+ int64_t * depth * width for keylen + key(close-knit, key_len, key, key_len, key, ..., in order of bucket index)
+ */
+ // get serialize size
+
+ size_t sz = 0;
+ sz += sizeof(struct spread_sketch);
+ sz += ss->depth * ss->width * (sizeof(struct bucket) + hll_size(ss->precision) + sizeof(int64_t));
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ if (ss->buckets[i].content != NULL) {
+ sz += ss->buckets[i].content->key_len;
+ }
+ }
+
+ char *buffer = calloc(sz, sizeof(char));
+ *blob = buffer;
+ *blob_sz = sz;
+
+ memcpy(buffer, ss, sizeof(struct spread_sketch));
+ buffer += sizeof(struct spread_sketch);
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ const struct bucket *bucket = &ss->buckets[i];
+ memcpy(buffer, bucket, sizeof(struct bucket));
+ buffer += sizeof(struct bucket);
+ memcpy(buffer, bucket->sthll_register, hll_size(ss->precision));
+ buffer += hll_size(ss->precision);
+ }
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ int64_t key_len;
+ if (ss->buckets[i].content != NULL) {
+ key_len = ss->buckets[i].content->key_len;
+ } else {
+ key_len = 0;
+ }
+ memcpy(buffer, &key_len, sizeof(int64_t));
+ buffer += sizeof(int64_t);
+
+ if (key_len > 0) {
+ memcpy(buffer, ss->buckets[i].content->key, key_len);
+ buffer += key_len;
+ }
+ }
+}
+
+struct spread_sketch *spread_sketch_deserialize(const char *blob, size_t blob_sz)
+{
+ struct spread_sketch *ss = malloc(sizeof(struct spread_sketch));
+ memcpy(ss, blob, sizeof(struct spread_sketch));
+ blob += sizeof(struct spread_sketch);
+ ss->table = smart_ptr_table_new();
+ ss->buckets = calloc(ss->depth * ss->width, sizeof(struct bucket));
+ ss->scheme = DEFAULT_SCHEME;
+
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ struct bucket *bucket = &ss->buckets[i];
+ memcpy(bucket, blob, sizeof(struct bucket));
+ blob += sizeof(struct bucket);
+
+ bucket->sthll_register = hll_create_register(ss->precision);
+ memcpy(bucket->sthll_register, blob, hll_size(ss->precision));
+ blob += hll_size(ss->precision);
+ }
+
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ int64_t key_len;
+ memcpy(&key_len, blob, sizeof(int64_t));
+ blob += sizeof(int64_t);
+ if (key_len == 0) {
+ continue;
+ }
+ const char *key = blob;
+ blob += key_len;
+
+ struct entry *content = smart_ptr_table_get(ss->table, key, key_len, NULL);
+ ss->buckets[i].content = content;
+ }
+
+ return ss;
+}
+
+void spread_sketch_merge_blob(struct spread_sketch *dst, const char *blob, size_t blob_sz)
+{
+ struct spread_sketch *src = spread_sketch_deserialize(blob, blob_sz);
+ spread_sketch_merge(dst, src);
+ spread_sketch_free(src);
+}
+
+size_t spread_sketch_calculate_memory_usage(const struct spread_sketch *ss)
+{
+ size_t ret = 0;
+ ret += sizeof(struct spread_sketch);
+
+ size_t bucket_size = sizeof(struct bucket) + hll_size(ss->precision);
+ ret += ss->depth * ss->width * bucket_size;
+
+ struct entry *content, *tmp;
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
+ ret += sizeof(struct entry);
+ ret += content->key_len;
+ // assume the exdata is NULL
+ }
+ return ret;
+}
+
+double spread_sketch_query(const struct spread_sketch *ss, const char *key, size_t key_length, struct timeval now) {
+ if (ss->time_window_ms!=0 && hll_should_slide(ss->precision, ss->time_window_ms, now, ss->reset_time)) {
+ struct spread_sketch *duplicate = duplicate_and_step(ss, &now);
+ double ret = spread_sketch_point_query(duplicate, key, key_length);
+ spread_sketch_free(duplicate);
+ return ret;
+ } else {
+ return spread_sketch_point_query(ss, key, key_length);
+ }
+}
+
+struct spread_sketch *spread_sketch_replicate(uuid_t uuid, const char *blob, size_t blob_sz) {
+ return spread_sketch_deserialize(blob, blob_sz);
+}
diff --git a/CRDT/spread_sketch.h b/CRDT/spread_sketch.h
new file mode 100644
index 0000000..80e6916
--- /dev/null
+++ b/CRDT/spread_sketch.h
@@ -0,0 +1,58 @@
+#pragma once
+
+
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+#include "exdata.h"
+#include <stdint.h> //uint64_t
+#include <sys/time.h> // struct timeval
+#include <stddef.h>
+#include <uuid/uuid.h>
+
+#define DUMMY_ITEM_HASH (1ULL<<63) // level(left most zeros) = 0
+
+struct spread_sketch;
+
+// set time_window_ms to zero, the spread sketch will not slide and decay, in which case, `now` can be any value
+struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char precision, int time_window_ms, struct timeval now);
+void spread_sketch_free(struct spread_sketch *ss);
+void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn);
+
+int spread_sketch_add_hash(struct spread_sketch *ss, const char *entry, size_t entry_length, uint64_t item_hash, void *arg, struct timeval now);
+int spread_sketch_add(struct spread_sketch *ss, const char *entry, size_t entry_length, const char* item, size_t item_len, void *arg, struct timeval now);
+
+// get the number of entrys stored in spread sketch
+int spread_sketch_get_count(const struct spread_sketch *ss);
+// list all the entrys in spread sketch. User should free the arrays, but do not free the elements of strings in the array(because they are references to the internal data structure)
+// Example: char **entry; size_t *entry_len; size_t n_entrys; spread_sketch_list_entrys(&entry, &entry_len, &n_entrys); free(entry); free(entry_len);
+void spread_sketch_list_entries(const struct spread_sketch *ss, char ***entries, size_t **entry_lens, size_t *n_entry);
+// query the cardinality(or fanout) of a entry in spread sketch.
+// Even thought spread sketch algorithm does not requires entrys to exist innately, when querying a entry that is not present in the spread sketch, `spread_sketch_get_cardinality` will return -1.
+double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *entry, size_t entry_len);
+// query a hyperloglog 's base64 serialization. The serialization format is [1,precision,register...] and then encoded by base64
+char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *entry, size_t entry_len);
+void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *entry, size_t entry_len);
+// in most cases, it has the same output as `spread_sketch_get_cardinality`, but it will perform more like an ordinary spread sketch query.
+// Will always return a value, even if the entry is not present in the spread sketch. Must pass a `now` value required by Stagger hll query.
+double spread_sketch_query(const struct spread_sketch *ss, const char *entry, size_t entry_length, struct timeval now);
+
+void spread_sketch_merge(struct spread_sketch *dest, const struct spread_sketch *src);
+struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src);
+void spread_sketch_serialize(const struct spread_sketch *ss, char **blob, size_t *blob_sz);
+struct spread_sketch *spread_sketch_deserialize(const char *blob, size_t blob_sz);
+void spread_sketch_merge_blob(struct spread_sketch *dst, const char *blob, size_t blob_sz);
+void spread_sketch_reset(struct spread_sketch *ss);
+// exactly the same as `spread_sketch_deserialize`. `uuid` is a dummy parameter.(since spread sketch itself performs very like set CRDT, it does not need uuid to sync replicas)
+struct spread_sketch *spread_sketch_replicate(uuid_t uuid, const char *blob, size_t blob_sz);
+
+void spread_sketch_get_parameter(const struct spread_sketch *ss, int *depth_out, int *width_out, unsigned char *precision_out, int *time_window_ms_out);
+// spread sketch alway store values more than expected_query_num,expected_query_num is a hint to set spread sketch parameters properly
+void spread_sketch_recommend_parameters(int expected_super_spreader_number, int *depth_out, int *width_out, unsigned char *precision_out);
+size_t spread_sketch_calculate_memory_usage(const struct spread_sketch *ss);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/CRDT/spread_sketch_gtest.cpp b/CRDT/spread_sketch_gtest.cpp
new file mode 100644
index 0000000..55084b6
--- /dev/null
+++ b/CRDT/spread_sketch_gtest.cpp
@@ -0,0 +1,866 @@
+#include "crdt_utils.h"
+#include "st_hyperloglog.h"
+#include "spread_sketch.h"
+#include "hll_common.h"
+
+#include <gtest/gtest.h>
+#include <math.h>
+#include <vector>
+#include <unordered_map>
+#include <unordered_set>
+#include <string>
+#include <random>
+#include <fstream>
+
+
+struct timeval ms_to_timeval(long long ms)
+{
+ struct timeval tv;
+ tv.tv_sec=ms/1000;
+ tv.tv_usec=(ms%1000)*1000;
+ return tv;
+}
+
+struct Flow {
+ std::string src_ip;
+ std::string dst_ip;
+ long long time;
+};
+
+class InputGenerator {
+private:
+ double _now;
+ int _n_super_spreader;
+ int _n_others;
+ int _fanout_superspreader;
+ int _fanout_others;
+ double _probability_ss;
+
+ double _ms_per_packet;
+
+public:
+ InputGenerator(int n_super_spreader, int n_others, double probability_ss, int fanout_superspreader, int fanout_others, long long now, double ms_per_packet ) {
+ _now = (double)now;
+ _n_super_spreader = n_super_spreader;
+ _n_others = n_others;
+ _probability_ss = probability_ss;
+
+ _fanout_superspreader = fanout_superspreader;
+ _fanout_others = fanout_others;
+ _ms_per_packet = ms_per_packet;
+
+ if (_n_super_spreader == 0) {
+ _probability_ss = 0;
+ }
+ if (_n_others == 0) {
+ _probability_ss = 1;
+ }
+ }
+
+ struct Flow next() {
+ struct Flow flow;
+ if (rand() % 100 < _probability_ss * 100 && _n_super_spreader > 0) {
+ flow.src_ip = "s_" + std::to_string(rand() % _n_super_spreader);
+ flow.dst_ip = "sd_" + std::to_string(rand() % _fanout_superspreader);
+ } else {
+ flow.src_ip = std::to_string(rand() % _n_others);
+ flow.dst_ip = "d_" + std::to_string(rand() % _fanout_others);
+ }
+ flow.time = (long long)_now;
+ _now += _ms_per_packet;
+ return flow;
+ }
+};
+
+struct spread_sketch_list {
+ char **key;
+ size_t *key_len;
+ double *count;
+ int n_results;
+};
+
+struct spread_sketch_list *spread_sketch_list(const struct spread_sketch *ss) {
+ size_t n_result;
+ size_t *key_lens;
+ char **keys;
+ spread_sketch_list_entries(ss, &keys, &key_lens, &n_result);
+
+ struct spread_sketch_list *ret = (struct spread_sketch_list *)malloc(sizeof(struct spread_sketch_list));
+ if (n_result == 0) {
+ ret->n_results = 0;
+ ret->key = NULL;
+ ret->count = NULL;
+ return ret;
+ }
+
+ // sort by count
+ std::vector<std::pair<double, size_t>> sorted;
+ for (size_t i = 0; i < n_result; i++) {
+ sorted.push_back(std::make_pair(spread_sketch_get_cardinality(ss, keys[i], key_lens[i]), i));
+ }
+ std::sort(sorted.begin(), sorted.end(), [](const std::pair<double, size_t> &a, const std::pair<double, size_t> &b) {
+ return a.first > b.first;
+ });
+
+ ret->key = (char **)malloc(sizeof(char *) * n_result);
+ ret->count = (double *)malloc(sizeof(double) * n_result);
+ ret->key_len = (size_t *)malloc(sizeof(size_t) * n_result);
+
+ for (size_t i = 0; i < n_result; i++) {
+ size_t idx = sorted[i].second;
+ ret->key[i] = (char *)malloc(key_lens[idx]);
+ memcpy(ret->key[i], keys[idx], key_lens[idx]);
+ ret->key_len[i] = key_lens[idx];
+ ret->count[i] = sorted[i].first;
+ }
+
+ ret->n_results = n_result;
+ free(keys);
+ free(key_lens);
+ return ret;
+}
+
+void ss_query_result_free(struct spread_sketch_list *result) {
+ if (result == NULL) {
+ return;
+ }
+
+ for(int i=0; i<result->n_results; i++){
+ free(result->key[i]);
+ }
+ free(result->key);
+ free(result->count);
+ free(result->key_len);
+ free(result);
+}
+
+class TestRet {
+private:
+ struct spread_sketch_list *ret_raw;
+
+ unsigned char _precision;
+ long long _window_s;
+
+ long long _last_time;
+ bool _use_hll;
+
+
+public:
+ TestRet(){
+ ret_raw = NULL;
+ _precision = 4;
+ _window_s = 10;
+ _use_hll = false;
+ }
+
+ TestRet(unsigned char precision, long long window_s) {
+ ret_raw = NULL;
+ _precision = precision;
+ _window_s = window_s;
+ _use_hll = true;
+ }
+
+ ~TestRet(){
+ ss_query_result_free(ret_raw);
+ if (_use_hll) {
+ for (auto &kv : benchmark_sthll) {
+ ST_hyperloglog_free(kv.second);
+ }
+ }
+ }
+
+ void feed(std::string key_main, std::string key_spread, long long now_ms){
+ benchmark[key_main].insert(key_spread);
+
+ if (_use_hll) {
+ if (benchmark_sthll.find(key_main) == benchmark_sthll.end()) {
+ benchmark_sthll[key_main] = ST_hyperloglog_new(_precision, _window_s * 1000, ms_to_timeval(now_ms));
+ }
+
+ ST_hyperloglog_add(benchmark_sthll[key_main], key_spread.c_str(), key_spread.size(), ms_to_timeval(now_ms));
+ }
+
+ _last_time = now_ms;
+ }
+
+ void read_ss_query_result(struct spread_sketch_list *ret){
+ this->ret_raw = ret;
+
+ // printf("read_ss_query_result n_results: %d\n", ret->n_results);
+ for (int i = 0; i < ret->n_results; i++) {
+ std::string key(ret->key[i], ret->key_len[i]);
+ // struct ST_hyperloglog *hll = _use_hll ? benchmark_sthll[key] : NULL;
+ // double hllcnt = hll == NULL ? -1.0 : ST_hyperloglog_count(hll, ms_to_timeval(_last_time));
+ // long actural = benchmark[key].size();
+ // printf("key: %s, count: %f, hll benchmark: %f, actual: %lu\n", key.c_str(), ret->count[i], hllcnt, actural);
+
+ results[key] = ret->count[i];
+ }
+ }
+
+ double cal_mre(){
+ double mre = 0;
+ int cnt = 0;
+ for (auto &kv : benchmark) {
+ // only calculate MRE for keys that exist in benchmark
+ // and only take account of spread keys(start with s_)
+ if (kv.first[0] != 's') {
+ continue;
+ }
+ double est = results.find(kv.first) == results.end() ? 0 : results[kv.first];
+ double actual = kv.second.size();
+
+ mre += fabs(est - actual) / actual;
+ cnt++;
+ }
+ return mre / cnt;
+ }
+
+ double cal_mre(double top_percent){
+ double mre = 0;
+ std::vector<std::pair<std::string, int>> top_n_keys;
+ for (auto &kv : benchmark) {
+ int fanout = kv.second.size();
+ top_n_keys.push_back(std::make_pair(kv.first, fanout));
+ }
+ std::sort(top_n_keys.begin(), top_n_keys.end(), [](const std::pair<std::string, int> &a, const std::pair<std::string, int> &b) {
+ return a.second > b.second;
+ });
+
+ int total_ss = top_percent * top_n_keys.size();
+ for (int i = 0; i < total_ss; i++) {
+
+ double est = results.find(top_n_keys[i].first) == results.end() ? 0 : results[top_n_keys[i].first];
+ double actual = top_n_keys[i].second;
+
+ mre += fabs(est - actual) / actual;
+ }
+ return mre / total_ss;
+ }
+
+ bool check_result(unsigned char hll_precision, double eplilon, double delta) {
+ int good = 0;
+ int total_valid_key = 0;
+ double sigma = hll_error(hll_precision) * 2; // RSD -> max error estimation
+
+ for (auto &kv : benchmark) {
+ if (results.find(kv.first) == results.end()) {
+ continue;
+ }
+
+ double actual = kv.second.size();
+ double est = results[kv.first];
+ total_valid_key++;
+
+ // sigma is actually an RSD instead of an error estimation, but the chances of error estimation being larger than RSD is low enough to ignore, compared to the error of CM sketch
+ // est is always bigger than actual if HLL error is omitted
+ if (est <= (1+sigma) * (1+eplilon) * actual && est >= (1-sigma) * actual) {
+ good++;
+ }
+ }
+
+ bool ret = good >= total_valid_key * (1 - delta);
+ if (!ret) {
+ // if (true) {
+ printf("test failed, good: %d, total: %d\n", good, total_valid_key);
+ printf("allowed error percentage: %f\n", delta);
+ // print out the whole benchmark and results
+ for (auto &kv : benchmark) {
+ if (results.find(kv.first) == results.end()) {
+ continue;
+ }
+
+ if (_use_hll) {
+ printf("key: %s, actual: %lu, estimated: %f, allowed range [%f, %f], hll result: %f\n",
+ kv.first.c_str(),
+ kv.second.size(),
+ results[kv.first], (1-sigma) * kv.second.size(),
+ (1+sigma) * (1+eplilon) * kv.second.size(),
+ ST_hyperloglog_count(benchmark_sthll[kv.first], ms_to_timeval(_last_time)));
+ } else {
+ printf("key: %s, actual: %lu, estimated: %f, allowed range [%f, %f]\n",
+ kv.first.c_str(),
+ kv.second.size(),
+ results[kv.first], (1-sigma) * kv.second.size(),
+ (1+sigma) * (1+eplilon) * kv.second.size());
+ }
+ }
+ }
+ return ret;
+ }
+
+ double cal_recall_rate(int top_n) {
+ std::vector<std::pair<std::string, int>> top_n_keys;
+ for (auto &kv : benchmark) {
+ int fanout = kv.second.size();
+ top_n_keys.push_back(std::make_pair(kv.first, fanout));
+ }
+ std::sort(top_n_keys.begin(), top_n_keys.end(), [](const std::pair<std::string, int> &a, const std::pair<std::string, int> &b) {
+ return a.second > b.second;
+ });
+
+ int good = 0;
+ int total_ss = MIN(top_n, (int)top_n_keys.size());
+ for (int i = 0; i < total_ss; i++) {
+ if (results.find(top_n_keys[i].first) != results.end()) {
+ good++;
+ }
+ }
+
+ return 1.0 * good / total_ss;
+ }
+
+ void print() {
+ for (auto &kv : benchmark) {
+ double hll_result = _use_hll ? ST_hyperloglog_count(benchmark_sthll[kv.first], ms_to_timeval(_last_time)) : -1.0;
+ printf("key: %s, benchmark size: %lu, estimated size: %f, hll result: %f\n",
+ kv.first.c_str(), kv.second.size(), results.find(kv.first) == results.end() ? -1.0 : results[kv.first], hll_result);
+ }
+
+ for (auto &kv : results) {
+ if (benchmark.find(kv.first) == benchmark.end()) {
+ printf("key not existed in benchmark:: %s, estimated size: %f\n", kv.first.c_str(), kv.second);
+ }
+ }
+ }
+
+ bool key_exist(std::string key) {
+ return results.find(key) != results.end();
+ }
+
+ std::unordered_map<std::string, std::unordered_set<std::string>> benchmark;
+ std::unordered_map<std::string, double> results;
+ std::unordered_map<std::string, struct ST_hyperloglog *> benchmark_sthll;
+};
+
+TestRet *spread_sketch_test(struct spread_sketch *ss, long long window_s, long long duration_s, class InputGenerator &gen)
+{
+ long long duration_ms = duration_s * 1000;
+ long long last_window_start_ms = duration_ms - window_s * 1000;
+
+ TestRet *test_ret = new TestRet(4, window_s);
+
+ struct Flow flow = gen.next();
+ long long start_ms = flow.time;
+ while (flow.time - start_ms <= duration_ms) {
+ std::string src_ip = flow.src_ip;
+ std::string dst_ip = flow.dst_ip;
+ long long now = flow.time;
+ spread_sketch_add(ss, src_ip.c_str(), src_ip.size(), dst_ip.c_str(), dst_ip.size(), NULL, ms_to_timeval(now));
+ if (now >= last_window_start_ms) {
+ test_ret->feed(src_ip, dst_ip, now);
+ }
+
+ flow = gen.next();
+ }
+
+ struct spread_sketch_list *results = spread_sketch_list(ss);
+ test_ret->read_ss_query_result(results);
+
+ return test_ret;
+}
+
+TestRet *spread_sketch_generic_test(int precision, int depth, int width, long long window_s, long long duration_s, int n_super_spreader, int n_total, int fanout_superspreader, int fanout_others)
+{
+ long long now = 0;
+ const int N_INSTANCES = 10;
+
+ struct spread_sketch *ss_instances[N_INSTANCES];
+ for (int i = 0; i < N_INSTANCES; i++) {
+ ss_instances[i] = spread_sketch_new(depth, width, precision, window_s * 1000, ms_to_timeval(now));
+ }
+
+ double ms_per_packet = 1.0 * (window_s * 1000) / (fanout_superspreader * n_super_spreader + fanout_others * (n_total - n_super_spreader));
+ class InputGenerator gen(n_super_spreader, n_total - n_super_spreader, 0.5, fanout_superspreader, fanout_others, now, ms_per_packet / 1.0);
+ long long duration_ms = duration_s * 1000;
+ long long last_window_start_ms = duration_ms - window_s * 1000;
+ TestRet *test_ret = new TestRet(precision, window_s);
+
+ struct Flow flow = gen.next();
+ long long start_ms = flow.time;
+ while (flow.time - start_ms <= duration_ms) {
+ std::string src_ip = flow.src_ip;
+ std::string dst_ip = flow.dst_ip;
+ long long now = flow.time;
+
+ struct spread_sketch *ss = ss_instances[rand() % N_INSTANCES];
+ spread_sketch_add(ss, src_ip.c_str(), src_ip.size(), dst_ip.c_str(), dst_ip.size(), NULL, ms_to_timeval(now));
+ if (now >= last_window_start_ms) {
+ test_ret->feed(src_ip, dst_ip, now);
+ }
+
+ flow = gen.next();
+ }
+
+ struct spread_sketch *ss = ss_instances[0];
+ for (int i = 1; i < N_INSTANCES; i++) {
+ spread_sketch_merge(ss, ss_instances[i]);
+ }
+
+ struct spread_sketch_list *results = spread_sketch_list(ss);
+ test_ret->read_ss_query_result(results);
+
+ for (int i = 0; i < N_INSTANCES; i++) {
+ spread_sketch_free(ss_instances[i]);
+ }
+
+ return test_ret;
+}
+
+
+double cal_CM_delta(int depth) {
+ return 1.0 / (1 << depth);
+}
+
+double cal_CM_epsilon(int width) {
+ return 2.0 / width;
+}
+
+TEST(SpreadSketch, OneEntry)
+{
+ TestRet *test_ret = spread_sketch_generic_test(
+ 4, // hll precision
+ 4, 1024, // depth, width
+ 10, 100, // window, duration
+ 0, 1, // n_super_spreader, n_total. just one flow, so no error on CM sketch
+ 10000, 10000); // fanout_superspreader, fanout_others
+
+ bool good = test_ret->check_result(4, 0, 0);
+ EXPECT_TRUE(good);
+
+ delete test_ret;
+}
+
+TEST(SpreadSketch, OneEntryNoSliding)
+{
+ // use very little fp compared to big sketch so that CM sketch error are negligible
+ TestRet *test_ret = spread_sketch_generic_test(
+ 4, // hll precision
+ 4, 1024, // depth, width
+ 10, 1, // window, duration, window is larger than duration
+ 0, 1, // n_super_spreader, n_total. just one flow, so no error on CM sketch
+ 10000, 10000); // fanout_superspreader, fanout_others
+
+ bool good = test_ret->check_result(4, 0, 0);
+ EXPECT_TRUE(good);
+
+ delete test_ret;
+}
+
+TEST(SpreadSketch, Precision)
+{
+ TestRet *test_ret = spread_sketch_generic_test(
+ 6, // hll precision
+ 4, 1024, // depth, width
+ 100, 200, // window, duration
+ 500, 10000, // n_super_spreader, n_total.
+ 1000, 3); // fanout_superspreader, fanout_others
+
+ double mre = test_ret->cal_mre();
+ EXPECT_LE(mre, 0.4);
+
+ bool good = test_ret->check_result(6, cal_CM_epsilon(1024), cal_CM_delta(4));
+ EXPECT_TRUE(good);
+
+ double recall_rate = test_ret->cal_recall_rate(500); // 500: n_super_spreader
+ EXPECT_GE(recall_rate, 0.85);
+
+ // printf("normal_flows recall rate: %f, mre: %f \n", recall_rate, mre);
+
+ delete test_ret;
+}
+
+TEST(SpreadSketch, PrecisionOfScarceEntry)
+{
+ int window_s = 100;
+ int n_super_spreader = 100;
+ struct spread_sketch *ss = spread_sketch_new(5, 1024, 4, window_s * 1000, ms_to_timeval(0));
+ InputGenerator gen(n_super_spreader, 1000, 0.5, 1000, 3, 0, 100);
+
+ TestRet *test_ret = spread_sketch_test(ss, window_s, window_s * 5, gen);
+
+ double mre = test_ret->cal_mre();
+ EXPECT_LE(mre, 0.4);
+
+ // not a typical case, so STHLL accuracy is not guaranteed
+
+ double recall_rate = test_ret->cal_recall_rate(n_super_spreader);
+ EXPECT_GE(recall_rate, 0.9);
+
+ // printf("scarce_flow recall rate: %f, mre: %f \n", recall_rate, mre);
+
+ delete test_ret;
+ spread_sketch_free(ss);
+}
+
+TEST(SpreadSketch, Expire) {
+ int n_super_spreader = 1000;
+ int window_s = 10;
+
+ struct spread_sketch *ss = spread_sketch_new(4, 1024, 6, window_s * 1000, ms_to_timeval(0));
+ long long now = 0;
+
+ for (int i = 0; i < n_super_spreader; i++) {
+ std::string src = "old_" + std::to_string(i);
+
+ for (int k = 0; k < 1000; k++) {
+ std::string dst = "d_" + std::to_string(k);
+ spread_sketch_add(ss, src.c_str(), src.size(), dst.c_str(), dst.size(), NULL, ms_to_timeval(now));
+ now += 5;
+ }
+ }
+ InputGenerator gen(n_super_spreader, n_super_spreader * 9, 0.5, 1000, 3, now, 0.01);
+ TestRet *test_ret = spread_sketch_test(ss, window_s, window_s * 3, gen);
+ // all old keys should be decayed
+ for (int i = 0; i < n_super_spreader; i++) {
+ EXPECT_FALSE(test_ret->key_exist("old_" + std::to_string(i)));
+ }
+
+ double recall_rate = test_ret->cal_recall_rate(n_super_spreader);
+ EXPECT_GE(recall_rate, 0.9);
+ // printf("decay_old_key_not_included recall rate: %f\n", recall_rate);
+
+ delete test_ret;
+ spread_sketch_free(ss);
+}
+
+TEST(SpreadSketch, Serialize) {
+ struct spread_sketch *ss = spread_sketch_new(4, 1024, 4, 10*1000, ms_to_timeval(0));
+ int key1 = 1;
+ int key2 = 2;
+ int dstKey1 = 11;
+ int dstKey2 = 22;
+
+ spread_sketch_add(ss, (char *)&key1, sizeof(key1), (char *)&dstKey1, sizeof(dstKey1), NULL, ms_to_timeval(0));
+ spread_sketch_add(ss, (char *)&key1, sizeof(key1), (char *)&dstKey2, sizeof(dstKey2), NULL, ms_to_timeval(10));
+ spread_sketch_add(ss, (char *)&key2, sizeof(key2), (char *)&dstKey1, sizeof(dstKey1), NULL, ms_to_timeval(20));
+
+ char *blob;
+ size_t blob_sz;
+ spread_sketch_serialize(ss, &blob, &blob_sz);
+
+ struct spread_sketch *ss_deserialized = spread_sketch_deserialize(blob, blob_sz);
+
+ struct spread_sketch_list *result = spread_sketch_list(ss);
+ struct spread_sketch_list *result_deserialized = spread_sketch_list(ss_deserialized);
+
+ EXPECT_EQ(result->n_results, result_deserialized->n_results);
+ for (int i = 0; i < result->n_results; i++) {
+ EXPECT_EQ(result->key_len[i], result_deserialized->key_len[i]);
+ EXPECT_TRUE(memcmp(result->key[i], result_deserialized->key[i], result->key_len[i]) == 0);
+ EXPECT_EQ(result->count[i], result_deserialized->count[i]);
+ }
+
+ ss_query_result_free(result);
+ ss_query_result_free(result_deserialized);
+ spread_sketch_free(ss);
+ spread_sketch_free(ss_deserialized);
+ free(blob);
+}
+
+void spread_sketch_parameter_experiments(int depth, int width, int precision, long long window_s, int n_super_spreader, int fanout_superspreader) {
+ struct spread_sketch *ss = spread_sketch_new(depth, width, precision, window_s * 1000, ms_to_timeval(0));
+ InputGenerator gen(n_super_spreader, n_super_spreader * 9, 0.5, fanout_superspreader, 3, 0, 1);
+ TestRet *test_ret = spread_sketch_test(ss, window_s, window_s * 5, gen);
+
+ double mre = test_ret->cal_mre();
+ double recall_rate = test_ret->cal_recall_rate(n_super_spreader);
+ size_t memory_cost = spread_sketch_calculate_memory_usage(ss);
+ spread_sketch_free(ss);
+
+ delete test_ret;
+
+ printf("depth: %d, width: %d, precision: %d, window: %lld, n_super_spreader: %d, fanout_superspreader: %d. \n\trecall rate: %f, mre: %f, memory cost: %fkb\n",
+ depth, width, precision, window_s, n_super_spreader, fanout_superspreader, recall_rate, mre, memory_cost / 1024.0);
+ printf("===============================================\n");
+}
+
+TEST(SpreadSketchExperiments, UniformDistribution) {
+ long long window_s = 10;
+ // check CM sketch related parameters
+ int depth[] = {3, 4};
+ int width[] = {64, 128, 256, 512, 1024, 2048};
+ int n_super_spreader[] = {10, 100, 1000};
+
+ for (size_t i = 0; i < sizeof(depth) / sizeof(int); i++) {
+ for (size_t j = 0; j < sizeof(width) / sizeof(int); j++) {
+ for (size_t l = 0; l < sizeof(n_super_spreader) / sizeof(int); l++) {
+ spread_sketch_parameter_experiments(depth[i], width[j], 4, window_s, n_super_spreader[l], 500);
+ }
+ }
+ }
+
+ // HLL related parameters
+ int precision[] = {4,5,6,7};
+ int fanouts[] = {10, 100, 1000, 10000};
+ for (size_t i = 0; i < sizeof(precision) / sizeof(int); i++) {
+ for (size_t j = 0; j < sizeof(fanouts) / sizeof(int); j++) {
+ spread_sketch_parameter_experiments(4, 1024, precision[i], window_s, 100, fanouts[j]);
+ }
+ }
+
+ // window
+ long long windows[] = {5, 10, 20, 30, 60};
+ for (size_t i = 0; i < sizeof(windows) / sizeof(long long); i++) {
+ spread_sketch_parameter_experiments(3, 256, 5, windows[i], 100, 500);
+ }
+}
+
+
+//===========================================================================
+//= Function to generate Zipf (power law) distributed random variables =
+//= - Input: alpha and N =
+//= - Output: Returns with Zipf distributed random variable =
+//===========================================================================
+int zipf(double alpha, int n)
+{
+ static bool first = true; // Static first time flag
+ static double c = 0; // Normalization constant
+ double z; // Uniform random number (0 < z < 1)
+ double sum_prob; // Sum of probabilities
+ double zipf_value; // Computed exponential value to be returned
+ int i; // Loop counter
+
+ // Compute normalization constant on first call only
+ if (first)
+ {
+ for (i=1; i<=n; i++)
+ c = c + (1.0 / pow((double) i, alpha));
+ c = 1.0 / c;
+ first = false;
+ }
+
+ // Pull a uniform random number (0 < z < 1)
+ do
+ {
+ z = (double)rand() / (double)RAND_MAX;
+ }
+ while ((z == 0.0) || (z == 1.0));
+
+ // Map z to the value
+ sum_prob = 0;
+ for (i=1; i<=n; i++)
+ {
+ sum_prob = sum_prob + c / pow((double) i, alpha);
+ if (sum_prob >= z)
+ {
+ zipf_value = i;
+ break;
+ }
+ }
+
+ return(zipf_value);
+}
+
+class ZipfInputGenerator {
+private:
+ double _now;
+ double _ms_per_packet;
+
+ const int MAX_DATA = 1000000;
+ std::pair<std::string, std::string> *loadeds;
+ unsigned cursor;
+
+public:
+ ZipfInputGenerator(double alpha, int n, long long now, double ms_per_packet) {
+ _now = now;
+ _ms_per_packet = ms_per_packet;
+ _alpha = alpha;
+ _n = n;
+
+ // generate data and write them to file
+ std::string filename = "zipf_" + std::to_string(alpha) + "_" + std::to_string(n) + ".txt";
+
+ std::unordered_map<int, int> fanout_map; // src_ip_id -> fanout being used
+
+ if (access(filename.c_str(), F_OK) != 0) {
+ printf("file %s not found, generating data\n", filename.c_str());
+
+ std::ofstream file(filename);
+ if (!file.is_open()) {
+ printf("failed to open file %s\n", filename.c_str());
+ return;
+ }
+
+ for (int i = 0; i < MAX_DATA; i++) {
+ int src_id = zipf(alpha, n);
+ int fanout = fanout_map.find(src_id) == fanout_map.end() ? 0 : fanout_map[src_id];
+ fanout_map[src_id] = fanout + 1;
+
+ file << "s_" << src_id << " d_" << fanout << std::endl;
+ }
+
+ file.close();
+ printf("data generated and saved to file %s\n", filename.c_str());
+ }
+
+ // load data
+ std::ifstream file(filename);
+ if (!file.is_open()) {
+ printf("failed to open file %s\n", filename.c_str());
+ return;
+ }
+
+ loadeds = new std::pair<std::string, std::string>[MAX_DATA];
+ std::string line;
+ int i = 0;
+ while (std::getline(file, line) && i < MAX_DATA) {
+ std::istringstream iss(line);
+ std::string src_ip, dst_ip;
+ iss >> src_ip >> dst_ip;
+ loadeds[i] = std::make_pair(src_ip, dst_ip);
+ i++;
+ }
+ file.close();
+ }
+
+ struct Flow next() {
+ int r_cursor = cursor % MAX_DATA;
+ struct Flow flow;
+ flow.src_ip = loadeds[r_cursor].first;
+ flow.dst_ip = loadeds[r_cursor].second;
+ flow.time = _now;
+
+ _now += _ms_per_packet;
+ cursor++;
+
+ return flow;
+ }
+
+ ~ZipfInputGenerator() {
+ delete[] loadeds;
+ }
+
+ double _alpha;
+ int _n;
+};
+
+void parameter_experiments_given_zipf_experiment(int depth, int width, int precision, long long window_s, ZipfInputGenerator &input) {
+ struct spread_sketch *ss = spread_sketch_new(depth, width, precision, window_s * 1000, ms_to_timeval(0));
+ TestRet *ret = new TestRet();
+
+ Flow flow = input.next();
+ long long start_ms = flow.time;
+ long long duration_ms = 1000 * window_s * 5;
+ long long last_window = duration_ms - window_s * 1000;
+
+ while (flow.time - start_ms <= duration_ms) {
+ spread_sketch_add(ss, flow.src_ip.c_str(), flow.src_ip.size(), flow.dst_ip.c_str(), flow.dst_ip.size(), NULL, ms_to_timeval(flow.time));
+ flow = input.next();
+
+ if (flow.time >= last_window) {
+ ret->feed(flow.src_ip, flow.dst_ip, flow.time);
+ }
+ }
+
+ struct spread_sketch_list *results = spread_sketch_list(ss);
+ ret->read_ss_query_result(results);
+
+ double mre = ret->cal_mre();
+ double mre_reach_top_10 = ret->cal_mre(0.1);
+ double recall_rate = ret->cal_recall_rate(input._n / 10); // int n_src_ip = 10 * n_superspreader
+
+ printf("depth: %d, width: %d, precision: %d, window: %lld. Skewness: %f n_src: %d.\n\tmre: %f, recall: %f, top_mre: %f\n",
+ depth, width, precision, window_s, input._alpha, input._n, mre, recall_rate, mre_reach_top_10);
+
+ delete ret;
+ spread_sketch_free(ss);
+}
+
+TEST(SpreadSketchExperiments, ZipfDistribution) {
+ /*
+ test with flows in zipf distribution fanout.
+ Every time window, there are `n_flows_per_window` distinct flows generated. Cardinality({y|x=x_i}) ~ Zipf(N, skewness), where N = 10 * n_superspreader
+ */
+ long long window_s = 10;
+ // check CM sketch related parameters
+ int depth[] = {3,4};
+ int width[] = {64,128,256,512,1024};
+ double skewness[] = {0.3,0.7,1.0};
+ int n_superspreader[] = {10,100,1000};
+
+ for (size_t i = 0; i < sizeof(depth) / sizeof(int); i++) {
+ for (size_t j = 0; j < sizeof(width) / sizeof(int); j++) {
+ for (size_t l = 0; l < sizeof(skewness) / sizeof(double); l++) {
+ for (size_t k = 0; k < sizeof(n_superspreader) / sizeof(int); k++) {
+ int n_src_ip = 10 * n_superspreader[k];
+ int n_flows_per_window = n_src_ip * 100; // 10 * 1000 * 100 = 1e6
+ double ms_per_packet = 1.0 * window_s * 1000 / n_flows_per_window;
+
+ ZipfInputGenerator input(skewness[l], n_src_ip, 0, ms_per_packet);
+ parameter_experiments_given_zipf_experiment(depth[i], width[j], 4, window_s, input);
+ }
+ }
+ }
+ }
+
+ // // HLL related parameters
+ int precision[] = {4,5,6,7};
+ for (size_t i = 0; i < sizeof(precision) / sizeof(int); i++) {
+ for (size_t j = 0; j < sizeof(skewness) / sizeof(double); j++) {
+ for (size_t k = 0; k < sizeof(n_superspreader) / sizeof(int); k++) {
+ int n_src_ip = 10 * n_superspreader[k];
+ int n_flows_per_window = n_src_ip * 100; // 10 * 1000 * 100 = 1e6
+ double ms_per_packet = 1.0 * window_s * 1000 / n_flows_per_window;
+
+ ZipfInputGenerator input(skewness[j], n_src_ip, 0, ms_per_packet);
+ parameter_experiments_given_zipf_experiment(4, 1024, precision[i], window_s, input);
+ }
+ }
+ }
+
+ // after deciding precision = 6, and width = K, check which depth is better.
+ int depth2[] = {3, 4};
+ int width2_10[] = {60};
+ int width2_100[] = {100,150,200,250,300,350,400};
+ int width2_1000[] = {800,900,1000,1100,1200,1300,1400,1500};
+ int *width2[3] = {width2_10, width2_100, width2_1000};
+ size_t n_w_choice[3] = {sizeof(width2_10) / sizeof(int), sizeof(width2_100) / sizeof(int), sizeof(width2_1000) / sizeof(int)};
+
+ for (size_t i = 0; i < sizeof(depth2) / sizeof(int); i++) {
+ for (size_t j = 0; j < sizeof(n_superspreader) / sizeof(int); j++) {
+ int *widths_tmp = width2[j];
+
+ for (size_t k = 0; k < n_w_choice[j]; k++) {
+ int n_src_ip = 10 * n_superspreader[j];
+ int n_flows_per_window = n_src_ip * 100;
+ double ms_per_packet = 1.0 * window_s * 1000 / n_flows_per_window;
+
+ ZipfInputGenerator input(0.7, n_src_ip, 0, ms_per_packet);
+ parameter_experiments_given_zipf_experiment(depth2[i], widths_tmp[k], 6, window_s, input);
+ }
+ }
+ }
+
+ // verify the strategy of parameter setting
+ int n_superspreader2[] = {10, 20, 50,100, 150, 200,250,300,350,400,450,500,600,700,800,900};
+ for (size_t i = 0; i < sizeof(n_superspreader2) / sizeof(int); i++) {
+ int n_src_ip = 10 * n_superspreader2[i];
+ int n_flows_per_window = n_src_ip * 500;
+ double ms_per_packet = 1.0 * window_s * 1000 / n_flows_per_window;
+
+ ZipfInputGenerator input(1, n_src_ip, 0, ms_per_packet);
+ int width;
+ if (n_superspreader2[i] <= 100) {
+ width = n_superspreader2[i] * 3 / 2;
+ } else {
+ width = n_superspreader2[i] + 50;
+ }
+ if (width < 32) {
+ width = 32;
+ }
+
+ parameter_experiments_given_zipf_experiment(3, width, 6, window_s, input);
+ }
+}
+
+int main(int argc, char ** argv)
+{
+ int ret=0;
+ ::testing::InitGoogleTest(&argc, argv);
+ ::testing::GTEST_FLAG(filter) = "-SpreadSketchExperiments.*"; // Exclude SpreadSketchExperiments tests
+ // ::testing::GTEST_FLAG(filter) = "*serialize_with_nonstring_keys";
+
+ ret=RUN_ALL_TESTS();
+ return ret;
+}
diff --git a/docs/command_toc.md b/docs/command_toc.md
index a9c7de6..bb4a9a9 100644
--- a/docs/command_toc.md
+++ b/docs/command_toc.md
@@ -11,6 +11,7 @@ The supported command are category as follows:
* [Bloom Filter Type](./commands/bloom_filter.md)
* [Count-Min Sketch Type](./commands/count_min_sketch.md)
* [HyperLogLog Type](./commands/hyperloglog.md)
+* [Spread Sketch Type](./commands/spread_sketch.md)
* [Cluster Management](./commands/cluster.md)
* [Trouble Shooting](./commands/trouble_shooting.md)
## COMMAND LIST
diff --git a/docs/commands/spread_sketch.md b/docs/commands/spread_sketch.md
new file mode 100644
index 0000000..9175e11
--- /dev/null
+++ b/docs/commands/spread_sketch.md
@@ -0,0 +1,155 @@
+## Spread Sketch
+
+Spread Sketch is a probabilistic data structure that estimates the cardinalities of a huge number of entries. It's implemented based on [A High-Performance Invertible Sketch for Network-Wide Superspreader Detection](https://ieeexplore.ieee.org/abstract/document/9858870/) and innovated for the support of near-continuous time window based on the [Staggered HyperLogLog (STHLL)](https://www.sciencedirect.com/science/article/abs/pii/S0140366422002407) and level decay mechanism. The unique counter in sketch is implemented by STHLL instead of Multiple Resolution Bitmap suggested in the original paper.
+
+
+### SSINITBYDIM
+
+Syntax
+
+```
+SSINITBYDIM key width depth precision [TIME window-milliseconds]
+```
+
+ Initialize a Spread Sketch that can estimates cardinality of entries within a sliding time window with following arguments:
+ - width: Number of counters in each array. Reduces the error size.
+ - depth: Number of counter-arrays. Reduces the probability for an error of a certain size.
+ - precision -- Used by Hyperloglog, controlling the number of registers in the HyperLogLog. The precision should be between 4 to 16.
+ - TIME -- Optional. Creates a time-limited Spread Sketch. 'window-milliseconds' specifies the duration of the time window in milliseconds. This mode allows estimating the cardinality of items within the range (now - window-milliseconds, now).
+
+The error of Spread Sketch is determined by combining the error of STHLL and the error of CM Sketch. The memory usage of Spread Sketch can be estimated by width * depth * (4 + m), and gets slightly larger as more entries are added.
+
+Return
+- Simple String Reply: OK if the HyperLogLog created successfully.
+- Empty Array if key already exists.
+
+
+### SSINITBYCAPACITY
+
+Syntax
+
+```
+SSINITBYCAPACITY key capacity precision [TIME window-milliseconds]
+```
+
+ Initialize a Spread Sketch that can estimates cardinality of entries within a sliding time window with following arguments:
+ - capacity -- The expected number of entries which will be inserted. It determines the width and depth of the sketch by `width = capacity` an `depth = lg(capacity)`. Width less than 40 will be set to 40, and depth less than 3 will be set to 3.
+ - precision -- The data of the HyperLogLog is stored in an array of m registers. m is 2^precision, e.g. precision=4, m=16. The precision should be between 4 to 16.
+ - TIME -- Optional. Creates a time-limited Spread Sketch. 'window-milliseconds' specifies the duration of the time window in milliseconds. This mode allows estimating the cardinality of items within the range (now - window-milliseconds, now).
+
+Return
+- Simple String Reply: OK if the HyperLogLog created successfully.
+- Empty Array if key already exists.
+
+
+### SSADD
+
+Syntax
+
+```
+SSADD key entry item [item ...]
+```
+
+Add each `entry` and `item` to the Spread Sketch stored at `key`. For each the `entry`, the cardinality of `item` will be estimated by the Spread Sketch.
+
+Return
+- Integer reply an updated cardinality of the `entry` in the sketch.
+- Integer reply -1 if the key does not exist.
+
+### SSLIST
+
+Syntax
+
+```
+SSLIST key
+```
+
+Returns all the entries in record (usually those with high cardinality in high probability) as well as the cardinality of each entry.
+
+Return
+- Array of key-cardinality pairs if the key exists.
+- Empty Array if the key does not exist.
+
+Examples
+```
+swarmkv-basic-test> SSINITBYCAPACITY dos_port_scan 100 6
+OK
+swarmkv-basic-test> SSADD dos_port_scan 1.1.1.1 1.1.1.1:22-244.10.1.1:8888
+(integer) 1
+swarmkv-basic-test> SSADD dos_port_scan 10.10.10.10 any_string_can_be_item item2 just_add_many
+(integer) 3
+swarmkv-basic-test> SSLIST dos_port_scan
+1) "1.1.1.1"
+2) (integer) 1
+3) "10.10.10.10"
+4) (integer) 3
+```
+
+### SSQUERY
+
+Syntax
+
+```
+SSQUERY key entry
+```
+
+Returns the cardinality for an entry in a sketch.
+
+Return
+
+- Integer reply: The cardinality for an entry in a sketch.
+- Integer 0 if key does not exist.
+
+
+### SSMQUERY
+
+Syntax
+
+```
+SSMQUERY key entry [entry ...]
+```
+
+Returns the cardinality for one or more entries. It's the batch query operation version of `SSQUERY`.
+
+Return
+- Array reply of Integer reply with an cardinality of each of the entries in the sketch.
+- Empty Array if key does not exist.
+
+
+
+### SSINFO
+
+Syntax
+```
+SSINFO key
+```
+Returns the information of the sketch.
+- Width: The parameter specified in `SSINITBYDIM` or if the SS is init with `SSINITBYCAPACITY`, width approximately equals to capacity and slightly larger since they are tuned by the results of experiments.
+- Depth: The parameter specified in `SSINITBYDIM` or if the SS is init with `SSINITBYCAPACITY`, depth equals to lg(capacity).
+- Precision: The precision of the HyperLogLog.
+- TimeWindowMs: The time window in milliseconds. 0 if the SS is not time-limited.
+- ErrorCMS: The error rate of the sketch. Contributes to the error in correlation with total cardinality.
+- ErrorHLL: The error rate of the HyperLogLog. Contributes to the error in correlation with each entry.
+- Probability: The probability of inflated cardinality.
+
+Return
+- Array reply with information of the Spread Sketch.
+
+Examples
+```
+swarmkv-basic-test> SSINFO dos_port_scan
+ 1) "Width"
+ 2) (integer) 150
+ 3) "Depth"
+ 4) (integer) 3
+ 5) "Precision"
+ 6) (integer) 6
+ 7) "TimeWindowMs"
+ 8) (integer) 0
+ 9) "ErrorCMS"
+1) (double) 0.013333
+2) "ErrorHLL"
+3) (double) 0.130000
+4) "Probability"
+5) (double) 0.125000
+```
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h
index 69f2fd8..92e5079 100644
--- a/include/swarmkv/swarmkv.h
+++ b/include/swarmkv/swarmkv.h
@@ -168,7 +168,9 @@ void swarmkv_bfadd(struct swarmkv * db, const char * key, size_t keylen, const c
void swarmkv_bfmexists(struct swarmkv * db, const char * key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg);
void swarmkv_cmsincrby(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], long long increments[], size_t n_increment, swarmkv_on_reply_callback_t * cb, void * cb_arg);
+void swarmkv_ssadd(struct swarmkv * db, const char * key, size_t keylen, const char * item, const size_t item_len, const char *members[], size_t members_len[], size_t n_member, swarmkv_on_reply_callback_t * cb, void * cb_arg);
void swarmkv_cmsmquery(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t * cb, void * cb_arg);
+void swarmkv_ssmquery(struct swarmkv * db, const char * key, size_t keylen, const char * items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t * cb, void * cb_arg);
//Used by swarmkv-cli
size_t swarmkv_get_possible_command_name(struct swarmkv *db, const char *prefix, const char *cmd_names[], size_t sz);
diff --git a/readme.md b/readme.md
index 1d86632..4e103b2 100644
--- a/readme.md
+++ b/readme.md
@@ -33,6 +33,7 @@ SwarmKV Data Types
- [Bloom Filter](./docs/commands/bloom_filter.md) by age-partitioned bloom filter with the ability to expire.
- [Count-Min Sketch](./docs/commands/count_min_sketch.md).
- [HyperLogLog](./docs/commands/hyperloglog.md) by staggered HyperLogLog with the ability to expire.
+- [Spread Sketch](./docs/commands/spread_sketch.md) by Spread Sketch with the ability to expire.
# Getting started
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8ee7449..84fe726 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -26,7 +26,7 @@ add_definitions(-fPIC)
set(SWARMKV_SRC swarmkv_cmd_spec.c swarmkv.c swarmkv_api.c
swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c
swarmkv_sync.c swarmkv_store.c swarmkv_keyspace.c swarmkv_monitor.c
- t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c t_hyperloglog.c
+ t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c t_hyperloglog.c t_spread_sketch.c
swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c)
set(LIB_SOURCE_FILES
diff --git a/src/swarmkv.c b/src/swarmkv.c
index d3013ef..34f3ae8 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -26,6 +26,7 @@
#include "t_bloom_filter.h"
#include "t_cms.h"
#include "t_hyperloglog.h"
+#include "t_spread_sketch.h"
#include "uthash.h"
#include "sds.h"
@@ -1177,6 +1178,29 @@ void command_spec_init(struct swarmkv *db)
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
pfinfo_command, db->mod_store);
+ /*Spread sketch commands*/
+ swarmkv_command_table_register(db->mod_command_table, "SSINITBYDIM", "key width depth precision [TIME window-milliseconds]",
+ 4, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ ssinitbydim_command, db->mod_store);
+ swarmkv_command_table_register(db->mod_command_table, "SSINITBYCAPACITY", "key capacity precision [TIME window-milliseconds]*/",
+ 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ ssinitbycapacity_command, db->mod_store);
+ swarmkv_command_table_register(db->mod_command_table, "SSADD", "key entry item [item ...] ",
+ 3, 1, CMD_KEY_RW, REPLY_INT_MINORS1, AUTO_ROUTE,
+ ssadd_command, db->mod_store);
+ swarmkv_command_table_register(db->mod_command_table, "SSLIST", "key",
+ 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ sslist_command, db->mod_store);
+ swarmkv_command_table_register(db->mod_command_table, "SSQUERY", "key entry",
+ 2, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE,
+ ssquery_command, db->mod_store);
+ swarmkv_command_table_register(db->mod_command_table, "SSMQUERY", "key entry [entry ...]",
+ 2, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ ssmquery_command, db->mod_store);
+ swarmkv_command_table_register(db->mod_command_table, "SSINFO", "key",
+ 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ ssinfo_command, db->mod_store);
+
/* Debug Commands */
swarmkv_command_table_register(db->mod_command_table, "INFO", "[section]",
0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index 882721f..00147e8 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -579,4 +579,38 @@ void swarmkv_cmsmquery(struct swarmkv *db, const char *key, size_t keylen, const
}
swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2 + n_items, argv, argv_len);
return;
-} \ No newline at end of file
+}
+
+void swarmkv_ssadd(struct swarmkv *db, const char *key, size_t keylen, const char *item, const size_t item_len, const char *members[], size_t members_len[], size_t n_member, swarmkv_on_reply_callback_t *cb, void *cb_arg)
+{
+ const char *argv[3 + n_member];
+ size_t argv_len[3 + n_member];
+ argv[0] = "SSADD";
+ argv_len[0] = strlen(argv[0]);
+ argv[1] = key;
+ argv_len[1] = keylen;
+ argv[2] = item;
+ argv_len[2] = item_len;
+ for (size_t i = 0; i < n_member; i++)
+ {
+ argv[3 + i] = members[i];
+ argv_len[3 + i] = members_len[i];
+ }
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3 + n_member, argv, argv_len);
+}
+
+void swarmkv_ssmquery(struct swarmkv *db, const char *key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg)
+{
+ const char *argv[2 + n_items];
+ size_t argv_len[2 + n_items];
+ argv[0] = "SSMQUERY";
+ argv_len[0] = strlen(argv[0]);
+ argv[1] = key;
+ argv_len[1] = keylen;
+ for (size_t i = 0; i < n_items; i++)
+ {
+ argv[2 + i] = items[i];
+ argv_len[2 + i] = items_len[i];
+ }
+ swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2 + n_items, argv, argv_len);
+}
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 8d0d563..d3319a7 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -112,6 +112,13 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] =
.obj_merge_blob = (void (*)(void *, const char *, size_t))ST_hyperloglog_merge_blob,
.obj_replicate = (void *(*)(uuid_t, const char *, size_t))ST_hyperloglog_replicate,
.obj_size = (size_t(*)(const void *))ST_hyperloglog_mem_size},
+ {.type=OBJ_TYPE_SPREAD_SKETCH,
+ .type_name="spread-sketch",
+ .obj_free=(void (*)(void *))spread_sketch_free,
+ .obj_serialize=(void (*)(const void *, char **, size_t *))spread_sketch_serialize,
+ .obj_merge_blob=(void (*)(void *, const char *, size_t))spread_sketch_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))spread_sketch_replicate,
+ .obj_size=(size_t (*)(const void *))spread_sketch_calculate_memory_usage},
{.type = OBJ_TYPE_UNDEFINED,
.type_name = "undefined",
.obj_free = undefined_obj_free,
diff --git a/src/swarmkv_store.h b/src/swarmkv_store.h
index 0be7b61..6bde09a 100644
--- a/src/swarmkv_store.h
+++ b/src/swarmkv_store.h
@@ -16,6 +16,7 @@
#include "ap_bloom.h"
#include "cm_sketch.h"
#include "st_hyperloglog.h"
+#include "spread_sketch.h"
enum sobj_type
{
@@ -29,6 +30,7 @@ enum sobj_type
OBJ_TYPE_BLOOM_FILTER,
OBJ_TYPE_CMS,
OBJ_TYPE_HYPERLOGLOG,
+ OBJ_TYPE_SPREAD_SKETCH,
OBJ_TYPE_UNDEFINED,
__SWARMKV_OBJ_TYPE_MAX
};
@@ -49,6 +51,7 @@ struct sobj
struct AP_bloom *bloom;
struct CM_sketch *cms;
struct ST_hyperloglog *hll;
+ struct spread_sketch *spread_sketch;
void *raw;
};
};
diff --git a/src/t_spread_sketch.c b/src/t_spread_sketch.c
new file mode 100644
index 0000000..d6a7131
--- /dev/null
+++ b/src/t_spread_sketch.c
@@ -0,0 +1,307 @@
+#include <stdlib.h>
+#include <assert.h>
+#include <math.h>
+
+#include "swarmkv_common.h"
+#include "swarmkv_utils.h"
+#include "swarmkv_store.h"
+#include "swarmkv_error.h"
+#include "spread_sketch.h"
+
+enum cmd_exec_result ssinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ /* SSINITBYDIM key width depth precision [TIME window-milliseconds]*/
+ const sds key = cmd->argv[1];
+
+ long width = strtol(cmd->argv[2], NULL, 10);
+ if (width <= 0)
+ {
+ *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]);
+ return FINISHED;
+ }
+ long depth = strtol(cmd->argv[3], NULL, 10);
+ if (depth <= 0)
+ {
+ *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
+ return FINISHED;
+ }
+ long precision = strtol(cmd->argv[4], NULL, 10);
+ if (precision < HLL_MIN_PRECISION || precision > HLL_MAX_PRECISION)
+ {
+ *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[4]);
+ return FINISHED;
+ }
+ long long time_window_ms = 0;
+ if (cmd->argc == 7)
+ {
+ if (strncasecmp(cmd->argv[5], "TIME", 4) != 0)
+ {
+ *reply = swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[5], "TIME");
+ return FINISHED;
+ }
+ if (str2integer(cmd->argv[6], &time_window_ms) < 0)
+ {
+ *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[6]);
+ return FINISHED;
+ }
+ }
+
+ struct sobj *obj = store_lookup(mod_store, key);
+ if (!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+
+ if (obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ assert(obj->raw == NULL);
+
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ obj->spread_sketch = spread_sketch_new(width, depth, precision, time_window_ms, now);
+
+ obj->type = OBJ_TYPE_SPREAD_SKETCH;
+ *reply = swarmkv_reply_new_status("OK");
+ }
+ else
+ {
+ *reply = swarmkv_reply_new_array(0);
+ }
+ return FINISHED;
+}
+
+enum cmd_exec_result ssinitbycapacity_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ /* SSINITBYCAPACITY key capacity precision [TIME window-milliseconds]*/
+ const sds key = cmd->argv[1];
+ long expected_superspreader_count = strtol(cmd->argv[2], NULL, 10);
+ if (expected_superspreader_count <= 0)
+ {
+ *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]);
+ return FINISHED;
+ }
+ long precision = strtol(cmd->argv[3], NULL, 10);
+ if (precision < HLL_MIN_PRECISION || precision > HLL_MAX_PRECISION)
+ {
+ *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
+ return FINISHED;
+ }
+ long long time_window_ms = 0; // when set to 0, it means no time decay is used
+ if (cmd->argc == 6)
+ {
+ if (strncasecmp(cmd->argv[4], "TIME", 4) != 0)
+ {
+ *reply = swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[4], "TIME");
+ return FINISHED;
+ }
+ if (str2integer(cmd->argv[5], &time_window_ms) < 0)
+ {
+ *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[5]);
+ return FINISHED;
+ }
+ }
+
+ struct sobj *obj = store_lookup(mod_store, key);
+ if (!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+
+ if (obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ assert(obj->raw == NULL);
+
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ int width, depth;
+ unsigned char precision_dummy;
+ spread_sketch_recommend_parameters(expected_superspreader_count, &width, &depth, &precision_dummy);
+ obj->spread_sketch = spread_sketch_new(width, depth, precision, time_window_ms, now);
+
+ obj->type = OBJ_TYPE_SPREAD_SKETCH;
+ *reply = swarmkv_reply_new_status("OK");
+ }
+ else
+ {
+ *reply = swarmkv_reply_new_array(0);
+ }
+ return FINISHED;
+}
+
+enum cmd_exec_result ssadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ // SSADD key entry item [item ...]
+ const sds key = cmd->argv[1];
+ struct sobj *obj = store_lookup(mod_store, key);
+ if (!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ if (obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ if (obj->type != OBJ_TYPE_SPREAD_SKETCH)
+ {
+ *reply = swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+
+ const sds entry_key = cmd->argv[2];
+
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ for (size_t i = 3; i < cmd->argc; i++)
+ {
+ spread_sketch_add(obj->spread_sketch, entry_key, sdslen(entry_key), cmd->argv[i], sdslen(cmd->argv[i]), NULL, now);
+ }
+
+ double est = spread_sketch_query(obj->spread_sketch, entry_key, sdslen(entry_key), now);
+ *reply = swarmkv_reply_new_integer((int)(est + 0.5));
+
+ store_mark_object_as_modified(mod_store, obj);
+ return FINISHED;
+}
+
+enum cmd_exec_result sslist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ // SSLIST key
+ const sds key = cmd->argv[1];
+
+ struct sobj *obj = store_lookup(mod_store, key);
+ if (!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ if (obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ if (obj->type != OBJ_TYPE_SPREAD_SKETCH)
+ {
+ *reply = swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+
+ struct spread_sketch *ss = obj->spread_sketch;
+ size_t n_entry;
+ char **entry_keys = NULL;
+ size_t *entry_keys_len = NULL;
+ spread_sketch_list_entries(ss, &entry_keys, &entry_keys_len, &n_entry);
+
+ *reply = swarmkv_reply_new_array(n_entry * 2);
+ for (size_t i = 0; i < n_entry; i++)
+ {
+ const char *entry_tmp = entry_keys[i];
+ size_t len_tmp = entry_keys_len[i];
+ int est_tmp = (int)(spread_sketch_get_cardinality(ss, entry_tmp, len_tmp) + 0.5);
+
+ (*reply)->elements[i * 2] = swarmkv_reply_new_string(entry_tmp, len_tmp);
+ (*reply)->elements[i * 2 + 1] = swarmkv_reply_new_integer(est_tmp);
+ }
+
+ free(entry_keys);
+ free(entry_keys_len);
+ return FINISHED;
+}
+
+enum cmd_exec_result ssmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ // SSMQUERY key entry [entry ...]
+ const sds key = cmd->argv[1];
+ struct sobj *obj = store_lookup(mod_store, key);
+ if (!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ if (obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ if (obj->type != OBJ_TYPE_SPREAD_SKETCH)
+ {
+ *reply = swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+
+ struct spread_sketch *ss = obj->spread_sketch;
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ *reply = swarmkv_reply_new_array(cmd->argc - 2);
+ for (size_t i = 2; i < cmd->argc; i++)
+ {
+ const sds entry_key = cmd->argv[i];
+ double est = spread_sketch_query(ss, entry_key, sdslen(entry_key), now);
+ (*reply)->elements[i - 2] = swarmkv_reply_new_integer((int)(est + 0.5));
+ }
+ return FINISHED;
+}
+enum cmd_exec_result ssquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ // SSQUERY key entry
+ enum cmd_exec_result ret;
+ struct swarmkv_reply *tmp_reply = NULL;
+ ret = ssmquery_command(mod_store, cmd, &tmp_reply);
+ if (ret == FINISHED)
+ {
+ if (tmp_reply->type == SWARMKV_REPLY_ARRAY)
+ {
+ *reply = swarmkv_reply_dup(tmp_reply->elements[0]);
+ swarmkv_reply_free(tmp_reply);
+ }
+ else
+ {
+ *reply = tmp_reply;
+ }
+ }
+ return ret;
+}
+enum cmd_exec_result ssinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ // SSINFO key
+ const sds key = cmd->argv[1];
+
+ struct sobj *obj = store_lookup(mod_store, key);
+ if (!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ if (obj->type == OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ if (obj->type != OBJ_TYPE_SPREAD_SKETCH)
+ {
+ *reply = swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+
+ struct spread_sketch *ss = obj->spread_sketch;
+ int depth, width, time_window_ms;
+ unsigned char precision;
+ spread_sketch_get_parameter(ss, &depth, &width, &precision, &time_window_ms);
+ double error_cms = 2.0 / (double)width;
+ double error_hll = ST_hyperloglog_error_for_precision(precision);
+ double probability = 1 / pow(2, depth);
+
+ int i = 0;
+ *reply = swarmkv_reply_new_array(14);
+ (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Width");
+ (*reply)->elements[i++] = swarmkv_reply_new_integer(width);
+ (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Depth");
+ (*reply)->elements[i++] = swarmkv_reply_new_integer(depth);
+ (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Precision");
+ (*reply)->elements[i++] = swarmkv_reply_new_integer(precision);
+ (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("TimeWindowMs");
+ (*reply)->elements[i++] = swarmkv_reply_new_integer(time_window_ms);
+ (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("ErrorCMS");
+ (*reply)->elements[i++] = swarmkv_reply_new_double(error_cms);
+ (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("ErrorHLL");
+ (*reply)->elements[i++] = swarmkv_reply_new_double(error_hll);
+ (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Probability");
+ (*reply)->elements[i++] = swarmkv_reply_new_double(probability);
+ assert(i == 14);
+
+ return FINISHED;
+} \ No newline at end of file
diff --git a/src/t_spread_sketch.h b/src/t_spread_sketch.h
new file mode 100644
index 0000000..e8f6180
--- /dev/null
+++ b/src/t_spread_sketch.h
@@ -0,0 +1,11 @@
+#pragma once
+#include "swarmkv_common.h"
+#include "swarmkv_cmd_spec.h"
+
+enum cmd_exec_result ssinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result ssinitbycapacity_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result ssadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result sslist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result ssinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result ssquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result ssmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); \ No newline at end of file
diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp
index 7dae39e..033eeff 100644
--- a/test/swarmkv_gtest.cpp
+++ b/test/swarmkv_gtest.cpp
@@ -784,7 +784,7 @@ TEST_F(SwarmkvBasicTest, TypeBloomFilter)
EXPECT_EQ(reply->elements[i]->integer, 1);
}
}
-TEST_F(SwarmkvBasicTest, TypeCMS)
+TEST_F(SwarmkvBasicTest, TypeCountMinSketch)
{
struct swarmkv *db=SwarmkvBasicTest::db;
const char *key="cms-001";
@@ -976,6 +976,121 @@ TEST_F(SwarmkvBasicTest, HashTags)
}
}
+TEST_F(SwarmkvBasicTest, TypeSpreadSketch)
+{
+ struct swarmkv *db=SwarmkvBasicTest::db;
+ const char *key="ss-001";
+ struct swarmkv_reply *reply=NULL;
+ long long width=100, depth=2;
+ int precision=6;
+ reply=swarmkv_command(db, "SSINITBYDIM %s %lld %lld %d", key, width, depth,precision);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+
+ const char *entries[] = {"1.1.1.1", "2.2.2.2"};
+ const char *item_1[]={"item1_1", "item1_2","item1_3"};
+ const char *item_2[]={"item2_1"};
+ const int n_item[2] = {3, 1};
+ reply=swarmkv_command(db, "SSADD %s %s %s %s %s", key, entries[0], item_1[0], item_1[1], item_1[2]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 3);
+ swarmkv_reply_free(reply);
+ reply=swarmkv_command(db, "SSADD %s %s %s", key, entries[1], item_2[0]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db, "SSLIST %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ EXPECT_EQ(reply->n_element, sizeof(entries)/sizeof(entries[0]) * 2);
+ for(size_t i=0; i<reply->n_element / 2; i++)
+ {
+ EXPECT_EQ(reply->elements[i*2]->type, SWARMKV_REPLY_STRING);
+ EXPECT_EQ(reply->elements[i*2+1]->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_STREQ(reply->elements[i*2]->str, entries[i]);
+ EXPECT_EQ(reply->elements[i*2+1]->integer, n_item[i]);
+ }
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db, "SSINFO %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ ASSERT_EQ(reply->n_element, 14);
+ ASSERT_EQ(reply->elements[1]->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->elements[1]->integer, depth);
+ ASSERT_EQ(reply->elements[3]->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->elements[3]->integer, width);
+ ASSERT_EQ(reply->elements[5]->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->elements[5]->integer, precision);
+ ASSERT_EQ(reply->elements[7]->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->elements[7]->integer, 0);
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db, "type %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STRING);
+ EXPECT_STREQ(reply->str, "spread-sketch");
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db, "DEL %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
+
+ // time-decay spread sketch
+ long long time_window_ms=1000;
+ reply=swarmkv_command(db, "SSINITBYCAPACITY %s %d %d TIME %lld", key, 10, 12, time_window_ms);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+
+ // add many entries, each 3 items
+ for (int i=0; i<1000; i++) {
+ reply=swarmkv_command(db, "SSADD %s old%d %d %d %d", key, i, 1, 2, 3);
+ swarmkv_reply_free(reply);
+ }
+
+ usleep(time_window_ms*1000 * 2);
+
+ // add other entries, they should replace the old entries.
+ for (int i=0; i<1000; i++) {
+ reply=swarmkv_command(db, "SSADD %s new%d %d %d %d", key, i, 1, 2, 3);
+ swarmkv_reply_free(reply);
+ }
+ reply=swarmkv_command(db, "SSLIST %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ int error_cnt = 0;
+ for (size_t i=0; i<reply->n_element / 2; i++) {
+ EXPECT_EQ(reply->elements[2 * i]->type, SWARMKV_REPLY_STRING);
+ EXPECT_TRUE(strncmp(reply->elements[2 * i]->str, "new", 3) == 0);
+ EXPECT_EQ(reply->elements[2 * i + 1]->type, SWARMKV_REPLY_INTEGER);
+ if (reply->elements[2 * i + 1]->integer != 3) {
+ error_cnt++;
+ }
+ }
+ EXPECT_LE(error_cnt, 1); // allow 1 of element estimations to be wrong because of probabilistic sketch algorithm error.
+ swarmkv_reply_free(reply);
+
+ //API test
+ key="ss-api";
+ reply=swarmkv_command(db, "SSINITBYDIM %s %lld %lld %d TIME %lld", key, width, depth,precision, time_window_ms);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+ size_t item_2_len[] = {strlen(item_2[0])};
+
+ swarmkv_ssadd(db, key, strlen(key), entries[0], strlen(entries[0]), item_2, item_2_len, 1, copy_reply_callback, &reply);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
+
+ size_t entries_len[] = {strlen(entries[0]), strlen(entries[1])};
+ swarmkv_ssmquery(db, key, strlen(key), entries, entries_len, 2, copy_reply_callback, &reply);
+ swarmkv_caller_loop(db, SWARMKV_LOOP_ONCE, NULL);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ ASSERT_EQ(reply->n_element, 2);
+ EXPECT_EQ(reply->elements[0]->integer, 1);
+ EXPECT_EQ(reply->elements[1]->integer, 0);
+ swarmkv_reply_free(reply);
+}
+
class SwarmkvTwoNodes : public testing::Test
{
@@ -1938,7 +2053,7 @@ TEST_F(SwarmkvTwoNodes, TypeBloomFilter)
EXPECT_EQ(reply->integer, 1);
swarmkv_reply_free(reply);
}
-TEST_F(SwarmkvTwoNodes, TypeCMS)
+TEST_F(SwarmkvTwoNodes, TypeCountMinSketch)
{
struct swarmkv *db[2];
db[0]=SwarmkvTwoNodes::db1;
@@ -2004,7 +2119,6 @@ TEST_F(SwarmkvTwoNodes, TypeHyperLogLog)
const char *key="hll-001";
const char *prefix[4]={"Philippe", "Flajolet", "Invents", "Hyperloglog"};
-
char precision=12;
reply=swarmkv_command(db[0], "PFINIT %s %d", key, precision);
ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
@@ -2026,6 +2140,42 @@ TEST_F(SwarmkvTwoNodes, TypeHyperLogLog)
}
}
}
+TEST_F(SwarmkvTwoNodes, TypeSpreadSketch)
+{
+ struct swarmkv *db[2];
+ db[0]=SwarmkvTwoNodes::db1;
+ db[1]=SwarmkvTwoNodes::db2;
+ struct swarmkv_reply *reply=NULL;
+ const char *key="ss-001";
+
+ const char *entries[]={"1.1.1.1", "deviceA", "1","2","3"};
+ int n_entry = sizeof(entries)/sizeof(entries[0]);
+ int n_loop = 10000;
+
+ reply=swarmkv_command(db[0], "SSINITBYCAPACITY %s 5", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
+ srand(171);
+ for(int i=0; i<n_loop; i++)
+ {
+ reply=swarmkv_command(db[i%2], "SSADD %s %s item%d", key, entries[i%n_entry], i);
+ swarmkv_reply_free(reply);
+ if(i!=0&&i%1000==0) {
+ wait_for_sync();
+ reply=swarmkv_command(db[(i+1)%2], "SSLIST %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ EXPECT_EQ(reply->n_element, n_entry*2);
+ for (int j = 0; j < n_entry; j++) {
+ int add_on_entry = i/n_entry;
+ EXPECT_NEAR(reply->elements[j*2+1]->integer, add_on_entry, add_on_entry/10 + i/20); // 1/10: apprroximately the hll_error. 1/20: the error caused by CM sketch.
+ // printf("i : %d, entry: %s, cardinality: %lld\n", i, reply->elements[j*2]->str, reply->elements[j*2+1]->integer);
+ }
+
+ swarmkv_reply_free(reply);
+ }
+ }
+}
TEST_F(SwarmkvTwoNodes, Info)
{
struct swarmkv *db[2];
@@ -2390,6 +2540,8 @@ int main(int argc, char ** argv)
int ret=0;
g_current_thread_id=syscall(SYS_gettid);
::testing::InitGoogleTest(&argc, argv);
+ // ::testing::GTEST_FLAG(filter) = "-*.TypeSS";
+ // ::testing::GTEST_FLAG(filter) = "SwarmkvTwoNodes.TypeSS";
ret=RUN_ALL_TESTS();
return ret;
}