summaryrefslogtreecommitdiff
path: root/src/cells/spread_sketch.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cells/spread_sketch.c')
-rw-r--r--src/cells/spread_sketch.c241
1 files changed, 156 insertions, 85 deletions
diff --git a/src/cells/spread_sketch.c b/src/cells/spread_sketch.c
index 3e9b11e..4746170 100644
--- a/src/cells/spread_sketch.c
+++ b/src/cells/spread_sketch.c
@@ -12,38 +12,11 @@
#include "hll_common.h"
#include "exdata.h"
-// todo:把primary metric 记到sketch 里,且使用特殊的st Hyperloglog
-// todo: topk 也是这样
-/*
-primary metric:认为primary metric 的precision 设置是无效的,在创建spread sketch 时,就已经设置了hll。
-这样做的好处:如果不这样,而是在set primary时,那么要提供接口“替换所有hll”,这使得set primary metric 接口的职责变成了:1. 将ss 中的hll 删除。2. 将cell 中的metric结构替换 3. 读取配置并产生新的方法。太复杂了。
-实际上,关键在于,set primary metric 的时候,并没有register metric,所以create cube 和set primary metric 一定是两个接口,而create cube 的时候,明明应该已经创建了hll。
-方案2:
-修改set primary metric 的逻辑,一个cube上只能调用一次,且必须调用一次。(选用这个)
------------------------------------------------------------------------------------------------
-关于添加hll 的过程:
-当是primary metric 的时候,不操作metric。而是走一个一般的spread sketch流程。query时采用新逻辑。
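+// TODO: open question when adapting to swarm kv:
+// the query must return a double. A normal spread sketch always returns a value (the HLL is updated even when the key is absent), so what should this query return?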
-在swarm kv 里,spread sketch 需要支持serialize 操作。也就是最好提供一个serialize方法。但是这个东西不用怎么测,实际上不会出现。
-*/
-
-/*
-把primary metric 加到 spread sketch 的问题。确认一些实现上不舒服的地方
-
-对于comprehensive 和 topk 的cube,创建cube 时,cube 获得了创建primary metric 的全部参数,但是spread sketch内部的precision 是在fieldstat_cube_set_primary_metric 时确定的。准备提供一个spread_sketch_change_configuration() 接口,在fieldstat_cube_set_primary_metric 时,比较默认的precision与primary metric 的precision的值,如果不同,调用此接口,删除所有的Hyperloglog 并重新创建。
-
-fieldstat输出时,产生的Hyperloglog结果是一个序列化的base64字符串。需要提供一个spread_sketch_query_as_base64接口,用于输出spread sketch 内部的Hyperloglog结构。因为spread sketch 内的sketch每行都创建了一个Hyperloglog,所以primary metric 的base64查询,可能对应最多 r 个不同的查询结果,我会先遍历调用hll_count,查询计数最小的行数,再把对应最小计数的Hyperloglog 序列化。
-
-swarmkv 内需要spread_sketch_serialize接口,因为两遍的spread sketch必须一样,所以多增加一个exdata_serialize 回调。deserialize 时,需要额外传入一组回调函数,用来对spread sketch初始化,也可以不传入(分为spread_skech_deserialze和spread_skech_deserialze_with_exdata 两个函数),认为不存在exdata,所有新的exdata 是NULL。
-*/
-/*
-1. 修改set primary metric 为 set sampling
-2. 增加一个list key 和一堆根据key 查base64之类的接口
-3. serialize 不用实现带exdata的情况
-*/
-
struct entry {
int ref_count;
void *exdata;
@@ -89,8 +62,7 @@ struct spread_sketch {
struct bucket *buckets;
struct entry_table *table;
- int level_cnt[65]; // 64: the level range is [1,65] // count every level number。
- uint32_t level_min; // the minimum level in the sketch
+ int level0_cnt; // used to filter out dummy adding
};
static void *default_new_fn(void *arg) {
@@ -109,11 +81,19 @@ static void *default_copy_fn(void *exdata) {
return exdata;
}
+// Default exdata callback set. It is copied by value into every newly created
+// entry table and sketch, and into sketches rebuilt from a serialized blob.
+struct spread_sketch_scheme DEFAULT_SCHEME = {
+ .new_fn = default_new_fn,
+ .free_fn = default_free_fn,
+ .merge_fn = default_merge_fn,
+ .reset_fn = default_reset_fn,
+ .copy_fn = default_copy_fn
+};
+
uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long long t) {
/*
return f(t), the actual level of bucket, which satisfy:
1. d 2^f(t)/dt is constants
- 2. f(t0 + T) = 1
+ 2. f(t0 + 2T) = 0
3. f((t0) = L )
*/
struct bucket *bucket = &ss->buckets[bucket_idx];
@@ -126,15 +106,15 @@ uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long lon
long long t0 = bucket->last_update_ms;
assert(t >= t0);
- if (t - t0 >= T) {
- return 1;
+ if (t - t0 >= 2 * T) {
+ return 0;
}
if (L <= 1) {
- return L;
+ return t - t0 >= T ? 0 : L;
}
long long tmp_exp = 1 << L;
- double a = ((double)(2 - tmp_exp)) / ((double)T);
+ double a = ((double)(1 - tmp_exp)) / (2.0 * T);
double b = (double)(tmp_exp);
return (uint32_t)(log2(a * ((double)(t-t0)) + b) + 0.5);
@@ -153,6 +133,15 @@ static inline char *key_dup(const char *key, size_t key_len) {
return ret;
}
+// Look up `key` in `table` without creating it or touching its reference count.
+// Returns the entry, or NULL when the key is absent or its entry is marked dying.
+struct entry *smart_ptr_table_find(struct entry_table *table, const char *key, size_t key_len) {
+    struct entry *found = NULL;
+    HASH_FIND(hh, table->entry, key, key_len, found);
+    // Entries flagged as dying are hidden from lookups.
+    return (found != NULL && !found->dying) ? found : NULL;
+}
+
struct entry *smart_ptr_table_get(struct entry_table *table, const char *key, size_t key_len, void *arg) {
struct entry *ret;
HASH_FIND(hh, table->entry, key, key_len, ret);
@@ -211,11 +200,7 @@ struct entry_table *smart_ptr_table_new() {
struct entry_table *table = malloc(sizeof(struct entry_table));
table->entry = NULL;
- table->scheme.new_fn = default_new_fn;
- table->scheme.free_fn = default_free_fn;
- table->scheme.merge_fn = default_merge_fn;
- table->scheme.reset_fn = default_reset_fn;
- table->scheme.copy_fn = default_copy_fn;
+ table->scheme = DEFAULT_SCHEME;
return table;
}
@@ -232,11 +217,7 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec
pthis->reset_time = now;
pthis->buckets = calloc(depth * width, sizeof(struct bucket));
- pthis->scheme.new_fn = default_new_fn;
- pthis->scheme.free_fn = default_free_fn;
- pthis->scheme.merge_fn = default_merge_fn;
- pthis->scheme.reset_fn = default_reset_fn;
- pthis->scheme.copy_fn = default_copy_fn;
+ pthis->scheme = DEFAULT_SCHEME;
pthis->table = smart_ptr_table_new();
pthis->table->scheme = pthis->scheme;
@@ -244,9 +225,7 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec
pthis->buckets[i].sthll_register = hll_create_register(precision);
}
- pthis->level_min = 0;
- memset(pthis->level_cnt, 0, sizeof(pthis->level_cnt));
- pthis->level_cnt[0] = pthis->depth * pthis->width;
+ pthis->level0_cnt = pthis->depth * pthis->width;
return pthis;
}
@@ -265,34 +244,18 @@ void move_registers_forward(struct spread_sketch *ss, const struct timeval *now)
}
}
-void min_level_state_update(struct spread_sketch *ss, uint32_t level_old, uint32_t level_new) {
- ss->level_cnt[level_old]--;
- ss->level_cnt[level_new]++;
- while (ss->level_cnt[ss->level_min] == 0) {
- ss->level_min++;
- }
-}
-
// return 0 if not added, return 1 if added
int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now) {
uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1;
long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000;
-
- if (level <= ss->level_min) {
- if (item_hash == DUMMY_ITEM_HASH) { // TODO: 这个地方的逻辑对不上,其实返回1应该是“在sketch中”就返回1,为了get exdata 的问题,增加了一种特殊情况。
- // 正常来说,应该是在任何时候,都调用add_sketch,不区分primary or not,并走同样的逻辑,在不包含于sketch时不get。
- // todo: 不对,如果hll 在内部,那么hll 的更新总是要完成,这里只是区别了要更新hll 和不更新hll 的情况
- return 0; // DUMMY_ITEM_HASH is the case of "recording a key" instead of update hll. Just return 0 to inform the key is not added to the sketch
- }
- struct entry *content;
- HASH_FIND(hh, ss->table->entry, key, key_length, content);
- if (content != NULL && !content->dying) {
- return 1;
- } else {
+ if (item_hash == DUMMY_ITEM_HASH) {
+ if (ss->level0_cnt == 0) {
return 0;
}
- }
+ assert(ss->level0_cnt>0);
+ }
+
// https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
// A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x)
// Assuming that the 128-bit xxhash function is perfect, we can view it as a combination of two 64-bit hash functions.
@@ -312,7 +275,9 @@ int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key
bucket->last_update_ms = now_ms;
if (bucket->level < level) {
- min_level_state_update(ss, bucket->level, level);
+ if (bucket->level == 0) {
+ ss->level0_cnt--;
+ }
bucket->level = level;
}
in_sketch = true;
@@ -320,7 +285,10 @@ int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key
uint32_t true_level = bucket->content == NULL ? 0: cal_true_level(ss, bucket_idx, now_ms);
if (level > true_level) {
- min_level_state_update(ss, true_level, level);
+ if (bucket->level == 0) {
+ ss->level0_cnt--;
+ }
+
// printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, true_level, level, i, hash_x % ss->width);
const struct entry *content_old = bucket->content;
if (content_old != NULL) {
@@ -408,20 +376,21 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
}
hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision);
+ if (bucket_dst->level == 0 && bucket_src->level > 0) {
+ dst->level0_cnt--;
+ }
if (bucket_dst->content == NULL) {
bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
bucket_dst->level = bucket_src->level;
bucket_dst->last_update_ms = bucket_src->last_update_ms;
- min_level_state_update(dst, 0, bucket_src->level);
continue;
}
bucket_dst->content->dying = false;
if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) {
if (bucket_src->level > bucket_dst->level) {
- min_level_state_update(dst, bucket_dst->level, bucket_src->level);
bucket_dst->level = bucket_src->level;
}
if (bucket_src->last_update_ms > bucket_dst->last_update_ms) {
@@ -432,7 +401,6 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
uint32_t true_level_dst = cal_true_level(dst, i, bucket_dst->last_update_ms);
if (true_level_src > true_level_dst) {
- min_level_state_update(dst, bucket_dst->level, bucket_src->level);
smart_ptr_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len);
bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
bucket_dst->last_update_ms = bucket_src->last_update_ms;
@@ -459,9 +427,8 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
}
void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) {
- struct entry *content;
- HASH_FIND(hh, ss->table->entry, key, key_len, content);
- if (content == NULL || content->dying) {
+ const struct entry *content = smart_ptr_table_find(ss->table, key, key_len);
+ if (content == NULL) {
return NULL;
}
return content->exdata;
@@ -480,9 +447,7 @@ void spread_sketch_reset(struct spread_sketch *ss) {
content->dying = true;
}
- memset(ss->level_cnt, 0, sizeof(ss->level_cnt));
- ss->level_cnt[0] = ss->depth * ss->width;
- ss->level_min = 0;
+ ss->level0_cnt = ss->depth * ss->width;
}
void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) {
@@ -540,10 +505,12 @@ void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_
(*key_lens)[count] = content->key_len;
count++;
}
+
+ *n_keys = count;
}
double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len) {
- if (spread_sketch_get0_exdata(ss, key, key_len) == NULL) {
+ if (smart_ptr_table_find(ss->table, key, key_len) == NULL) {
return -1;
}
@@ -577,7 +544,7 @@ char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss,
struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) {
struct spread_sketch *dst = malloc(sizeof(struct spread_sketch));
- memcpy(dst, src, sizeof(struct spread_sketch)); // copy the depth, width, level_cnt, level_min
+ memcpy(dst, src, sizeof(struct spread_sketch));
dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket));
dst->table = smart_ptr_table_new();
@@ -634,6 +601,110 @@ void spread_sketch_change_precision(struct spread_sketch *ss, unsigned char prec
ss->precision = precision;
}
+/*
+ * Serialize `ss` into a freshly malloc'ed buffer returned through *blob / *blob_sz.
+ *
+ * Blob layout:
+ *   1. struct spread_sketch          (pointer and callback members zeroed)
+ *   2. for each bucket, in index order:
+ *        struct bucket               (pointer members zeroed)
+ *        HLL register bytes          (hll_size(precision) bytes)
+ *   3. for each bucket, in index order:
+ *        int64_t key_len             (0 when the bucket holds no entry)
+ *        key bytes                   (key_len bytes, no terminator)
+ *
+ * Exdata is not written. Structs are copied in host representation.
+ *
+ * If the buffer cannot be allocated, *blob is set to NULL and *blob_sz to 0.
+ */
+void spread_sketch_serialize(const struct spread_sketch *ss, char **blob, size_t *blob_sz)
+{
+    const size_t n_buckets = (size_t)ss->depth * (size_t)ss->width;
+    const size_t reg_sz = hll_size(ss->precision);
+
+    // Total size: fixed part plus the variable-length keys.
+    size_t sz = sizeof(struct spread_sketch);
+    sz += n_buckets * (sizeof(struct bucket) + reg_sz + sizeof(int64_t));
+    for (size_t i = 0; i < n_buckets; i++) {
+        if (ss->buckets[i].content != NULL) {
+            sz += ss->buckets[i].content->key_len;
+        }
+    }
+
+    char *cursor = malloc(sz);
+    if (cursor == NULL) {
+        *blob = NULL;
+        *blob_sz = 0;
+        return;
+    }
+    *blob = cursor;
+    *blob_sz = sz;
+
+    // Pointers are meaningless outside this process and are rebuilt on load;
+    // zero them so the blob is deterministic and does not expose addresses.
+    struct spread_sketch header = *ss;
+    header.buckets = NULL;
+    header.table = NULL;
+    memset(&header.scheme, 0, sizeof(header.scheme));
+    memcpy(cursor, &header, sizeof(header));
+    cursor += sizeof(header);
+
+    for (size_t i = 0; i < n_buckets; i++) {
+        struct bucket image = ss->buckets[i];
+        image.content = NULL;
+        image.sthll_register = NULL;
+        memcpy(cursor, &image, sizeof(image));
+        cursor += sizeof(image);
+
+        memcpy(cursor, ss->buckets[i].sthll_register, reg_sz);
+        cursor += reg_sz;
+    }
+
+    for (size_t i = 0; i < n_buckets; i++) {
+        const struct entry *content = ss->buckets[i].content;
+        int64_t key_len = (content != NULL) ? (int64_t)content->key_len : 0;
+        memcpy(cursor, &key_len, sizeof(key_len));
+        cursor += sizeof(key_len);
+
+        if (key_len > 0) {
+            memcpy(cursor, content->key, (size_t)key_len);
+            cursor += key_len;
+        }
+    }
+}
+
+/*
+ * Rebuild a sketch from a blob produced by spread_sketch_serialize().
+ *
+ * The blob is treated as untrusted: every read is checked against blob_sz.
+ * Pointer members stored in the blob are ignored; buckets, HLL registers and
+ * the key table are freshly allocated, and the scheme is DEFAULT_SCHEME.
+ * A bucket whose stored key_len is 0 is restored with no entry (content NULL).
+ *
+ * Returns the new sketch, or NULL when the blob is NULL, truncated or
+ * inconsistent, or when allocation fails.
+ *
+ * NOTE(review): `precision` is taken from the blob as-is; confirm that
+ * hll_size()/hll_create_register() tolerate arbitrary values.
+ */
+struct spread_sketch *spread_sketch_deserialize(const char *blob, size_t blob_sz)
+{
+    if (blob == NULL || blob_sz < sizeof(struct spread_sketch)) {
+        return NULL;
+    }
+    const char *const end = blob + blob_sz;
+
+    struct spread_sketch *ss = malloc(sizeof(struct spread_sketch));
+    if (ss == NULL) {
+        return NULL;
+    }
+    memcpy(ss, blob, sizeof(struct spread_sketch));
+    blob += sizeof(struct spread_sketch);
+    ss->buckets = NULL;
+    ss->table = NULL;
+
+    if (ss->depth <= 0 || ss->width <= 0 || (size_t)ss->width > SIZE_MAX / (size_t)ss->depth) {
+        free(ss);
+        return NULL;
+    }
+    const size_t n_buckets = (size_t)ss->depth * (size_t)ss->width;
+    const size_t reg_sz = hll_size(ss->precision);
+    const size_t per_bucket = sizeof(struct bucket) + reg_sz + sizeof(int64_t);
+
+    // Every bucket needs at least its struct, its register and a key length.
+    if (n_buckets > (size_t)(end - blob) / per_bucket) {
+        free(ss);
+        return NULL;
+    }
+
+    ss->buckets = calloc(n_buckets, sizeof(struct bucket));
+    if (ss->buckets == NULL) {
+        free(ss);
+        return NULL;
+    }
+    ss->table = smart_ptr_table_new();
+    ss->scheme = DEFAULT_SCHEME;
+
+    for (size_t i = 0; i < n_buckets; i++) {
+        struct bucket *bucket = &ss->buckets[i];
+        memcpy(bucket, blob, sizeof(struct bucket));
+        blob += sizeof(struct bucket);
+
+        // Never keep pointer values that came from the blob.
+        bucket->content = NULL;
+        bucket->sthll_register = hll_create_register(ss->precision);
+        memcpy(bucket->sthll_register, blob, reg_sz);
+        blob += reg_sz;
+    }
+
+    for (size_t i = 0; i < n_buckets; i++) {
+        if ((size_t)(end - blob) < sizeof(int64_t)) {
+            goto malformed;
+        }
+        int64_t key_len;
+        memcpy(&key_len, blob, sizeof(int64_t));
+        blob += sizeof(int64_t);
+
+        if (key_len < 0 || (uint64_t)key_len > (uint64_t)(end - blob)) {
+            goto malformed;
+        }
+        if (key_len == 0) {
+            continue; // empty bucket: no table entry
+        }
+
+        ss->buckets[i].content = smart_ptr_table_get(ss->table, blob, (size_t)key_len, NULL);
+        blob += key_len;
+    }
+
+    return ss;
+
+malformed:
+    spread_sketch_free(ss);
+    return NULL;
+}
+
+/*
+ * Merge a serialized sketch into `dst`.
+ * If the blob cannot be deserialized, `dst` is left untouched.
+ */
+void spread_sketch_merge_blob(struct spread_sketch *dst, const char *blob, size_t blob_sz)
+{
+    struct spread_sketch *src = spread_sketch_deserialize(blob, blob_sz);
+    if (src == NULL) {
+        return;
+    }
+    spread_sketch_merge(dst, src);
+    spread_sketch_free(src);
+}
+
+/*
+ * Estimate the number of bytes held by `ss`: the sketch struct, every bucket
+ * together with its HLL register, and every table entry together with its key.
+ * Exdata is not counted (assumed NULL).
+ */
+size_t spread_sketch_calculate_memory_usage(const struct spread_sketch *ss)
+{
+    size_t ret = sizeof(struct spread_sketch);
+
+    size_t bucket_size = sizeof(struct bucket) + hll_size(ss->precision);
+    ret += (size_t)ss->depth * (size_t)ss->width * bucket_size;
-// todo: serialize(swarm kv)
-// todo: swarm kv 上实现reset 和copy。 \ No newline at end of file
+
+    struct entry *content, *tmp;
+    HASH_ITER(hh, ss->table->entry, content, tmp) {
+        ret += sizeof(struct entry);
+        ret += content->key_len;
+    }
+    return ret;
+} \ No newline at end of file