summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author郑超 <[email protected]>2024-03-30 04:18:41 +0000
committer郑超 <[email protected]>2024-03-30 04:18:41 +0000
commita2d902c3434fe0edb3ecc9d70a95b9d6b790f379 (patch)
tree6450b570c8510bf0a7127813ab5ec6aae593179e
parentaf8bbfb568851b144a00622810efb6bbff628389 (diff)
Add new data type: Count-Min Sketch. Rename `BFRESERVE` to `BFINIT` for consistency.
-rw-r--r--CRDT/CMakeLists.txt5
-rw-r--r--CRDT/ap_bloom.c36
-rw-r--r--CRDT/bulk_token_bucket.c2
-rw-r--r--CRDT/cm_sketch.c452
-rw-r--r--CRDT/cm_sketch.h32
-rw-r--r--CRDT/crdt_utils.c13
-rw-r--r--CRDT/crdt_utils.h16
-rw-r--r--CRDT/g_array.c5
-rw-r--r--CRDT/probabilistic_crdt_gtest.cpp224
-rw-r--r--CRDT/tb_crdt_gtest.cpp2
-rw-r--r--docs/commands/bloom_filter.md16
-rw-r--r--include/swarmkv/swarmkv.h1
-rw-r--r--readme.md2
-rw-r--r--src/CMakeLists.txt4
-rw-r--r--src/swarmkv.c72
-rw-r--r--src/swarmkv_error.h1
-rw-r--r--src/swarmkv_monitor.c38
-rw-r--r--src/swarmkv_store.c25
-rw-r--r--src/swarmkv_store.h5
-rw-r--r--src/t_bloom_filter.c7
-rw-r--r--src/t_bloom_filter.h2
-rw-r--r--src/t_cms.c322
-rw-r--r--src/t_cms.h10
-rw-r--r--src/t_hash.c6
-rw-r--r--src/t_set.c4
-rw-r--r--src/t_string.c6
-rw-r--r--src/t_token_bucket.c20
-rw-r--r--test/swarmkv_gtest.cpp161
-rw-r--r--test/test_utils.c2
29 files changed, 1073 insertions, 418 deletions
diff --git a/CRDT/CMakeLists.txt b/CRDT/CMakeLists.txt
index 412fc7e..b912f44 100644
--- a/CRDT/CMakeLists.txt
+++ b/CRDT/CMakeLists.txt
@@ -1,8 +1,9 @@
add_definitions(-D_GNU_SOURCE)
add_definitions(-fPIC)
-add_library(CRDT lww_register.c pn_counter.c or_map.c or_set.c cm_sketch.c st_hyperloglog.c ap_bloom.c
- g_array.c token_bucket_common.c oc_token_bucket.c fair_token_bucket.c bulk_token_bucket.c)
+add_library(CRDT lww_register.c pn_counter.c or_map.c or_set.c st_hyperloglog.c ap_bloom.c cm_sketch.c
+ g_array.c token_bucket_common.c oc_token_bucket.c fair_token_bucket.c bulk_token_bucket.c
+ crdt_utils.c)
include_directories(${PROJECT_SOURCE_DIR}/deps/mpack
${PROJECT_SOURCE_DIR}/deps/uthash
diff --git a/CRDT/ap_bloom.c b/CRDT/ap_bloom.c
index a7356f6..51014e9 100644
--- a/CRDT/ap_bloom.c
+++ b/CRDT/ap_bloom.c
@@ -1,6 +1,5 @@
#include "ap_bloom.h"
#include "crdt_utils.h"
-#include "xxhash.h"
#include "utlist.h"
#include "utarray.h"
@@ -19,22 +18,6 @@
#define SLICE_EXPANSION 2
#define FILL_RATIO_THRESHOLD 0.1
#define MIN_CAPACITY 1000
-// Use double hashing for fast hashing,
-// reference: Kirsch, Adam, and Michael Mitzenmacher. "Less hashing, same performance: Building a better bloom filter."
-// Algorithms–ESA 2006: 14th Annual European Symposium, Zurich, Switzerland, September 11-13, 2006. Proceedings 14. Springer Berlin Heidelberg, 2006.
-// https://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf
-struct bloom_hashval
-{
- uint64_t a;
- uint64_t b;
-};
-struct bloom_hashval bloom_calc_hash(const void *buffer, int len)
-{
- struct bloom_hashval rv;
- rv.a=XXH3_64bits_withSeed(buffer, len, 0x9747b28c);
- rv.b=XXH3_64bits_withSeed(buffer, len, rv.a);
- return rv;
-}
//The test_bit_set_bit() code snap is from RedisBloom (BSD License).
@@ -130,9 +113,9 @@ static struct ap_slice * ap_slice_duplicate(const struct ap_slice *slice)
return new_slice;
}
#define HASH_TO_OFFSET(hash, slice) ((hash.a + slice->hash_index * hash.b) % (slice->slice_size<<3))
-void ap_slice_add_hash(struct ap_slice *slice, struct bloom_hashval hash, struct timeval now)
+void ap_slice_add_hash(struct ap_slice *slice, struct double_hash hash, struct timeval now)
{
- int index = HASH_TO_OFFSET(hash, slice);
+ int index=double_hash_generate(&hash, slice->hash_index, slice->slice_size<<3);
int added = !test_bit_set_bit(slice->data, index, MODE_WRITE);
slice->last_insert = now;
@@ -143,10 +126,9 @@ void ap_slice_add_hash(struct ap_slice *slice, struct bloom_hashval hash, struct
slice->popcount += added;
return;
}
-int ap_slice_check_hash(const struct ap_slice *slice, struct bloom_hashval hash)
+int ap_slice_check_hash(const struct ap_slice *slice, struct double_hash hash)
{
- //int index = (hash.a + slice->hash_index * hash.b) % (slice->slice_size<<3);
- int index = HASH_TO_OFFSET(hash, slice);
+ int index=double_hash_generate(&hash, slice->hash_index, slice->slice_size<<3);
return test_bit_set_bit(slice->data, index, MODE_READ);
}
struct ap_slice_event
@@ -218,7 +200,7 @@ int ap_state_is_match(struct ap_state *state)
}
return 0;
}
-static void ap_slice_chain_check_hash(const struct ap_slice *head, struct bloom_hashval hash, struct ap_state *state)
+static void ap_slice_chain_check_hash(const struct ap_slice *head, struct double_hash hash, struct ap_state *state)
{
//In a stackable (scalable) Bloom filter, checking for membership now involves inspecting each layer for presence.
const struct ap_slice *slice=NULL;
@@ -258,7 +240,7 @@ static void ap_slice_chain_check_hash(const struct ap_slice *head, struct bloom_
* 0 - element was not present and was added
* 1 - element (or a collision) had already been added previously
*/
-static void ap_slice_chain_add_hash(struct ap_slice **head, struct bloom_hashval hash, struct timeval now)
+static void ap_slice_chain_add_hash(struct ap_slice **head, struct double_hash hash, struct timeval now)
{
//add new slice if the current slice is full
if( (double) (*head)->popcount / ((*head)->slice_size<<3) > FILL_RATIO_THRESHOLD)
@@ -550,7 +532,8 @@ static void slide_time(struct AP_bloom *bloom, struct timeval now)
void AP_bloom_add(struct AP_bloom *bloom, struct timeval now, const char *buffer, int len)
{
- struct bloom_hashval hash = bloom_calc_hash(buffer, len);
+ struct double_hash hash;
+ double_hash_init(&hash, buffer, len);
if(bloom->cfg.time_window_ms)
{
slide_time(bloom, now);
@@ -569,7 +552,8 @@ int AP_bloom_check(const struct AP_bloom *bloom, struct timeval now, const char
{
return 0;
}
- struct bloom_hashval hash = bloom_calc_hash(buffer, len);
+ struct double_hash hash;
+ double_hash_init(&hash, buffer, len);
int chain_num = bloom->chain_num;
struct ap_state state;
diff --git a/CRDT/bulk_token_bucket.c b/CRDT/bulk_token_bucket.c
index 5d9c1ca..7fb45e8 100644
--- a/CRDT/bulk_token_bucket.c
+++ b/CRDT/bulk_token_bucket.c
@@ -12,7 +12,7 @@
#define PERTURB_INTERVAL_MAX_MS 8000
#define BTB_ST_HLL_PRECISION 7
-#define BTB_ST_HLL_WINDOW_MIN_MS 1000
+#define BTB_ST_HLL_WINDOW_MIN_MS 10000
struct btb_configuration
{
diff --git a/CRDT/cm_sketch.c b/CRDT/cm_sketch.c
index 9cdaca3..06da71e 100644
--- a/CRDT/cm_sketch.c
+++ b/CRDT/cm_sketch.c
@@ -9,206 +9,103 @@
#include <assert.h>
#include <math.h> //pow
-#define CMS_MAX_DEPTH 16
-
struct cms_opt
{
int width;
int depth;
};
-static const struct cms_opt cms_default_opt = {
- .width=1024*16,
- .depth=6
-};
-static uint64_t __fnv_1a(const char *key, size_t len, int seed)
-{
- // FNV-1a hash (http://www.isthe.com/chongo/tech/comp/fnv/)
- size_t i;
- uint64_t h = 14695981039346656037ULL + (31 * seed); // FNV_OFFSET 64 bit with magic number seed
- for (i = 0; i < len; ++i){
- h = h ^ (unsigned char) key[i];
- h = h * 1099511628211ULL; // FNV_PRIME 64 bit
- }
- return h;
-}
-static int32_t __safe_add(int32_t a, int32_t b)
-{
- if (a == INT32_MAX || a == INT32_MIN) {
- return a;
- }
- int32_t c=0;
- c = ((int64_t) a + b > INT32_MAX) ? INT32_MAX : (a + b);
- return c;
-}
-static int32_t __safe_sub(int32_t a, int32_t b)
-{
- if (a == INT32_MAX || a == INT32_MIN) {
- return a;
- }
- int32_t c = 0;
- c = ((int64_t) b - a < INT32_MIN) ? INT32_MAX : (a - b);
- return c;
-}
-
-struct cms_bin
+struct cms_node
{
uuid_t uuid;
unsigned long long sequence;
uint64_t elements_added;
int width;
int depth;
+ int *array;
UT_hash_handle hh;
- int32_t *bins;
};
-struct cms_bin *cms_bin_new(uuid_t uuid, int width, int depth)
-{
- struct cms_bin *bin=NULL;
- bin=ALLOC(struct cms_bin, 1);
- uuid_copy(bin->uuid, uuid);
- bin->width=width;
- bin->depth=depth;
- bin->bins=ALLOC(int32_t, width*depth);
- return bin;
-}
-static int32_t cms_bin_query(const struct cms_bin *bin, const uint64_t *hashes, size_t n_hash)
-{
- if(n_hash!=bin->depth) return 0;
-
- int32_t added=INT32_MAX;
- for(int i=0; i<bin->depth; i++)
- {
- uint64_t idx=(hashes[i] % bin->width) + (i * bin->width);
- added=MIN(added, bin->bins[idx]);
- }
- return added;
-}
-#define CMLS16_BASE 1.00025
-double point_value(int32_t c)
-{
- if(c==0) return 0;
- return pow(CMLS16_BASE, c-1);
-}
-static int32_t cms_bin_log_query(const struct cms_bin *bin, const uint64_t *hashes, size_t n_hash)
-{
- if(n_hash!=bin->depth) return 0;
- int32_t c=INT32_MAX;
- for(int i=0; i<bin->depth; i++)
- {
- int32_t idx=(hashes[i] % bin->width) + (i * bin->width);
- c=MIN(c, bin->bins[idx]);
- }
- if(c<=1)
- {
- return point_value(c);
- }
- else
- {
- double v=point_value(c+1);
- v=(1-v)/(1-CMLS16_BASE);
- return (int32_t)v;
- }
+struct cms_node *cms_node_new(uuid_t uuid, int width, int depth)
+{
+ struct cms_node *node=ALLOC(struct cms_node, 1);
+ uuid_copy(node->uuid, uuid);
+ node->width=width;
+ node->depth=depth;
+ node->array=ALLOC(int, width*depth);
+ return node;
}
-int increase_decision(int32_t c)
+void cms_node_free(struct cms_node *node)
{
- long pr=(long)pow(CMLS16_BASE, c);
- long r=random()%pr;
- if(r==0) return 1;
- else return 0;
+ free(node->array);
+ free(node);
+ return;
}
-static void cms_bin_log_add(struct cms_bin *bin, const uint64_t *hashes, size_t n_hash, int32_t times)
+
+static void cms_node_incrby(struct cms_node *node, int *hash, int n_hash, int increment)
{
- if(n_hash!=bin->depth) return;
- int32_t c=INT32_MAX;
- for(int i=0; i<bin->depth; i++)
+ assert(n_hash==node->depth);
+ for(int i=0; i<node->depth; i++)
{
- int32_t idx=(hashes[i] % bin->width) + (i * bin->width);
- c=MIN(c, bin->bins[idx]);
+ assert(hash[i]>=0 && hash[i]<node->width);
+ int offset = hash[i] + i*node->width;
+ __builtin_add_overflow(node->array[offset], increment, &node->array[offset]);
+ //node->array[offset] += increment;
}
-
- long pr=(long)pow(CMLS16_BASE, c);
- long r=random()%pr;
- for(int t=0; t<times; t++)
- {
- int increase=increase_decision(c);
- if(!increase)
- continue;
- for(int i=0; i<bin->depth; i++)
- {
- int32_t idx=(hashes[i] % bin->width) + (i * bin->width);
- if(bin->bins[idx] == c)
- {
- bin->bins[idx]++;
- }
- }
- c++;
- }
- //bin->bins[cu_idx]+=times;
- bin->elements_added += times;
- bin->sequence++;
+ node->elements_added += increment;
+ node->sequence ++;
return;
}
-static int32_t cms_bin_add(struct cms_bin *bin, const uint64_t *hashes, size_t n_hash, int32_t times)
+static int cms_node_query(const struct cms_node *node, const int *hash, int n_hash)
{
- if(n_hash!=bin->depth) return 0;
- int32_t cu_idx=-1;//conservative update idx
- for(int i=0; i<bin->depth; i++)
+ assert(n_hash==node->depth);
+ int min_val=INT32_MAX;
+ for(int i=0; i<node->depth; i++)
{
- int32_t idx=(hashes[i] % bin->width) + (i * bin->width);
- if(cu_idx<0) cu_idx=idx;
- if(bin->bins[cu_idx] < bin->bins[idx])
- {
- cu_idx=idx;
- }
- if(times>0)
- {
- bin->bins[idx] = __safe_add(bin->bins[idx], times);
- }
- else
+ assert(hash[i]>=0 && hash[i]<node->width);
+ int offset = hash[i] + i*node->width;
+ if(min_val>node->array[offset])
{
- bin->bins[idx] = __safe_sub(bin->bins[idx], 0-times);
+ min_val=node->array[offset];
}
}
- //bin->bins[cu_idx]+=times;
- bin->elements_added += times;
- bin->sequence++;
- return bin->bins[cu_idx];
+ return min_val;
}
-static void cms_bin_free(struct cms_bin *bin)
+static void cms_node_reset(struct cms_node *node)
{
- free(bin->bins);
- free(bin);
+ memset(node->array, 0, sizeof(int)*node->width*node->depth);
+ node->elements_added=0;
+ node->sequence++;
+ return;
}
-static size_t cms_bin_blob_size(const struct cms_bin *bin)
+static size_t cms_node_blob_size(const struct cms_node *node)
{
- return offsetof(struct cms_bin, hh) + sizeof(int32_t)*bin->depth*bin->width;
+ return offsetof(struct cms_node, hh) + sizeof(int32_t)*node->depth*node->width;
}
-static size_t cms_bin_size(const struct cms_bin *bin)
+static size_t cms_node_size(const struct cms_node *node)
{
- return sizeof(struct cms_bin)+sizeof(int32_t)*bin->depth*bin->width;
+ return sizeof(struct cms_node)+sizeof(int32_t)*node->depth*node->width;
}
-static size_t cms_bin_serialize(const struct cms_bin *bin, char *buff, size_t buff_sz)
+static size_t cms_node_serialize(const struct cms_node *node, char *buff, size_t buff_sz)
{
size_t offset=0;
- assert(buff_sz>=cms_bin_blob_size(bin));
- memcpy(buff, bin, offsetof(struct cms_bin, hh));
- offset += offsetof(struct cms_bin, hh);
- memcpy(buff+offset, bin->bins, sizeof(int32_t)*bin->depth*bin->width);
- offset += sizeof(int32_t)*bin->depth*bin->width;
- assert(offset==cms_bin_blob_size(bin));
+ assert(buff_sz>=cms_node_blob_size(node));
+ memcpy(buff, node, offsetof(struct cms_node, hh));
+ offset += offsetof(struct cms_node, hh);
+ memcpy(buff+offset, node->array, sizeof(int)*node->depth*node->width);
+ offset += sizeof(int)*node->depth*node->width;
+ assert(offset==cms_node_blob_size(node));
return offset;
}
-static struct cms_bin *cms_bin_deserialize(const char *blob, size_t blob_sz)
+static struct cms_node *cms_node_deserialize(const char *blob, size_t blob_sz)
{
- struct cms_bin *bin=ALLOC(struct cms_bin, 1);
+ struct cms_node *node=ALLOC(struct cms_node, 1);
size_t offset=0;
- memcpy(bin, blob, offsetof(struct cms_bin, hh));
- offset += offsetof(struct cms_bin, hh);
- bin->bins=ALLOC(int32_t, bin->depth*bin->width);
- assert(sizeof(int32_t)*bin->depth*bin->width==blob_sz-offset);
- memcpy(bin->bins, blob+offset, sizeof(int32_t)*bin->depth*bin->width);
- return bin;
+ memcpy(node, blob, offsetof(struct cms_node, hh));
+ offset += offsetof(struct cms_node, hh);
+ node->array = ALLOC(int, node->depth*node->width);
+ memcpy(node->array, blob+offset, sizeof(int)*node->depth*node->width);
+ return node;
}
-static void cms_bin_merge(struct cms_bin *dst, const struct cms_bin *src)
+static void cms_node_merge(struct cms_node *dst, const struct cms_node *src)
{
assert(dst->depth==src->depth);
assert(dst->width==src->width);
@@ -216,97 +113,72 @@ static void cms_bin_merge(struct cms_bin *dst, const struct cms_bin *src)
{
dst->elements_added=src->elements_added;
dst->sequence=src->sequence;
- memcpy(dst->bins, src->bins, sizeof(int32_t)*dst->width*dst->depth);
+ memcpy(dst->array, src->array, sizeof(int)*dst->width*dst->depth);
}
return;
}
struct CM_sketch
{
- struct cms_opt opt;
uuid_t my_uuid;
- struct cms_bin *hash_bins;
- uint64_t hashes_max[CMS_MAX_DEPTH], hashes_min[CMS_MAX_DEPTH];
- int is_virgin;
+ struct cms_opt opt;
+ struct cms_node *hash_nodes;
};
-
-
-struct CM_sketch *CM_sketch_new(uuid_t my_id)
+struct CM_sketch *CM_sketch_new(uuid_t my_id, int width, int depth)
{
struct CM_sketch *cms=ALLOC(struct CM_sketch, 1);
uuid_copy(cms->my_uuid, my_id);
- memcpy(&cms->opt, &cms_default_opt, sizeof(cms_default_opt));
- cms->is_virgin=1;
+ cms->opt.width=width;
+ cms->opt.depth=depth;
return cms;
}
-static int cms_query(const struct CM_sketch *cms, const uint64_t *hashes, size_t n_hash)
+static int CM_sketch_query_hash(const struct CM_sketch *cms, const int *hashes, size_t n_hash)
{
int added_global=0;
- struct cms_bin *bin=NULL, *tmp=NULL;
- HASH_ITER(hh, cms->hash_bins, bin, tmp)
+ struct cms_node *node=NULL, *tmp=NULL;
+ HASH_ITER(hh, cms->hash_nodes, node, tmp)
{
- added_global+=cms_bin_query(bin, hashes, n_hash);
+ added_global += cms_node_query(node, hashes, n_hash);
}
return added_global;
}
-int CM_sketch_query(const struct CM_sketch *cms, const char *key, size_t len)
+int CM_sketch_query(const struct CM_sketch *cms, const char *item, size_t len)
{
- uint64_t hashes[CMS_MAX_DEPTH]={0};
- int added_global=0;
- assert(cms->opt.depth <= CMS_MAX_DEPTH);
+ struct double_hash dhash;
+ double_hash_init(&dhash, item, len);
+ int hash[cms->opt.depth];
for(int i=0; i<cms->opt.depth; i++)
{
- hashes[i]=XXH3_64bits_withSeed(key, len, i);
- //hashes[i]=__fnv_1a(key, len, i);
+ hash[i]=double_hash_generate(&dhash, i, cms->opt.width);
}
-
- added_global=cms_query(cms, hashes, cms->opt.depth);
- return added_global;
+ return CM_sketch_query_hash(cms, hash, cms->opt.depth);
}
-int CM_sketch_add_n(struct CM_sketch *cms, const char *key, size_t len, int times)
+int CM_sketch_incrby(struct CM_sketch *cms, const char *item, size_t len, int increment)
{
- struct cms_bin *bin=NULL;
- HASH_FIND(hh, cms->hash_bins, cms->my_uuid, sizeof(cms->my_uuid), bin);
- if(!bin)
- {
- bin=cms_bin_new(cms->my_uuid, cms->opt.width, cms->opt.depth);
- HASH_ADD_KEYPTR(hh, cms->hash_bins, bin->uuid, sizeof(bin->uuid), bin);
- }
-
- uint64_t hashes[CMS_MAX_DEPTH]={0};
- for(int i=0; i<cms->opt.depth && i<CMS_MAX_DEPTH; i++)
- {
- hashes[i]=XXH3_64bits_withSeed(key, len, i);
- //hashes[i]=__fnv_1a(key, len, i);
- }
-
- cms_bin_add(bin, hashes, cms->opt.depth, times);
- long long added=cms_query(cms, hashes, cms->opt.depth);
- long long max, min;
- max=cms_query(cms, cms->hashes_max, cms->opt.depth);
- if(added>max || cms->is_virgin)
+ struct double_hash dhash;
+ double_hash_init(&dhash, item, len);
+ int hash[cms->opt.depth];
+ for(int i=0; i<cms->opt.depth; i++)
{
- memcpy(cms->hashes_max, hashes, sizeof(cms->hashes_max));
+ hash[i]=double_hash_generate(&dhash, i, cms->opt.width);
}
- min=cms_query(cms, cms->hashes_min, cms->opt.depth);
- if(added<min || cms->is_virgin)
+ struct cms_node *node=NULL;
+ HASH_FIND(hh, cms->hash_nodes, cms->my_uuid, sizeof(cms->my_uuid), node);
+ if(!node)
{
- memcpy(cms->hashes_min, hashes, sizeof(cms->hashes_min));
+ node=cms_node_new(cms->my_uuid, cms->opt.width, cms->opt.depth);
+ HASH_ADD_KEYPTR(hh, cms->hash_nodes, node->uuid, sizeof(node->uuid), node);
}
- cms->is_virgin=0;
- return added;
-}
-int CM_sketch_remove_n(struct CM_sketch *CM_sketch, const char *key, size_t len, int times)
-{
- return CM_sketch_add_n(CM_sketch, key, len, 0-times);
+ cms_node_incrby(node, hash, cms->opt.depth, increment);
+ return CM_sketch_query_hash(cms, hash, cms->opt.depth);
}
void CM_sketch_free(struct CM_sketch *cms)
{
- struct cms_bin *bin=NULL, *tmp=NULL;
- HASH_ITER(hh, cms->hash_bins, bin, tmp)
+ struct cms_node *node=NULL, *tmp=NULL;
+ HASH_ITER(hh, cms->hash_nodes, node, tmp)
{
- HASH_DELETE(hh, cms->hash_bins, bin);
- cms_bin_free(bin);
+ HASH_DELETE(hh, cms->hash_nodes, node);
+ cms_node_free(node);
}
free(cms);
return;
@@ -315,27 +187,42 @@ void CM_sketch_free(struct CM_sketch *cms)
size_t CM_sketch_size(const struct CM_sketch *cms)
{
size_t sz=0;
- struct cms_bin *bin=NULL, *tmp=NULL;
- HASH_ITER(hh, cms->hash_bins, bin, tmp)
+ struct cms_node *node=NULL, *tmp=NULL;
+ HASH_ITER(hh, cms->hash_nodes, node, tmp)
{
- sz+=cms_bin_size(bin);
+ sz += cms_node_size(node);
}
- sz+=sizeof(struct CM_sketch);
+ sz += sizeof(struct CM_sketch);
return sz;
}
+size_t CM_sketch_blob_size(const struct CM_sketch *cms)
+{
+ size_t sz=0;
+ struct cms_node *node=NULL, *tmp=NULL;
+ sz += offsetof(struct CM_sketch, hash_nodes);
+ sz += sizeof(size_t);
+ HASH_ITER(hh, cms->hash_nodes, node, tmp)
+ {
+ sz += sizeof(size_t);
+ sz += cms_node_blob_size(node);
+ }
+ return sz;
+}
void CM_sketch_serialize(const struct CM_sketch *cms, char **blob, size_t *blob_sz)
{
- size_t mpack_buff_sz=CM_sketch_size(cms)+HASH_COUNT(cms->hash_bins)*sizeof(size_t);
+ size_t mpack_buff_sz=CM_sketch_blob_size(cms);
char *mpack_buff=ALLOC(char, mpack_buff_sz);
- struct cms_bin *bin=NULL, *tmp=NULL;
+ struct cms_node *node=NULL, *tmp=NULL;
size_t offset=0;
- *((size_t*)(mpack_buff+offset))=HASH_COUNT(cms->hash_bins);
- offset+=sizeof(size_t);
- HASH_ITER(hh, cms->hash_bins, bin, tmp)
+ memcpy(mpack_buff+offset, cms, offsetof(struct CM_sketch, hash_nodes));
+ offset += offsetof(struct CM_sketch, hash_nodes);
+ *((size_t*)(mpack_buff+offset))=HASH_COUNT(cms->hash_nodes);
+ offset += sizeof(size_t);
+ HASH_ITER(hh, cms->hash_nodes, node, tmp)
{
- *((size_t*)(mpack_buff+offset))=cms_bin_blob_size(bin);
+ *((size_t*)(mpack_buff+offset))=cms_node_blob_size(node);
offset += sizeof(size_t);
- offset += cms_bin_serialize(bin, mpack_buff+offset, mpack_buff_sz-offset);
+ offset += cms_node_serialize(node, mpack_buff+offset, mpack_buff_sz-offset);
}
*blob=mpack_buff;
*blob_sz=offset;
@@ -344,53 +231,41 @@ void CM_sketch_serialize(const struct CM_sketch *cms, char **blob, size_t *blob_
struct CM_sketch *CM_sketch_deserialize(const char *blob, size_t blob_sz)
{
struct CM_sketch *cms=ALLOC(struct CM_sketch, 1);
- size_t offset=0, bin_blob_sz=0;
+ size_t offset=0, node_blob_sz=0;
+ memcpy(cms, blob+offset, offsetof(struct CM_sketch, hash_nodes));
+ offset += offsetof(struct CM_sketch, hash_nodes);
size_t n_replica = *((size_t*)(blob+offset));
offset += sizeof(size_t);
- struct cms_bin *bin=NULL;
+ struct cms_node *node=NULL;
for(int i=0; i<n_replica; i++)
{
- bin_blob_sz=*((size_t*)(blob+offset));
+ node_blob_sz= *((size_t*)(blob+offset));
offset += sizeof(size_t);
- bin=cms_bin_deserialize(blob+offset, bin_blob_sz);
- offset+=bin_blob_sz;
- assert(bin_blob_sz==cms_bin_blob_size(bin));
- HASH_ADD_KEYPTR(hh, cms->hash_bins, bin->uuid, sizeof(bin->uuid), bin);
+ node=cms_node_deserialize(blob+offset, node_blob_sz);
+ offset += node_blob_sz;
+ assert(node_blob_sz==cms_node_blob_size(node));
+ assert(cms->opt.depth==node->depth);
+ assert(cms->opt.width==node->width);
+ HASH_ADD_KEYPTR(hh, cms->hash_nodes, node->uuid, sizeof(node->uuid), node);
}
return cms;
}
-int CM_sketch_elements(const struct CM_sketch *cms)
-{
- int total_count=0;
- struct cms_bin *bin=NULL, *tmp=NULL;
- HASH_ITER(hh, cms->hash_bins, bin, tmp)
- {
- total_count+=bin->elements_added;
- }
- return total_count;
-}
-int CM_sketch_max(const struct CM_sketch *cms)
-{
- int max=cms_query(cms, cms->hashes_max, cms->opt.depth);
- return max;
-}
-int CM_sketch_min(const struct CM_sketch *cms)
-{
- int min=cms_query(cms, cms->hashes_min, cms->opt.depth);
- return min;
-}
void CM_sketch_merge(struct CM_sketch *dst, const struct CM_sketch *src)
{
- struct cms_bin *src_bin=NULL, *dst_bin=NULL, *tmp=NULL;
- HASH_ITER(hh, src->hash_bins, src_bin, tmp)
+ struct cms_node *src_node=NULL, *dst_node=NULL, *tmp=NULL;
+ assert(dst->opt.width==src->opt.width);
+ assert(dst->opt.depth==src->opt.depth);
+ HASH_ITER(hh, src->hash_nodes, src_node, tmp)
{
- HASH_FIND(hh, dst->hash_bins, src_bin->uuid, sizeof(src_bin->uuid), dst_bin);
- if(!dst_bin)
+ assert(src_node->width==src->opt.width);
+ assert(src_node->depth==src->opt.depth);
+ HASH_FIND(hh, dst->hash_nodes, src_node->uuid, sizeof(src_node->uuid), dst_node);
+ if(!dst_node)
{
- dst_bin=cms_bin_new(src_bin->uuid, dst->opt.width, dst->opt.depth);
- HASH_ADD_KEYPTR(hh, dst->hash_bins, dst_bin->uuid, sizeof(dst_bin->uuid), dst_bin);
+ dst_node=cms_node_new(src_node->uuid, src_node->width, src_node->depth);
+ HASH_ADD_KEYPTR(hh, dst->hash_nodes, dst_node->uuid, sizeof(dst_node->uuid), dst_node);
}
- cms_bin_merge(dst_bin, src_bin);
+ cms_node_merge(dst_node, src_node);
}
}
void CM_sketch_merge_blob(struct CM_sketch *dst, const char *blob, size_t blob_sz)
@@ -410,9 +285,52 @@ void CM_sketch_info(const struct CM_sketch *cms, struct CM_sketch_info *info)
//https://cs.stackexchange.com/questions/44803/what-is-the-correct-way-to-determine-the-width-and-depth-of-a-count-min-sketch
info->width=cms->opt.width;
info->depth=cms->opt.depth;
- info->confidence = 1 - (1 / pow(2, info->depth));
- info->error_rate=2 / (double) info->width;
- info->n_replica=HASH_COUNT(cms->hash_bins);
- info->n_element=CM_sketch_elements(cms);
+ info->probability = 1 / pow(2, info->depth);
+ info->error=2 / (double) info->width;
+ info->n_replica=HASH_COUNT(cms->hash_nodes);
+ info->count=0;
+ struct cms_node *node=NULL, *tmp=NULL;
+ HASH_ITER(hh, cms->hash_nodes, node, tmp)
+ {
+ info->count += node->elements_added;
+ }
+ return;
+}
+void CM_sketch_list_replica(const struct CM_sketch *cms, uuid_t *replica, size_t *n_replica)
+{
+ struct cms_node *node=NULL, *tmp=NULL;
+ size_t i=0;
+ HASH_ITER(hh, cms->hash_nodes, node, tmp)
+ {
+ uuid_copy(replica[i], node->uuid);
+ i++;
+ }
+ *n_replica=i;
return;
+}
+int CM_sketch_clear_replica(struct CM_sketch *cms, uuid_t replica)
+{
+ struct cms_node *node=NULL;
+ // 'sizeof' on array function parameter 'replica' will return size of 'unsigned char *', so we use sizeof(uuid_t) instead.
+ HASH_FIND(hh, cms->hash_nodes, replica, sizeof(uuid_t), node);
+ if(!node)
+ {
+ return -1;
+ }
+ cms_node_reset(node);
+ return 0;
+}
+void CM_sketch_tool_dimension_by_prob(double error, double probability, int *width, int *depth)
+{
+ assert(error > 0 && error < 1);
+ assert(probability > 0 && probability < 1);
+
+ *width = ceil(2 / error);
+ *depth = ceil(log10f(probability) / log10f(0.5));
+}
+struct CM_sketch *CM_sketch_replicate(uuid_t uuid, const char *blob, size_t blob_sz)
+{
+ struct CM_sketch *cms=CM_sketch_deserialize(blob, blob_sz);
+ uuid_copy(cms->my_uuid, uuid);
+ return cms;
} \ No newline at end of file
diff --git a/CRDT/cm_sketch.h b/CRDT/cm_sketch.h
index 38d77c3..f9059a1 100644
--- a/CRDT/cm_sketch.h
+++ b/CRDT/cm_sketch.h
@@ -14,28 +14,40 @@ extern "C"
{
#endif
struct CM_sketch;
-struct CM_sketch *CM_sketch_new(uuid_t my_id);
+struct CM_sketch *CM_sketch_new(uuid_t my_id, int width, int depth);
void CM_sketch_free(struct CM_sketch *cms);
-int CM_sketch_query(const struct CM_sketch *cms, const char *key, size_t len);
-int CM_sketch_add_n(struct CM_sketch *cms, const char *key, size_t len, int times);
-int CM_sketch_remove_n(struct CM_sketch *cms, const char *key, size_t len, int times);
+int CM_sketch_query(const struct CM_sketch *cms, const char *item, size_t len);
+/*
+ * @param increment: the increment value, can be negative.
+*/
+int CM_sketch_incrby(struct CM_sketch *cms, const char *item, size_t len, int increment);
+
size_t CM_sketch_size(const struct CM_sketch *cms);
void CM_sketch_serialize(const struct CM_sketch *cms, char **blob, size_t *blob_sz);
struct CM_sketch *CM_sketch_deserialize(const char *blob, size_t blob_sz);
void CM_sketch_merge_blob(struct CM_sketch *dst, const char *blob, size_t blob_sz);
+struct CM_sketch *CM_sketch_replicate(uuid_t uuid, const char *blob, size_t blob_sz);
struct CM_sketch_info
{
int width;
int depth;
- double error_rate;
- double confidence;
- long long n_element;
+ double error;
+ double probability;
+ long long count;
long long n_replica;
};
void CM_sketch_info(const struct CM_sketch *cms, struct CM_sketch_info *info);
-int CM_sketch_elements(const struct CM_sketch *cms);
-int CM_sketch_max(const struct CM_sketch *cms);
-int CM_sketch_min(const struct CM_sketch *cms);
+/* Recommends width & depth for expected n different items,
+ with probability of an error - prob and over estimation
+ error - overEst (use 1 for max accuracy) */
+/*
+ * @param error: Estimate size of error. The error is a percent of total counted items. This effects the width of the sketch.
+ * @param probability: The desired probability for inflated count. This should be a decimal value between 0 and 1. This effects the depth of the sketch. For example, for a desired false positive rate of 0.1% (1 in 1000), error_rate should be set to 0.001. The closer this number is to zero, the greater the memory consumption per item and the more CPU usage per operation.
+*/
+void CM_sketch_tool_dimension_by_prob(double error, double probability, int *width, int *depth);
+/**/
+void CM_sketch_list_replica(const struct CM_sketch *cms, uuid_t *nodes, size_t *n_nodes);
+int CM_sketch_clear_replica(struct CM_sketch *cms, uuid_t node_uuid);
#ifdef __cplusplus
}
#endif \ No newline at end of file
diff --git a/CRDT/crdt_utils.c b/CRDT/crdt_utils.c
new file mode 100644
index 0000000..c796081
--- /dev/null
+++ b/CRDT/crdt_utils.c
@@ -0,0 +1,13 @@
+#include "crdt_utils.h"
+#include "xxhash.h"
+
+void double_hash_init(struct double_hash *rv, const void *buffer, int len)
+{
+ rv->a=XXH3_64bits_withSeed(buffer, len, 0x9747b28c);
+ rv->b=XXH3_64bits_withSeed(buffer, len, rv->a);
+ return;
+}
+int double_hash_generate(const struct double_hash *rv, int i, int m)
+{
+ return (rv->a + i * rv->b) % m;
+} \ No newline at end of file
diff --git a/CRDT/crdt_utils.h b/CRDT/crdt_utils.h
index 25e6576..951e44b 100644
--- a/CRDT/crdt_utils.h
+++ b/CRDT/crdt_utils.h
@@ -2,6 +2,8 @@
#include <stdlib.h> //calloc
#include <sys/time.h>//gettimeofday
+#include <stdint.h> //uint64_t
+
#define ALLOC(type, number) ((type *)calloc(sizeof(type), number))
#define FREE(p) {free(*p);*p=NULL;}
@@ -30,3 +32,17 @@
#define timeval_to_ms(t) ((t).tv_sec*1000+(t).tv_usec/1000)
#define likely(x) __builtin_expect((x),1)
#define unlikely(x) __builtin_expect((x),0)
+
+
+
+// Double hashing for fast hashing,
+// reference: Kirsch, Adam, and Michael Mitzenmacher. "Less hashing, same performance: Building a better bloom filter."
+// Algorithms–ESA 2006: 14th Annual European Symposium, Zurich, Switzerland, September 11-13, 2006. Proceedings 14. Springer Berlin Heidelberg, 2006.
+// https://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf
+struct double_hash
+{
+ uint64_t a;
+ uint64_t b;
+};
+void double_hash_init(struct double_hash *rv, const void *buffer, int len);
+int double_hash_generate(const struct double_hash *rv, int i, int m); \ No newline at end of file
diff --git a/CRDT/g_array.c b/CRDT/g_array.c
index 797ce15..cf696b0 100644
--- a/CRDT/g_array.c
+++ b/CRDT/g_array.c
@@ -208,7 +208,10 @@ void g_array_merge(struct g_array *dst, const struct g_array *src)
if(src_item->sequence > dst_item->sequence)
{
memcpy(dst_item, src_item, G_ARRAY_ITEM_HEADER_SIZE);
- dst_item->array=realloc(dst_item->array, sizeof(struct counter_item)*dst_item->array_sz);
+ if(dst_item->array_sz < src_item->array_sz)
+ {
+ dst_item->array=realloc(dst_item->array, sizeof(struct counter_item)*dst_item->array_sz);
+ }
memcpy(dst_item->array, src_item->array, sizeof(struct counter_item)*dst_item->array_sz);
}
}
diff --git a/CRDT/probabilistic_crdt_gtest.cpp b/CRDT/probabilistic_crdt_gtest.cpp
index 8fd4de0..e2e423b 100644
--- a/CRDT/probabilistic_crdt_gtest.cpp
+++ b/CRDT/probabilistic_crdt_gtest.cpp
@@ -7,15 +7,17 @@
#include <unistd.h> //usleep
#include <uuid/uuid.h>
#include <math.h>
+#define CMS_TEST_WIDTH (1024*16)
+#define CMS_TEST_DEPTH 6
TEST(CMSketch, Basic)
{
uuid_t uuid;
uuid_generate(uuid);
- struct CM_sketch *cms=CM_sketch_new(uuid);
+ struct CM_sketch *cms=CM_sketch_new(uuid, CMS_TEST_WIDTH, CMS_TEST_DEPTH);
int ret=0, n_added=0;
for(int i=0; i<10; i++)
{
- ret=CM_sketch_add_n(cms, (char *)&i, sizeof(i), i+1);
+ ret=CM_sketch_incrby(cms, (char *)&i, sizeof(i), i+1);
n_added += i+1;
}
for(int i=0; i<10; i++)
@@ -25,7 +27,7 @@ TEST(CMSketch, Basic)
}
for(int i=0; i<10; i++)
{
- ret=CM_sketch_remove_n(cms, (char *)&i, sizeof(i), i);
+ ret=CM_sketch_incrby(cms, (char *)&i, sizeof(i), (0-i));
n_added -= i;
}
for(int i=0; i<10; i++)
@@ -35,20 +37,34 @@ TEST(CMSketch, Basic)
}
struct CM_sketch_info info;
CM_sketch_info(cms, &info);
- EXPECT_EQ(info.n_element, n_added);
- printf("error_rate: %f confidence: %f\n", info.error_rate, info.confidence);
+ EXPECT_EQ(info.count, n_added);
+ printf("error_rate: %f probability: %f\n", info.error, info.probability);
+ CM_sketch_free(cms);
+}
+TEST(CMSketch, DimByProb)
+{
+ uuid_t uuid;
+ uuid_generate(uuid);
+ int width=0, depth=0;
+ double error = 0.002, prob=0.001;
+ CM_sketch_tool_dimension_by_prob(error, prob, &width, &depth);
+ struct CM_sketch *cms=CM_sketch_new(uuid, width, depth);
+ struct CM_sketch_info info;
+ CM_sketch_info(cms, &info);
+ EXPECT_NEAR(info.error, error, error/2);
+ EXPECT_NEAR(info.probability, prob, prob/2);
CM_sketch_free(cms);
}
TEST(CMSketch, I5K)
{
uuid_t uuid;
uuid_generate(uuid);
- struct CM_sketch *cms=CM_sketch_new(uuid);
+ struct CM_sketch *cms=CM_sketch_new(uuid, CMS_TEST_WIDTH, CMS_TEST_DEPTH);
long long n_item=5000, ret=0;
long long base=10000, total_add=0;
for(long long i=0; i<n_item; i++)
{
- ret=CM_sketch_add_n(cms, (char *)&i, sizeof(i), base+i);
+ ret=CM_sketch_incrby(cms, (char *)&i, sizeof(i), base+i);
total_add+=(base+i);
}
struct CM_sketch_info info;
@@ -58,12 +74,12 @@ TEST(CMSketch, I5K)
{
ret=CM_sketch_query(cms, (char *)&i, sizeof(i));
//The formal expectation is total_add*info.error_rate
- if(abs(ret-(i+base))< (i+base)*info.error_rate)
+ if(abs(ret-(i+base))< (i+base)*info.error)
{
pass++;
}
}
- EXPECT_NEAR(pass, n_item, n_item*(1-info.confidence));
+ EXPECT_NEAR(pass, n_item, n_item*(1-info.probability));
CM_sketch_free(cms);
}
static void CMS_sync(struct CM_sketch *list[], size_t n)
@@ -86,27 +102,47 @@ static void CMS_sync(struct CM_sketch *list[], size_t n)
}
TEST(CMSketch, Merge)
{
- size_t replica_number=2, round=10;
- long long key=1234;
+ int replica_number=3, round=10;
+ int n_item=4;
+ const char *items[n_item]={"hello", "world", "good", "morning"};
+ int count[n_item]={400, 300, 200, 100001};
struct CM_sketch *cms[replica_number];
uuid_t uuid;
- for(size_t i=0; i<replica_number; i++)
+ uuid_generate(uuid);
+ cms[0]=CM_sketch_new(uuid, CMS_TEST_WIDTH, CMS_TEST_DEPTH);
+ char *blob=NULL;
+ size_t blob_sz=0;
+ CM_sketch_serialize(cms[0], &blob, &blob_sz);
+ for(int i=1; i<replica_number; i++)
{
uuid_generate(uuid);
- cms[i]=CM_sketch_new(uuid);
+ cms[i]=CM_sketch_replicate(uuid, blob, blob_sz);
}
- for(size_t i=0; i<round; i++)
+ free(blob);
+ blob=NULL;
+ for(int i=0; i<n_item; i++)
{
- CM_sketch_add_n(cms[i%replica_number], (char*) &key, sizeof(key), 1);
+ CM_sketch_incrby(cms[i%replica_number], items[i], strlen(items[i]), count[i]);
}
CMS_sync(cms, replica_number);
int ret=0;
- for(size_t i=0; i<replica_number; i++)
+ for(int i=0; i<round; i++)
{
- ret=CM_sketch_query(cms[i], (char *)&key, sizeof(key));
- EXPECT_EQ(ret, round);
+ int id=i%n_item;
+ ret=CM_sketch_query(cms[i%replica_number], items[id], strlen(items[id]));
+ EXPECT_EQ(ret, count[id]);
}
- for(size_t i=0; i<replica_number; i++)
+ for(int i=0; i<n_item; i++)
+ {
+ CM_sketch_incrby(cms[(i+1)%replica_number], items[i], strlen(items[i]), 0-count[i]);
+ }
+ CMS_sync(cms, replica_number);
+ for(int i=0; i<n_item; i++)
+ {
+ ret=CM_sketch_query(cms[i%replica_number], items[i], strlen(items[i]));
+ EXPECT_EQ(ret, 0);
+ }
+ for(int i=0; i<replica_number; i++)
{
CM_sketch_free(cms[i]);
}
@@ -120,12 +156,12 @@ TEST(CMSketch, Idempotent)
for(i=0; i<replica_number; i++)
{
uuid_generate(uuid);
- cms[i]=CM_sketch_new(uuid);
+ cms[i]=CM_sketch_new(uuid, CMS_TEST_WIDTH, CMS_TEST_DEPTH);
}
int n_added=0;
for(i=0; i<round; i++)
{
- CM_sketch_add_n(cms[i%replica_number], (char*) &i, sizeof(i), i);
+ CM_sketch_incrby(cms[i%replica_number], (char*) &i, sizeof(i), i);
n_added+=i;
}
CMS_sync(cms, replica_number);
@@ -138,17 +174,159 @@ TEST(CMSketch, Idempotent)
for(i=0; i<round; i++)
{
ret=CM_sketch_query(cms[(i+1)%replica_number], (char*) &i, sizeof(i));
- if((double)ret<((double)i+info.error_rate*n_added))
+ if((double)ret<((double)i+info.error*n_added))
{
success++;
}
}
- EXPECT_GE((double)success/round, info.confidence);
+ EXPECT_GE((double)success/round, info.probability);
for(i=0; i<replica_number; i++)
{
CM_sketch_free(cms[i]);
}
}
+TEST(CMSketch, Decrement)
+{
+ size_t replica_number=8, round=10000, i=0;
+ struct CM_sketch *cms[replica_number];
+ uuid_t uuid;
+
+ for(i=0; i<replica_number; i++)
+ {
+ uuid_generate(uuid);
+ cms[i]=CM_sketch_new(uuid, CMS_TEST_WIDTH, CMS_TEST_DEPTH);
+ }
+ int n_added=0;
+ for(i=0; i<round; i++)
+ {
+ CM_sketch_incrby(cms[i%replica_number], (char*) &i, sizeof(i), i*2);
+ n_added+=i*2;
+ }
+ CMS_sync(cms, replica_number);
+ int ret=0;
+ size_t success=0;
+ struct CM_sketch_info info;
+ CM_sketch_info(cms[0], &info);
+ EXPECT_EQ(info.count, n_added);
+ for(i=0; i<round; i++)
+ {
+ CM_sketch_incrby(cms[(i+1)%replica_number], (char*) &i, sizeof(i), 0-i);
+ n_added -= i;
+ }
+ CMS_sync(cms, replica_number);
+ CM_sketch_info(cms[0], &info);
+ EXPECT_EQ(info.count, n_added);
+ for(i=0; i<round; i++)
+ {
+ ret=CM_sketch_query(cms[(i+1)%replica_number], (char*) &i, sizeof(i));
+ if((double)ret<((double)i+info.error*n_added))
+ {
+ success++;
+ }
+ }
+ EXPECT_GE((double)success/round, 1-info.probability);
+ for(i=0; i<replica_number; i++)
+ {
+ CM_sketch_free(cms[i]);
+ }
+
+}
+TEST(CMSketch, Accuracy)
+{
+ size_t replica_number=8, round=10000, i=0;
+ struct CM_sketch *cms[replica_number];
+ uuid_t uuid;
+
+ for(i=0; i<replica_number; i++)
+ {
+ uuid_generate(uuid);
+ cms[i]=CM_sketch_new(uuid, CMS_TEST_WIDTH, CMS_TEST_DEPTH);
+ }
+ int n_added=0;
+ for(i=0; i<round; i++)
+ {
+ CM_sketch_incrby(cms[i%replica_number], (char*) &i, sizeof(i), i);
+ n_added+=i;
+ }
+ CMS_sync(cms, replica_number);
+ int ret=0;
+ size_t success=0;
+ struct CM_sketch_info info;
+ CM_sketch_info(cms[0], &info);
+ EXPECT_EQ(info.count, n_added);
+ for(i=0; i<round; i++)
+ {
+ ret=CM_sketch_query(cms[(i+1)%replica_number], (char*) &i, sizeof(i));
+ if((double)ret<((double)i+info.error*n_added))
+ {
+ success++;
+ }
+ }
+ EXPECT_GE((double)success/round, 1-info.probability);
+ for(i=0; i<replica_number; i++)
+ {
+ CM_sketch_free(cms[i]);
+ }
+}
+TEST(CMSketch, ResetReplica)
+{
+ size_t replica_number=2, round=10000, i=0;
+ struct CM_sketch *cms[replica_number];
+ int added[replica_number]={0};
+ uuid_t uuid[replica_number];
+
+ for(i=0; i<replica_number; i++)
+ {
+ uuid_generate(uuid[i]);
+ cms[i]=CM_sketch_new(uuid[i], CMS_TEST_WIDTH, CMS_TEST_DEPTH);
+ }
+
+ int n_added=0;
+ for(i=0; i<round; i++)
+ {
+ CM_sketch_incrby(cms[i%replica_number], (char*) &i, sizeof(i), i);
+ added[i%replica_number]+=i;
+ n_added+=i;
+ }
+ CMS_sync(cms, replica_number);
+ struct CM_sketch_info info;
+ CM_sketch_info(cms[0], &info);
+ EXPECT_EQ(info.count, n_added);
+ EXPECT_EQ(info.n_replica, replica_number);
+
+ uuid_t replica_uuid[replica_number];
+ size_t n_replica=replica_number;
+ CM_sketch_list_replica(cms[0], replica_uuid, &n_replica);
+ EXPECT_EQ(n_replica, replica_number);
+ int match=0;
+ for(i=0; i<replica_number; i++)
+ {
+ for(int j=0; j<(int)replica_number; j++)
+ {
+ if(0==uuid_compare(replica_uuid[i], uuid[j]))
+ {
+ match++;
+ continue;
+ }
+ }
+ }
+ EXPECT_EQ(match, replica_number);
+
+ int ret=0;
+ srand(171);
+ i=rand()%round;
+ ret=CM_sketch_query(cms[0], (char*) &i, sizeof(i));
+ EXPECT_NEAR(ret, i, info.error*n_added);
+ CM_sketch_clear_replica(cms[0], uuid[i%replica_number]);
+ ret=CM_sketch_query(cms[0], (char*) &i, sizeof(i));
+ EXPECT_EQ(ret, 0);
+
+ for(i=0; i<replica_number; i++)
+ {
+ CM_sketch_free(cms[i]);
+ }
+
+}
void st_hll_sync(struct ST_hyperloglog *list[], size_t n)
{
char *blob=NULL;
diff --git a/CRDT/tb_crdt_gtest.cpp b/CRDT/tb_crdt_gtest.cpp
index 8d3326b..aac7f1d 100644
--- a/CRDT/tb_crdt_gtest.cpp
+++ b/CRDT/tb_crdt_gtest.cpp
@@ -1672,7 +1672,7 @@ TEST(BulkTokenBucket, HighCollision)
}
bulk_token_bucket_test_print_result(test, n_case);
}
-TEST(BulkTokenBucket, VariousCIR)
+TEST(BulkTokenBucket, VariousRate)
{
int n_case=5;
struct btb_case test[n_case];
diff --git a/docs/commands/bloom_filter.md b/docs/commands/bloom_filter.md
index ae18e8f..0fe9474 100644
--- a/docs/commands/bloom_filter.md
+++ b/docs/commands/bloom_filter.md
@@ -1,15 +1,15 @@
## Bloom Filter
-### BFRESERVE
+### BFINIT
Syntax
```
-BFRESERVE key error_rate capacity [TIME window-milliseconds slice-number]
+BFINIT key error capacity [TIME window-milliseconds slice-number]
```
- Create an empty bloom filter that can keep elements within a sliding time window with following arguments:
- - error_rate -- Probability of collision (as long as the capacity is not exceeded). Should be between 0.000001 to 0.1.
+ Initialize an empty bloom filter that can keep elements within a sliding time window with following arguments:
+ - error -- Probability of collision (as long as the capacity is not exceeded). Should be between 0.000001 to 0.1.
- capacity -- The expected number of element which will be inserted. At least 1000. If the actual number of elements exceeds the capacity, the bloom filter will expand.
- TIME -- Create a time-limited bloom filter. The following arguments are required:
- window-milliseconds -- The duration of the time window in milliseconds. Items in time range (now - window-milliseconds, now] are kept.
@@ -86,10 +86,10 @@ Syntax
BFINFO key
```
Returns information about a Bloom filter.
-- ErrorRate -- error_rate arguments specified in BFRESERVE.
-- Capacity -- capacity arguments specified in BFRESERVE.
-- TimeWindowMs -- window-milliseconds arguments specified in BFRESERVE.
-- TimeSlices -- slice-number arguments specified in BFRESERVE.
+- Error -- error_rate arguments specified in `BFINIT`.
+- Capacity -- capacity arguments specified in `BFINIT`.
+- TimeWindowMs -- window-milliseconds arguments specified in `BFINIT`.
+- TimeSlices -- slice-number arguments specified in `BFINIT`.
- HashNum -- Hash function number caculated by the error_rate, which is log2(1/error_rate).
- TotalSlices -- The actual slice number after expansion.
- MaxExpansionTimes -- The maximum expansion times of every time slice.
diff --git a/include/swarmkv/swarmkv.h b/include/swarmkv/swarmkv.h
index b869bf1..9b1e2e1 100644
--- a/include/swarmkv/swarmkv.h
+++ b/include/swarmkv/swarmkv.h
@@ -170,6 +170,7 @@ char *swarmkv_get_command_hint(struct swarmkv *db, const char* cmd_name);
const char *swarmkv_self_address(const struct swarmkv *db);
+void swarmkv_self_uuid(const struct swarmkv *db, char buff[37]);
#ifdef __cplusplus
} /* end extern "C" */
#endif
diff --git a/readme.md b/readme.md
index 7a0c9b8..ba95891 100644
--- a/readme.md
+++ b/readme.md
@@ -33,7 +33,7 @@ SwarmKV Data Types
- Fair Token Bucket: Implements stochastic fairness allocation to ensure equitable resource distribution.
- Bulk Token Bucket: Optimized for scenarios requiring a large number of token buckets with identical configurations.
- Bloom Filter with the ability to expire.
-- Count-min Sketch [Todo]
+- Count-min Sketch
- HyperLogLog [Todo]
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5518414..6c8b3e5 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,5 +1,5 @@
set(SWARMKV_MAJOR_VERSION 4)
-set(SWARMKV_MINOR_VERSION 1)
+set(SWARMKV_MINOR_VERSION 2)
set(SWARMKV_PATCH_VERSION 0)
set(SWARMKV_VERSION ${SWARMKV_MAJOR_VERSION}.${SWARMKV_MINOR_VERSION}.${SWARMKV_PATCH_VERSION})
@@ -26,7 +26,7 @@ add_definitions(-fPIC)
set(SWARMKV_SRC swarmkv.c swarmkv_api.c
swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c
swarmkv_store.c swarmkv_sync.c swarmkv_keyspace.c swarmkv_monitor.c
- t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c
+ t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c
swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c)
set(LIB_SOURCE_FILES
diff --git a/src/swarmkv.c b/src/swarmkv.c
index 5cc1356..382f56e 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -24,7 +24,7 @@
#include "t_hash.h"
#include "t_token_bucket.h"
#include "t_bloom_filter.h"
-
+#include "t_cms.h"
#include "uthash.h"
#include "sds.h"
@@ -62,7 +62,6 @@ struct swarmkv
{
struct swarmkv_module module;
char db_name[SWARMKV_SYMBOL_MAX];
- uuid_t bin_uuid;
struct swarmkv_options *opts;
int thread_counter;
@@ -879,6 +878,20 @@ void __on_msg_callback(struct swarmkv_msg *msg, void *arg)
}
}
}
+static const char *exec_ret2string(enum cmd_exec_result ret)
+{
+ switch(ret)
+ {
+ case NEED_KEY_ROUTE:
+ return "NEED_KEY_ROUTE";
+ case REDIRECT:
+ return "REDIRECT";
+ case FINISHED:
+ return "FINISHED";
+ default:
+ assert(0);
+ }
+}
#define INTER_THREAD_RPC_TIMEOUT_AHEAD 1000
void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swarmkv_cmd *cmd, struct future *future_of_caller)
{
@@ -970,7 +983,18 @@ void __exec_cmd(struct swarmkv *db, const node_t *target_node, const struct swar
clock_gettime(CLOCK_MONOTONIC_COARSE, &end);
swarmkv_monitor_record_command(db->mod_monitor, spec->name, timespec_diff_usec(&start, &end));
-
+ //if(strcasestr(spec->name, "CRDT"))
+ if(0){
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ printf("%ld.%.6ld %s %d %s %s %s\n",
+ now.tv_sec, now.tv_usec,
+ db->self.addr,
+ db->threads[cur_tid].recusion_depth,
+ spec->name,
+ spec->key_offset<0?"NULL":cmd->argv[spec->key_offset],
+ exec_ret2string(exec_ret));
+ }
switch(exec_ret)
{
case FINISHED:
@@ -1117,7 +1141,7 @@ void command_spec_init(struct swarmkv *db)
3, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE,
hincrby_command, db->mod_store);
- /* Token bucket commands */
+ /* Token Buckets commands */
command_register(&(db->command_table), "TCFG", "key rate capacity [PD seconds]",
3, 1, CMD_KEY_OW, REPLY_ERROR, AUTO_ROUTE,
tcfg_command, db->mod_store);
@@ -1146,10 +1170,10 @@ void command_spec_init(struct swarmkv *db)
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
btinfo_command, db->mod_store);
- /*Bloom filter commands*/
- command_register(&(db->command_table), "BFRESERVE", "key error_rate capacity [TIME window-milliseconds slice-number]",
+ /*Bloom Filter commands*/
+ command_register(&(db->command_table), "BFINIT", "key error capacity [TIME window-milliseconds slice-number]",
3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
- bfreserve_command, db->mod_store);
+ bfinit_command, db->mod_store);
command_register(&(db->command_table), "BFADD", "key item [item ...]",
2, 1, CMD_KEY_RW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
bfadd_command, db->mod_store);
@@ -1166,6 +1190,32 @@ void command_spec_init(struct swarmkv *db)
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
bfinfo_command, db->mod_store);
+ /*Count-min Sketch Commands*/
+ command_register(&(db->command_table), "CMSINITBYDIM", "key width depth",
+ 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsinitbydim_command, db->mod_store);
+ command_register(&(db->command_table), "CMSINITBYPROB", "key error probability",
+ 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsinitbyprob_command, db->mod_store);
+ command_register(&(db->command_table), "CMSINCRBY", "key item increment [item increment ...]",
+ 3, 1, CMD_KEY_RW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsincrby_command, db->mod_store);
+ command_register(&(db->command_table), "CMSQUERY", "key item",
+ 2, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE,
+ cmsquery_command, db->mod_store);
+ command_register(&(db->command_table), "CMSMQUERY", "key item [item ...]",
+ 2, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsmquery_command, db->mod_store);
+ command_register(&(db->command_table), "CMSINFO", "key",
+ 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsinfo_command, db->mod_store);
+ command_register(&(db->command_table), "CMSRLIST", "key",
+ 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+ cmsrlist_command, db->mod_store);
+ command_register(&(db->command_table), "CMSRCLEAR", "key uuid",
+ 2, 1, CMD_KEY_RW, REPLY_ERROR, AUTO_ROUTE,
+ cmsrclear_command, db->mod_store);
+
/* Debug Commands */
command_register(&(db->command_table), "INFO", "[section]",
0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
@@ -1420,9 +1470,6 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char *db_name,
}
db->logger=log_handle_create(log_path, opts->loglevel);
}
-
-
- uuid_copy(db->bin_uuid, opts->bin_uuid);
if(opts->dryrun)
{
}
@@ -1576,3 +1623,8 @@ const char *swarmkv_self_address(const struct swarmkv *db)
{
return db->self.addr;
}
+void swarmkv_self_uuid(const struct swarmkv *db, char buff[37])
+{
+ uuid_unparse(db->opts->bin_uuid, buff);
+ return;
+} \ No newline at end of file
diff --git a/src/swarmkv_error.h b/src/swarmkv_error.h
index 4da62e0..221d107 100644
--- a/src/swarmkv_error.h
+++ b/src/swarmkv_error.h
@@ -8,6 +8,7 @@
#define error_arg_not_valid_integer "ERR arg `%s` is not an integer or out of range"
#define error_arg_not_valid_address "ERR arg `%s` is not `IP:port` format"
#define error_arg_not_valid_float "ERR arg `%s` is not a valid float or out of range"
+#define error_arg_not_valid_uuid "ERR arg `%s` is not a valid UUID"
#define error_arg_parse_failed "ERR arg `%s` parse failed"
#define error_arg_string_should_be "ERR arg `%s` should be `%s`"
#define error_need_additional_arg "ERR arg `%s` should be fllowed by more args"
diff --git a/src/swarmkv_monitor.c b/src/swarmkv_monitor.c
index 95c88d7..0856b1c 100644
--- a/src/swarmkv_monitor.c
+++ b/src/swarmkv_monitor.c
@@ -10,12 +10,12 @@
#include <hdr/hdr_interval_recorder.h>
#include <pthread.h>
-#define METRIC_KEY_MAX 64
+#define KEY_LEN_MAX 64
#define SIGNIFICANT_FIGURES 2
#define LOWEST_TRACKABLE_VALUE 1
struct recorder
{
- char key[METRIC_KEY_MAX];
+ char key[KEY_LEN_MAX];
struct hdr_interval_recorder hdr_interval;
struct hdr_histogram *hdr_previous;
struct hdr_histogram *hdr_all_time;
@@ -45,7 +45,7 @@ void recorder_free(struct recorder *recorder)
}
struct recorder_metric
{
- char key[METRIC_KEY_MAX];
+ char key[KEY_LEN_MAX];
long long total_count;
long long max;
long long min;
@@ -83,11 +83,11 @@ void hdr_to_metric(const struct hdr_histogram *hdr, const char *key, struct reco
strncpy(metric->key, key, sizeof(metric->key));
metric->total_count=hdr->total_count;
if(metric->total_count==0) return;
- metric->p50=hdr_value_at_percentile(hdr, 0.5);
- metric->p80=hdr_value_at_percentile(hdr, 0.8);
- metric->p90=hdr_value_at_percentile(hdr, 0.9);
- metric->p95=hdr_value_at_percentile(hdr, 0.95);
- metric->p99=hdr_value_at_percentile(hdr, 0.99);
+ metric->p50=hdr_value_at_percentile(hdr, 50);
+ metric->p80=hdr_value_at_percentile(hdr, 80);
+ metric->p90=hdr_value_at_percentile(hdr, 90);
+ metric->p95=hdr_value_at_percentile(hdr, 95);
+ metric->p99=hdr_value_at_percentile(hdr, 99);
metric->total_count=hdr->total_count;
metric->max=hdr_max(hdr);
metric->mean=hdr_mean(hdr);
@@ -108,18 +108,7 @@ void recorder_sample(struct recorder *recorder, struct recorder_metric *metric)
hdr_to_metric(recorder->hdr_all_time, recorder->key, metric);
return;
}
-void recorder_sample_n(struct recorder **recorder, size_t n_recorder, long long max_latency_usec, struct recorder_metric *metric)
-{
- struct hdr_histogram *hdr_merged;
- hdr_init(1, max_latency_usec, 2, &hdr_merged);
- for(size_t i=0; i<n_recorder; i++)
- {
- recorder_commit(recorder[i]);
- hdr_add(hdr_merged, recorder[i]->hdr_all_time);
- }
- hdr_to_metric(hdr_merged, recorder[0]->key, metric);
- hdr_close(hdr_merged);
-}
+
void recorder_reset(struct recorder *recorder)
{
recorder->hdr_previous=hdr_interval_recorder_sample_and_recycle(&recorder->hdr_interval, recorder->hdr_previous);
@@ -421,4 +410,13 @@ enum cmd_exec_result latency_command(struct swarmkv_module *mod_monitor, const s
}
pthread_mutex_unlock(&monitor->mutex);
return FINISHED;
+}
+enum cmd_exec_result lastcmds_command(struct swarmkv_module *mod_monitor, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*LASTCMDS [offset]*/
+ struct swarmkv_monitor *monitor=module2monitor(mod_monitor);
+ pthread_mutex_lock(&monitor->mutex);
+
+ pthread_mutex_unlock(&monitor->mutex);
+ return FINISHED;
} \ No newline at end of file
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 0bc7514..189a561 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -111,11 +111,20 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] =
.obj_size=(size_t (*)(const void *))AP_bloom_mem_size
},
{
+ .type=OBJ_TYPE_CMS,
+ .type_name="count-min-sketch",
+ .obj_free=(void (*)(void *))CM_sketch_free,
+ .obj_serialize=(void (*)(const void *, char **, size_t *))CM_sketch_serialize,
+ .obj_merge_blob=(void (*)(void *, const char *, size_t))CM_sketch_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))CM_sketch_replicate,
+ .obj_size=(size_t (*)(const void *))CM_sketch_size
+ },
+ {
.type=OBJ_TYPE_UNDEFINED,
.type_name="undefined",
.obj_free=undefined_obj_free,
.obj_serialize=NULL,
- .obj_merge_blob=NULL,
+ .obj_merge_blob=NULL,
.obj_replicate=NULL,
.obj_size=(size_t (*)(const void *))undefined_obj_mem_size
}
@@ -331,7 +340,7 @@ void store_get_node_addr(struct swarmkv_module* mod_store, node_t *node)
node_copy(node, &store->self);
return;
}
-void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj)
+void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj)
{
struct swarmkv_store *store=module2store(mod_store);
struct scontainer *ctr=container_of(obj, struct scontainer, obj);
@@ -423,7 +432,7 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t
offset+=sizeof(size_t);
assert(offset+value_blob_sz==blob_sz);
const char *value_blob=blob+offset;
- if(!obj->raw)
+ if(obj->raw == NULL)
{
obj->raw=sobj_specs[obj->type].obj_replicate(uuid, value_blob, value_blob_sz);
}
@@ -1008,17 +1017,17 @@ enum cmd_exec_result crdt_info_command(struct swarmkv_module *mod_store, const s
size_t sz=0;
sz+=sobj_specs[ctr->obj.type].obj_size(ctr->obj.raw);
sz+=sizeof(struct scontainer)+sdslen(ctr->obj.key);
- size_t n_replica=0;
- n_replica=ctr->replica_node_list?utarray_len(ctr->replica_node_list):0;
- sz+=n_replica*sizeof(node_t);
+ size_t n_peer=0;
+ n_peer=ctr->replica_node_list?utarray_len(ctr->replica_node_list):0;
+ sz+=n_peer*sizeof(node_t);
int i=0;
*reply=swarmkv_reply_new_array(8);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Type");
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt(sobj_specs[ctr->obj.type].type_name);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Size");
(*reply)->elements[i++]=swarmkv_reply_new_integer(sz);
- (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Replicas");
- (*reply)->elements[i++]=swarmkv_reply_new_integer(n_replica);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Peers");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(n_peer);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("LastModified");
(*reply)->elements[i++]=swarmkv_reply_new_integer(ctr->op_timestamp.tv_sec);
return FINISHED;
diff --git a/src/swarmkv_store.h b/src/swarmkv_store.h
index d1fc926..4e4e4cd 100644
--- a/src/swarmkv_store.h
+++ b/src/swarmkv_store.h
@@ -14,6 +14,7 @@
#include "fair_token_bucket.h"
#include "bulk_token_bucket.h"
#include "ap_bloom.h"
+#include "cm_sketch.h"
enum sobj_type
{
@@ -25,6 +26,7 @@ enum sobj_type
OBJ_TYPE_FAIR_TOKEN_BUCKET,
OBJ_TYPE_BULK_TOKEN_BUCKET,
OBJ_TYPE_BLOOM_FILTER,
+ OBJ_TYPE_CMS,
OBJ_TYPE_UNDEFINED,
__SWARMKV_OBJ_TYPE_MAX
};
@@ -43,6 +45,7 @@ struct sobj
struct fair_token_bucket *ftb;
struct bulk_token_bucket *btb;
struct AP_bloom *bloom;
+ struct CM_sketch *cms;
void *raw;
};
};
@@ -62,7 +65,7 @@ struct store_info
};
void swarmkv_store_info(struct swarmkv_module *module, struct store_info *info);
-void sobj_need_sync(struct swarmkv_module *mod_store, struct sobj *obj);
+void store_mark_object_as_modified(struct swarmkv_module *mod_store, struct sobj *obj);
int sobj_get_random_replica(struct sobj *obj, node_t *out);
enum cmd_exec_result handle_undefined_object(struct sobj *obj, struct swarmkv_reply **reply);
diff --git a/src/t_bloom_filter.c b/src/t_bloom_filter.c
index fbaad2e..9ad3623 100644
--- a/src/t_bloom_filter.c
+++ b/src/t_bloom_filter.c
@@ -7,9 +7,9 @@
#include <stdlib.h>
#include <assert.h>
-enum cmd_exec_result bfreserve_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+enum cmd_exec_result bfinit_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
{
-/* BFRESERVE key error_rate capacity [TIME window-milliseconds slice-number] */
+/* BFINIT key error_rate capacity [TIME window-milliseconds slice-number] */
struct sobj *obj=NULL;
const sds key=cmd->argv[1];
@@ -97,6 +97,7 @@ enum cmd_exec_result bfadd_command(struct swarmkv_module *mod_store, const struc
{
AP_bloom_add(obj->bloom, now, cmd->argv[i+2], sdslen(cmd->argv[i+2]));
}
+ store_mark_object_as_modified(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
return FINISHED;
}
@@ -202,7 +203,7 @@ enum cmd_exec_result bfinfo_command(struct swarmkv_module *mod_store, const stru
AP_bloom_info(obj->bloom, &info);
int i=0;
*reply=swarmkv_reply_new_array(20);
- (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ErrorRate");
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Error");
(*reply)->elements[i++]=swarmkv_reply_new_double(info.error);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Capacity");
(*reply)->elements[i++]=swarmkv_reply_new_integer(info.capacity);
diff --git a/src/t_bloom_filter.h b/src/t_bloom_filter.h
index 0231b0c..3675e82 100644
--- a/src/t_bloom_filter.h
+++ b/src/t_bloom_filter.h
@@ -1,7 +1,7 @@
#pragma once
#include "swarmkv_common.h"
-enum cmd_exec_result bfreserve_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result bfinit_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
enum cmd_exec_result bfadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
enum cmd_exec_result bfexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
enum cmd_exec_result bfmexists_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
diff --git a/src/t_cms.c b/src/t_cms.c
new file mode 100644
index 0000000..5339cc9
--- /dev/null
+++ b/src/t_cms.c
@@ -0,0 +1,322 @@
+#include "swarmkv_common.h"
+#include "swarmkv_utils.h"
+#include "swarmkv_store.h"
+#include "swarmkv_error.h"
+#include "cm_sketch.h"
+
+#include <stdlib.h>
+#include <assert.h>
+
+enum cmd_exec_result cmsinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/* CMSINITBYDIM key width depth*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+
+ long long width=0, depth=0;
+
+ width=strtol(cmd->argv[2], NULL, 10);
+ if(width<=0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]);
+ return FINISHED;
+ }
+ depth=strtol(cmd->argv[3], NULL, 10);
+ if(depth<=0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
+ return FINISHED;
+ }
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ assert(obj->raw==NULL);
+ uuid_t uuid;
+ store_get_uuid(mod_store, uuid);
+ obj->cms=CM_sketch_new(uuid, width, depth);
+ obj->type=OBJ_TYPE_CMS;
+ *reply=swarmkv_reply_new_status("OK");
+ }
+ else
+ {
+ *reply=swarmkv_reply_new_array(0);
+ }
+ return FINISHED;
+}
+enum cmd_exec_result cmsinitbyprob_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/* CMSINITBYPROB key error probability*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+
+ double error=0, probability=0;
+ error=strtod(cmd->argv[2], NULL);
+ if(error < 0 || error >= 1.0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_float, cmd->argv[2]);
+ return FINISHED;
+ }
+ probability=strtod(cmd->argv[3], NULL);
+ if(probability < 0 || probability >= 1.0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_float, cmd->argv[3]);
+ return FINISHED;
+ }
+ int width=0, depth=0;
+ CM_sketch_tool_dimension_by_prob(error, probability, &width, &depth);
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ assert(obj->raw==NULL);
+ uuid_t uuid;
+ store_get_uuid(mod_store, uuid);
+ obj->cms=CM_sketch_new(uuid, width, depth);
+ obj->type=OBJ_TYPE_CMS;
+ *reply=swarmkv_reply_new_status("OK");
+ }
+ else
+ {
+ *reply=swarmkv_reply_new_array(0);
+ }
+ return FINISHED;
+}
+static int parse_incrby_args(const struct swarmkv_cmd *cmd, long long *increments, size_t n_increment, int *invalid_idx)
+{
+ assert(n_increment == (cmd->argc-2)/2);
+ char *endptr=NULL;
+ for(int i=3, j=0; i<cmd->argc; i+=2, j++)
+ {
+ *invalid_idx=i;
+ increments[j]=strtol(cmd->argv[i], &endptr, 10);
+ if(*endptr != '\0')
+ {
+ return -1;
+ }
+ if(increments[j] > INT32_MAX || increments[j] < INT32_MIN)
+ {
+ return -1;
+ }
+ }
+ return 0;
+}
+enum cmd_exec_result cmsincrby_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*CMSINCRBY key item increment [item increment ...]*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ assert(obj->raw == NULL);
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_CMS)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+ size_t n_increment=(cmd->argc-2)/2;
+ long long increments[n_increment], query[n_increment];
+ int ret=0, invalid_idx=0;
+ ret = parse_incrby_args(cmd, increments, n_increment, &invalid_idx);
+ if(ret<0)
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[invalid_idx]);
+ return FINISHED;
+ }
+ *reply=swarmkv_reply_new_array(n_increment);
+ for(int i=2, j=0; i<cmd->argc; i+=2, j++)
+ {
+ query[j]=CM_sketch_incrby(obj->cms, cmd->argv[i], sdslen(cmd->argv[i]), (int) increments[j]);
+ (*reply)->elements[j]=swarmkv_reply_new_integer(query[j]);
+ }
+ store_mark_object_as_modified(mod_store, obj);
+ return FINISHED;
+}
+enum cmd_exec_result cmsmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*CMSQUERY key item [item ...]*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_CMS)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+
+ *reply=swarmkv_reply_new_array(cmd->argc-2);
+ for(int i=0; i<cmd->argc-2; i++)
+ {
+ int query=0;
+ query=CM_sketch_query(obj->cms, cmd->argv[i+2], sdslen(cmd->argv[i+2]));
+ (*reply)->elements[i]=swarmkv_reply_new_integer(query);
+ }
+ return FINISHED;
+}
+enum cmd_exec_result cmsquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+ enum cmd_exec_result ret;
+ struct swarmkv_reply *tmp_reply=NULL;
+ ret=cmsmquery_command(mod_store, cmd, &tmp_reply);
+ if(ret==FINISHED)
+ {
+ if(tmp_reply->type==SWARMKV_REPLY_ARRAY)
+ {
+ *reply=swarmkv_reply_dup(tmp_reply->elements[0]);
+ swarmkv_reply_free(tmp_reply);
+ }
+ else
+ {
+ *reply=tmp_reply;
+ }
+ }
+ return ret;
+}
+enum cmd_exec_result cmsinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*CMSINFO key*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_CMS)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+ struct CM_sketch_info info;
+ CM_sketch_info(obj->cms, &info);
+ int i=0;
+ *reply=swarmkv_reply_new_array(12);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Width");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.width);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Depth");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.depth);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Error");
+ (*reply)->elements[i++]=swarmkv_reply_new_double(info.error);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Probability");
+ (*reply)->elements[i++]=swarmkv_reply_new_double(info.probability);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Count");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.count);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("ReplicaNumber");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(info.n_replica);
+ assert(i==12);
+ return FINISHED;
+}
+enum cmd_exec_result cmsrlist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*CMSRLIST key*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_CMS)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+ struct CM_sketch_info info;
+ CM_sketch_info(obj->cms, &info);
+ uuid_t replicas[info.n_replica];
+ size_t n_nodes=0;
+ CM_sketch_list_replica(obj->cms, replicas, &n_nodes);
+ *reply=swarmkv_reply_new_array(n_nodes);
+ for(int i=0; i<n_nodes; i++)
+ {
+ char uuid_str[37];
+ uuid_unparse(replicas[i], uuid_str);
+ (*reply)->elements[i]=swarmkv_reply_new_string(uuid_str, strlen(uuid_str)+1);
+ }
+ return FINISHED;
+}
+enum cmd_exec_result cmsrclear_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+/*CMSRCLEAR key uuid*/
+ struct sobj *obj=NULL;
+ const sds key=cmd->argv[1];
+ obj=store_lookup(mod_store, key);
+ if(!obj)
+ {
+ return NEED_KEY_ROUTE;
+ }
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ if(obj->type==OBJ_TYPE_UNDEFINED)
+ {
+ return handle_undefined_object(obj, reply);
+ }
+ else if(obj->type!=OBJ_TYPE_CMS)
+ {
+ *reply=swarmkv_reply_new_error(error_wrong_type);
+ return FINISHED;
+ }
+ uuid_t uuid;
+ if(0 > uuid_parse(cmd->argv[2], uuid))
+ {
+ *reply=swarmkv_reply_new_error(error_arg_not_valid_uuid, cmd->argv[2]);
+ return FINISHED;
+ }
+ int ret=CM_sketch_clear_replica(obj->cms, uuid);
+ if(ret<0)
+ {
+ *reply=swarmkv_reply_new_error("replica uuid is not exist");
+ }
+ else
+ {
+ *reply=swarmkv_reply_new_status("OK");
+ store_mark_object_as_modified(mod_store, obj);
+ }
+ return FINISHED;
+} \ No newline at end of file
diff --git a/src/t_cms.h b/src/t_cms.h
new file mode 100644
index 0000000..b0348bd
--- /dev/null
+++ b/src/t_cms.h
@@ -0,0 +1,10 @@
+#pragma once
+#include "swarmkv_common.h"
+enum cmd_exec_result cmsinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsinitbyprob_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsincrby_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsrlist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+enum cmd_exec_result cmsrclear_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); \ No newline at end of file
diff --git a/src/t_hash.c b/src/t_hash.c
index a5a7b0b..120f1ed 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -70,7 +70,7 @@ enum cmd_exec_result hset_command(struct swarmkv_module *mod_store, const struct
if(ret>0) n_added++;
}
*reply=swarmkv_reply_new_integer(n_added);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
else
{
@@ -110,7 +110,7 @@ enum cmd_exec_result hdel_command(struct swarmkv_module *mod_store, const struct
if(ret>0) n_deleted++;
}
*reply=swarmkv_reply_new_integer(n_deleted);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
return FINISHED;
}
@@ -246,7 +246,7 @@ enum cmd_exec_result hincrby_command(struct swarmkv_module *mod_store, const str
else
{
*reply=swarmkv_reply_new_integer(result);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
}
else
diff --git a/src/t_set.c b/src/t_set.c
index 87d5504..e7d4d4a 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -37,7 +37,7 @@ enum cmd_exec_result sadd_command(struct swarmkv_module *mod_store, const struct
if(ret>0) n_added++;
}
*reply=swarmkv_reply_new_integer(n_added);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
else
{
@@ -76,7 +76,7 @@ enum cmd_exec_result srem_command(struct swarmkv_module *mod_store, const struct
if(ret>0) n_removed++;
}
*reply=swarmkv_reply_new_integer(n_removed);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
return FINISHED;
}
enum cmd_exec_result smembers_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
diff --git a/src/t_string.c b/src/t_string.c
index d5f2a59..1a5f5f5 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -72,7 +72,7 @@ enum cmd_exec_result set_command(struct swarmkv_module *mod_store, const struct
{
LWW_register_set(obj->string, value, sdslen(value));
*reply=swarmkv_reply_new_status("OK");
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
else if(obj->type==OBJ_TYPE_INTEGER)
{
@@ -80,7 +80,7 @@ enum cmd_exec_result set_command(struct swarmkv_module *mod_store, const struct
{
PN_counter_set(obj->counter, int_value);
*reply=swarmkv_reply_new_status("OK");
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
else
{
@@ -113,7 +113,7 @@ enum cmd_exec_result integer_generic(struct swarmkv_module *mod_store, const sds
}
value=PN_counter_incrby(obj->counter, increment);
*reply=swarmkv_reply_new_integer(value);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
}
else
{
diff --git a/src/t_token_bucket.c b/src/t_token_bucket.c
index a16672f..d79f6b0 100644
--- a/src/t_token_bucket.c
+++ b/src/t_token_bucket.c
@@ -85,7 +85,7 @@ enum cmd_exec_result tcfg_command(struct swarmkv_module *mod_store, const struct
else if(obj->type==OBJ_TYPE_TOKEN_BUCKET)
{
OC_token_bucket_configure(obj->bucket, now, rate, period, capacity);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
}
else
@@ -182,7 +182,7 @@ enum cmd_exec_result tconsume_command(struct swarmkv_module *mod_store, const st
gettimeofday(&now, NULL);
allocated=OC_token_bucket_consume(obj->bucket, now, consume_type, request);
*reply=swarmkv_reply_new_integer(allocated);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
return FINISHED;
}
bool is_power_of_2(long long num)
@@ -254,7 +254,7 @@ enum cmd_exec_result ftcfg_command(struct swarmkv_module *mod_store, const struc
else if(obj->type==OBJ_TYPE_FAIR_TOKEN_BUCKET)
{
fair_token_bucket_configure(obj->ftb, now, rate, period, capacity, divisor);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
}
else
@@ -313,7 +313,7 @@ enum cmd_exec_result ftconsume_command(struct swarmkv_module *mod_store, const s
allocated=fair_token_bucket_consume(obj->ftb, now, member, sdslen(member), weight, consume_type, request);
*reply=swarmkv_reply_new_integer(allocated);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
return FINISHED;
}
enum cmd_exec_result ftinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
@@ -426,7 +426,7 @@ enum cmd_exec_result btcfg_command(struct swarmkv_module *mod_store, const struc
else if(obj->type==OBJ_TYPE_BULK_TOKEN_BUCKET)
{
bulk_token_bucket_configure(obj->btb, now, rate, period, capacity, buckets);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
*reply=swarmkv_reply_new_status("OK");
}
else
@@ -480,7 +480,7 @@ enum cmd_exec_result btconsume_command(struct swarmkv_module *mod_store, const s
allocated=bulk_token_bucket_consume(obj->btb, now, member, sdslen(member), consume_type, request);
*reply=swarmkv_reply_new_integer(allocated);
- sobj_need_sync(mod_store, obj);
+ store_mark_object_as_modified(mod_store, obj);
return FINISHED;
}
enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
@@ -515,8 +515,8 @@ enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const stru
available=bulk_token_bucket_read_available(obj->btb, now, cmd->argv[2], sdslen(cmd->argv[2]));
}
int i=0;
- *reply=swarmkv_reply_new_array(14);
- (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Refill");
+ *reply=swarmkv_reply_new_array(16);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Rate");
(*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.rate);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Period");
(*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.period);
@@ -528,8 +528,10 @@ enum cmd_exec_result btinfo_command(struct swarmkv_module *mod_store, const stru
(*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.approximate_keys);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Collisions");
(*reply)->elements[i++]=swarmkv_reply_new_double(btb_info.collision_rate);
+ (*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Replicas");
+ (*reply)->elements[i++]=swarmkv_reply_new_integer(btb_info.replicas);
(*reply)->elements[i++]=swarmkv_reply_new_string_fmt("Query");
(*reply)->elements[i++]=swarmkv_reply_new_integer(available);
- assert(i==14);
+ assert(i==16);
return FINISHED;
} \ No newline at end of file
diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp
index ad6d94e..4af72dd 100644
--- a/test/swarmkv_gtest.cpp
+++ b/test/swarmkv_gtest.cpp
@@ -493,22 +493,23 @@ TEST_F(SwarmkvBasicTest, TypeBulkTokenBucket)
gettimeofday(&start, NULL);
gettimeofday(&now, NULL);
srand(171);
- int i=0, n_member=120;
+ int round=0, n_member=120;
while(now.tv_sec - start.tv_sec<3)
{
request_tokens=random()%(2*rate);
- reply=swarmkv_command(db, "BTCONSUME %s user-%d %lld", key, i%n_member, request_tokens);
+ reply=swarmkv_command(db, "BTCONSUME %s user-%d %lld", key, round%n_member, request_tokens);
if(reply->type==SWARMKV_REPLY_INTEGER)
{
allocated_tokens+=reply->integer;
}
swarmkv_reply_free(reply);
gettimeofday(&now, NULL);
- i++;
+ round++;
}
+ printf("consume round %d, speed %d ops\n", round, round/(int)(now.tv_sec-start.tv_sec));
EXPECT_LE(allocated_tokens/n_member, (now.tv_sec -start.tv_sec)*rate+capacity);
reply=swarmkv_command(db, "BTINFO %s", key);
- ASSERT_EQ(reply->n_element, 14);
+ ASSERT_EQ(reply->n_element, 16);
EXPECT_NEAR(reply->elements[9]->integer, n_member, n_member/5);
swarmkv_reply_free(reply);
@@ -517,6 +518,7 @@ TEST_F(SwarmkvBasicTest, TypeBulkTokenBucket)
EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS);
swarmkv_reply_free(reply);
long long t=0;
+ int i=0;
for(i=0; i<100; i++)
{
reply=swarmkv_command(db, "BTCONSUME %s user-001 10000", key);
@@ -535,7 +537,7 @@ TEST_F(SwarmkvBasicTest, TypeBloomFilter)
struct swarmkv_reply *reply=NULL;
long long time_window_ms=600, capacity=10000;
double error_rate=0.001;
- reply=swarmkv_command(db, "BFRESERVE %s %f %lld TIME %lld 12", key, error_rate, capacity, time_window_ms);
+ reply=swarmkv_command(db, "BFINIT %s %f %lld TIME %lld 12", key, error_rate, capacity, time_window_ms);
ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
swarmkv_reply_free(reply);
@@ -611,7 +613,7 @@ TEST_F(SwarmkvBasicTest, TypeBloomFilter)
swarmkv_reply_free(reply);
//No time window
- reply=swarmkv_command(db, "BFRESERVE %s %f %lld", key, error_rate, capacity);
+ reply=swarmkv_command(db, "BFINIT %s %f %lld", key, error_rate, capacity);
ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
swarmkv_reply_free(reply);
@@ -630,6 +632,52 @@ TEST_F(SwarmkvBasicTest, TypeBloomFilter)
ASSERT_EQ(reply->integer, 1);
swarmkv_reply_free(reply);
}
+TEST_F(SwarmkvBasicTest, TypeCMS)
+{
+ struct swarmkv *db=SwarmkvBasicTest::db;
+ const char *key="cms-001";
+ struct swarmkv_reply *reply=NULL;
+ long long width=100, depth=10;
+ reply=swarmkv_command(db, "CMSINITBYDIM %s %lld %lld", key, width, depth);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+
+ int n_item=5;
+ const char *item[n_item]={"zhangsan", "lisi", "王二麻子", "Tom", "铁蛋"};
+ reply=swarmkv_command(db, "CMSINCRBY %s %s 1 %s 1 %s 1 %s 1 %s 1", key, item[0], item[1], item[2], item[3], item[4]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ ASSERT_EQ(reply->n_element, n_item);
+ for(int i=0; i<n_item; i++)
+ {
+ ASSERT_EQ(reply->elements[i]->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->elements[i]->integer, 1);
+ }
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db, "CMSMQUERY %s %s %s %s %s %s", key, item[0], item[1], item[2], item[3], item[4]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ ASSERT_EQ(reply->n_element, n_item);
+ for(size_t i=0; i<reply->n_element; i++)
+ {
+ ASSERT_EQ(reply->elements[i]->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->elements[i]->integer, 1);
+ }
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db, "CMSQUERY %s %s", key, "non-exist-item");
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 0);
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db, "CMSINFO %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ ASSERT_EQ(reply->n_element, 12);
+ ASSERT_EQ(reply->elements[1]->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->elements[1]->integer, width);
+ ASSERT_EQ(reply->elements[3]->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->elements[3]->integer, depth);
+ swarmkv_reply_free(reply);
+}
TEST_F(SwarmkvBasicTest, EXPIRE_TTL)
{
struct swarmkv *db=SwarmkvBasicTest::db;
@@ -712,7 +760,7 @@ protected:
swarmkv_options_set_cluster_port(opts[i], TWO_NODES_TEST_CLUSTER_PORT+i);
swarmkv_options_set_health_check_port(opts[i], TWO_NODES_TEST_HEALTH_PORT+i);
swarmkv_options_set_logger(opts[i], logger);
- swarmkv_options_set_sync_interval_us(opts[i], 10*1000);
+ swarmkv_options_set_sync_interval_us(opts[i], 10000);
swarmkv_options_set_cluster_timeout_us(opts[i], very_long_timeout_us);
swarmkv_options_set_worker_thread_number(opts[i], 2);
swarmkv_options_set_caller_thread_number(opts[i], 1);
@@ -1530,7 +1578,7 @@ TEST_F(SwarmkvTwoNodes, TypeBulkTokenBucket)
token=random()%(2*rate/n_member/period);
requested_tokens+=token;
- reply=swarmkv_command(db[round%2], "BTCONSUME %s user-%lld %lld", key, member_id, token);
+ reply=swarmkv_command(db[member_id%2], "BTCONSUME %s user-%lld %lld", key, member_id, token);
ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
allocated_tokens+=reply->integer;
swarmkv_reply_free(reply);
@@ -1540,23 +1588,23 @@ TEST_F(SwarmkvTwoNodes, TypeBulkTokenBucket)
round++;
}
printf("consume round %d, speed %d ops\n", round, round/(int)(now.tv_sec-start.tv_sec));
- EXPECT_GE(round/(int)(now.tv_sec-start.tv_sec), 5000);
+ //EXPECT_GE(round/(int)(now.tv_sec-start.tv_sec), 5000);
long long upper_limit=(now.tv_sec-start.tv_sec)*rate/period+capacity;
upper_limit=upper_limit*n_member;
double accuracy=(double)allocated_tokens/(upper_limit<requested_tokens?upper_limit:requested_tokens);
EXPECT_NEAR(accuracy, 1, 0.035);
//wait_for_sync();
-
+ sleep(100);
reply=swarmkv_command(db[0], "BTINFO %s", key);
ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
- ASSERT_EQ(reply->n_element, 14);
+ ASSERT_EQ(reply->n_element, 16);
EXPECT_NEAR(reply->elements[9]->integer, n_member, n_member/5);
swarmkv_reply_free(reply);
reply=swarmkv_command(db[1], "BTINFO %s user-001", key);
ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
- ASSERT_EQ(reply->n_element, 14);
+ ASSERT_EQ(reply->n_element, 16);
EXPECT_NEAR(reply->elements[9]->integer, n_member, n_member/5);
EXPECT_LE(reply->elements[11]->integer, capacity);
swarmkv_reply_free(reply);
@@ -1594,15 +1642,15 @@ TEST_F(SwarmkvTwoNodes, TypeBulkTokenBucket)
swarmkv_reply_free(reply);
}
-TEST_F(SwarmkvTwoNodes, BloomFilter)
+TEST_F(SwarmkvTwoNodes, TypeBloomFilter)
{
struct swarmkv *db[2];
db[0]=SwarmkvTwoNodes::db1;
db[1]=SwarmkvTwoNodes::db2;
struct swarmkv_reply *reply=NULL;
const char *key="bloom-filter-001";
- const char *item[3]={"hello", "world", "bloom"};
- reply=swarmkv_command(db[0], "BFRESERVE %s 0.0001 1000000 time 3000000 13", key);
+ const char *item[4]={"hello", "world", "bloom", "filter"};
+ reply=swarmkv_command(db[0], "BFINIT %s 0.0001 1000000 time 3000000 13", key);
ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
EXPECT_STREQ(reply->str, "OK");
swarmkv_reply_free(reply);
@@ -1621,6 +1669,89 @@ TEST_F(SwarmkvTwoNodes, BloomFilter)
EXPECT_EQ(reply->elements[i]->integer, 1);
}
swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db[0], "BFMEXISTS %s %s %s %s", key, item[0], item[1], item[2]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ ASSERT_EQ(reply->n_element, 3);
+ for(int i=0; i<3; i++)
+ {
+ EXPECT_EQ(reply->elements[i]->integer, 1);
+ }
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db[0], "BFEXISTS %s none-exists", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 0);
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db[0], "BFADD %s %s", key, item[3]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+
+ usleep(100*1000);
+
+ reply=swarmkv_command(db[1], "BFEXISTS %s %s", key, item[3]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ EXPECT_EQ(reply->integer, 1);
+ swarmkv_reply_free(reply);
+}
+TEST_F(SwarmkvTwoNodes, TypeCMS)
+{
+ struct swarmkv *db[2];
+ db[0]=SwarmkvTwoNodes::db1;
+ db[1]=SwarmkvTwoNodes::db2;
+ struct swarmkv_reply *reply=NULL;
+ const char *key="count-min-sketch-001";
+ int n_item=3;
+ const char *item[n_item]={"count", "min", "sketch"};
+ int count[n_item]={0};
+
+ double error=0.002, probability=0.01;
+ reply=swarmkv_command(db[0], "CMSINITBYPROB %s %f %f", key, error, probability);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ EXPECT_STREQ(reply->str, "OK");
+ swarmkv_reply_free(reply);
+ srand(171);
+ int round=100;
+ for(int i=0; i<round; i++)
+ {
+ int id=i%n_item;
+ int increment=random()%100;
+ count[id]+=increment;
+ reply=swarmkv_command(db[i%2], "CMSINCRBY %s %s %d", key, item[id], increment);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ ASSERT_EQ(reply->n_element, 1);
+ swarmkv_reply_free(reply);
+ reply=swarmkv_command(db[(i+1)%2], "CMSQUERY %s %s", key, item[id]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_INTEGER);
+ swarmkv_reply_free(reply);
+ }
+ wait_for_sync();
+ reply=swarmkv_command(db[1], "CMSINFO %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ long long total_count=reply->elements[9]->integer;
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db[1], "CMSMQUERY %s %s %s %s", key, item[0], item[1], item[2]);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ ASSERT_EQ(reply->n_element, n_item);
+ for(int i=0; i<n_item; i++)
+ {
+ EXPECT_NEAR(reply->elements[i]->integer, count[i], total_count*error);
+ }
+ swarmkv_reply_free(reply);
+
+ reply=swarmkv_command(db[0], "CMSRLIST %s", key);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_ARRAY);
+ ASSERT_EQ(reply->n_element, 2);
+ swarmkv_reply_free(reply);
+
+ char uuid[37];
+ swarmkv_self_uuid(db[0], uuid);
+ reply=swarmkv_command(db[0], "CMSRCLEAR %s %s", key, uuid);
+ ASSERT_EQ(reply->type, SWARMKV_REPLY_STATUS);
+ swarmkv_reply_free(reply);
+
}
TEST_F(SwarmkvTwoNodes, Info)
{
diff --git a/test/test_utils.c b/test/test_utils.c
index 5d0f355..f2fa637 100644
--- a/test/test_utils.c
+++ b/test/test_utils.c
@@ -62,7 +62,7 @@ int swarmkv_cli_add_slot_owner(const char *cluster_name, const char *node_string
}
void wait_for_sync()
{
- usleep(1000*1000);
+ usleep(50*1000);
}
struct cmd_exec_arg* cmd_exec_arg_new(void)