summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2024-07-10 11:03:24 +0800
committerchenzizhan <[email protected]>2024-07-10 11:03:24 +0800
commit677f337e195e3b9b6e416109df8d51c14da2791b (patch)
tree5000114106f11d715e95eb9762dddaf5af361c41 /src
parent34be9bf8b545162c1a32f751776906c5fd1f5ad3 (diff)
parente1fd771fc7e33ffd659535e81412179e8ac6929a (diff)
Merge branch 'spreadsketch' into refactor-heavykeeper-newkey
Diffstat (limited to 'src')
-rw-r--r--src/cube.c46
-rw-r--r--src/tags/spread_sketch.c358
-rw-r--r--src/tags/spread_sketch.h40
3 files changed, 441 insertions, 3 deletions
diff --git a/src/cube.c b/src/cube.c
index 6b1f386..47c9065 100644
--- a/src/cube.c
+++ b/src/cube.c
@@ -13,6 +13,7 @@
#include "metric.h"
#include "heavy_keeper.h"
#include "tag_map.h"
+#include "spread_sketch.h"
#define DEFAULT_N_METRIC 32
#define DEFAULT_N_CUBE 64
@@ -42,6 +43,7 @@ struct cube {
union {
struct heavy_keeper *topk;
struct hash_table *comprehensive;
+ struct spread_sketch *spread_sketch;
};
size_t max_n_cell;
@@ -570,6 +572,9 @@ struct cube *cube_new(const struct field *dimensions, size_t n_dimensions, enum
cube->comprehensive = hash_table_new(max_n_cell);
hash_table_set_exdata_schema(cube->comprehensive, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ cube->spread_sketch = spread_sketch_new(max_n_cell);
+ spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
+ break; // BUG FIX: without this break the new case fell through into default: assert(0)
default:
assert(0);
break;
@@ -587,6 +592,9 @@ void cube_free(struct cube *cube) {
case SAMPLING_MODE_COMPREHENSIVE:
hash_table_free(cube->comprehensive);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ spread_sketch_free(cube->spread_sketch);
+ break;
default:
assert(0);
break;
@@ -599,10 +607,25 @@ void cube_free(struct cube *cube) {
}
void cube_reset(struct cube *cube) {
- if (cube->sampling_mode == SAMPLING_MODE_TOPK) {
+ // if (cube->sampling_mode == SAMPLING_MODE_TOPK) {
+ // heavy_keeper_reset(cube->topk);
+ // } else {
+ // hash_table_reset(cube->comprehensive);
+ // }
+ switch (cube->sampling_mode)
+ {
+ case SAMPLING_MODE_TOPK:
heavy_keeper_reset(cube->topk);
- } else {
+ break;
+ case SAMPLING_MODE_COMPREHENSIVE:
hash_table_reset(cube->comprehensive);
+ break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ spread_sketch_reset(cube->spread_sketch);
+ break;
+ default:
+ assert(0);
+ break;
}
}
@@ -656,7 +679,24 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t
}
}
break;}
- }
+ case SAMPLING_MODE_SPREADSKETCH: {
+ if (cube->primary_metric_id != metric_id) {
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
+ // TODO: the spread sketch cannot support the "dummy" scenario. First, we cannot route
+ // every metric through the spread-sketch path: using a level-0 hash as the value is
+ // meaningless — it can never win an update; at the very beginning it would only serve
+ // to record the key. And if, like topk, a count=0 entry were given a slot, there is a
+ // problem: the spread sketch itself does not bound how many entries it records, so when
+ // would recording stop? That design is too cumbersome. Two options were discussed
+ // earlier. Option 1: keep a buffer — when get0_exdata misses, write into a buffered
+ // cell and push it in once the primary metric arrives. Option 2: simply skip the
+ // first-round insertion. That introduces a small error; in practice this operation runs
+ // once per session start/end rather than per packet, so the error is not negligible
+ // either. The primary metric must be the first one processed.
+ }
+ if (cell_data == NULL) {
+ int tmp_ret = spread_sketch_add(cube->spread_sketch, tag_in_string, tag_len, 0, (void *)&args);
+ if (tmp_ret == 1) {
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
+ }
+ }
+ break;}
+ }
+
+
if (free_key) {
free(key);
diff --git a/src/tags/spread_sketch.c b/src/tags/spread_sketch.c
new file mode 100644
index 0000000..c654e89
--- /dev/null
+++ b/src/tags/spread_sketch.c
@@ -0,0 +1,358 @@
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <math.h>
+#include <assert.h>
+
+#include "xxhash/xxhash.h"
+#include "uthash.h"
+
+#include "spread_sketch.h"
+#include "exdata.h"
+
+/*
+方案1,smart ptr。
+省内存,更符合对cell manager 类结构的期待。
+额外增加一个额外的管理smart ptr的结构体。不过,总体来说修改量不算大,比如merge和add操作基本保持原样,仅仅是对key 的malloc 和free 做修改。
+对dummy 特殊情况的支持更容易。
+
+根据测试结果,如果是每次reset bucket 的时候都重置,误差都仅仅是微微增加,如果是每次所有sketch中的key 都失去索引才删除,这个影响只会更小。可以用。
+*/
+
+/*
+方案2,把exdata 放入bucket 里,每个bucket一份。
+
+可以保留spread sketch 的风味,不会引入新的误差。
+根据实验情况,大概会多占用一半的内存,因为根据测试的经验,保存的key 总量是bucket 总数的2/3左右。哈希表本身的HH handle 也占内存,这部分反而可以节约回去。
+会让cell 的操作变得麻烦,无法借用老的cell manager 流程,get exdata 会得到一个exdata 的数组(可能多个),而非单独的一个,要对多个cell综合起来当一个cell 看。。修改量非常小,但是确实会影响代码本身的整洁度。
+
+*/
+// Ref-counted, hash-table-indexed owner of one key and its exdata payload.
+// Buckets hold borrowed pointers into this table; the table frees an entry
+// when its last bucket reference is released.
+struct smart_ptr { // TODO: rename to `entry`
+ int ref_count;
+ void *exdata;
+
+ char *key;
+ size_t key_len;
+ UT_hash_handle hh;
+};
+
+// Callback bundle describing how to create/free/merge/reset/copy exdata payloads.
+struct spread_sketch_scheme {
+ exdata_new_cb new_fn;
+ exdata_free_cb free_fn;
+ exdata_merge_cb merge_fn;
+ exdata_reset_cb reset_fn;
+ exdata_copy_cb copy_fn;
+};
+
+// uthash-backed table of smart_ptr entries, keyed by the raw key bytes.
+struct smart_ptr_table { // TODO: rename to entry table
+ struct smart_ptr *entry;
+
+ struct spread_sketch_scheme scheme;
+};
+
+// One sketch cell: the currently resident key (or NULL when empty) and its level.
+struct bucket {
+ struct smart_ptr *content;
+ uint32_t level;
+};
+
+// depth x width bucket matrix plus the shared key/exdata table.
+struct spread_sketch {
+ int depth;
+ int width;
+
+ struct spread_sketch_scheme scheme;
+
+ struct bucket *buckets;
+ struct smart_ptr_table *table;
+
+ uint32_t *min_level_per_row; // TODO: check performance first, implement later. Would record the minimum level per row so whole rows can be skipped. For 64-bit levels keep per-level counts (extra 64*r space); when the count of the current minimum drops to 0, advance the minimum.
+ // TODO: compared with heavy keeper — it is not just about skipping rows: heavykeeper never walks the sketch update path at all when the input is 0.
+};
+
+// No-op default exdata callbacks, installed until the user provides a schema
+// via spread_sketch_set_exdata_schema().
+static void *default_new_fn(void *arg) {
+ return NULL;
+}
+static void default_free_fn(void *exdata) {
+ return;
+}
+static void default_merge_fn(void *dest, void *src) {
+ return;
+}
+static void default_reset_fn(void *exdata) {
+ return;
+}
+static void *default_copy_fn(void *exdata) {
+ return exdata;
+}
+// Byte-wise key comparison; lengths must match first.
+static inline bool key_equal(const char *key1, size_t key1_len, const char *key2, size_t key2_len) {
+ if (key1_len != key2_len) {
+ return false;
+ }
+ return memcmp(key1, key2, key1_len) == 0;
+}
+// Duplicate raw key bytes (NOT NUL-terminated; length travels separately).
+// NOTE(review): malloc result unchecked — assumes abort-on-OOM policy; confirm.
+static inline char *key_dup(const char *key, size_t key_len) {
+ char *ret = malloc(key_len);
+ memcpy(ret, key, key_len);
+ return ret;
+}
+
+// Look up `key`; on hit, bump its refcount and return the existing entry.
+// On miss, create a new entry with ref_count = 1. A non-NULL `arg` triggers
+// exdata construction via the schema's new_fn; a NULL `arg` leaves exdata NULL
+// (used when sharing entries during merge/copy, where exdata is filled later).
+// NOTE(review): malloc result unchecked — assumes abort-on-OOM policy; confirm.
+struct smart_ptr *smart_ptr_table_get(struct smart_ptr_table *table, const char *key, size_t key_len, void *arg) {
+ struct smart_ptr *ret;
+ HASH_FIND(hh, table->entry, key, key_len, ret);
+
+ if (ret != NULL) {
+ ret->ref_count++;
+ } else {
+ ret = malloc(sizeof(struct smart_ptr));
+ ret->ref_count = 1;
+ ret->key = key_dup(key, key_len);
+ ret->key_len = key_len;
+ if (arg == NULL) {
+ ret->exdata = NULL;
+ } else {
+ ret->exdata = table->scheme.new_fn(arg);
+ }
+ HASH_ADD_KEYPTR(hh, table->entry, ret->key, ret->key_len, ret);
+ }
+
+ return ret;
+}
+
+// Decrement the refcount for `key`; when it reaches 0, free the entry
+// (exdata via the schema's free_fn, then the key and the node).
+// Returns 0 on success, -1 if the key is not in the table.
+int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size_t key_len) {
+ struct smart_ptr *ret;
+ HASH_FIND(hh, table->entry, key, key_len, ret);
+ if (ret == NULL) {
+ return -1;
+ }
+
+ ret->ref_count--;
+ if (ret->ref_count == 0) {
+ HASH_DEL(table->entry, ret);
+ table->scheme.free_fn(ret->exdata);
+ free(ret->key);
+ free(ret);
+ }
+
+ return 0;
+}
+
+// Free every entry regardless of refcount, then the table itself.
+void smart_ptr_table_free(struct smart_ptr_table *table) {
+ struct smart_ptr *current, *tmp;
+ HASH_ITER(hh, table->entry, current, tmp) {
+ HASH_DEL(table->entry, current);
+ table->scheme.free_fn(current->exdata);
+ free(current->key);
+ free(current);
+ }
+ free(table);
+}
+
+// Allocate an empty table with the no-op callback scheme installed.
+// NOTE(review): malloc result unchecked — assumes abort-on-OOM policy; confirm.
+struct smart_ptr_table *smart_ptr_table_new() {
+ struct smart_ptr_table *table = malloc(sizeof(struct smart_ptr_table));
+ table->entry = NULL;
+
+ table->scheme.new_fn = default_new_fn;
+ table->scheme.free_fn = default_free_fn;
+ table->scheme.merge_fn = default_merge_fn;
+ table->scheme.reset_fn = default_reset_fn;
+ table->scheme.copy_fn = default_copy_fn;
+
+ return table;
+}
+
+// Derive sketch depth/width from the expected number of super-spreaders to query.
+// Heuristic, not exact: depth ~ log10(k) rounded up; width grows linearly with k,
+// clamped to at least 40.
+void get_parameter_recommendation(int max_super_spreader_number, int *depth_out, int *width_out)
+{
+ int logk = max_super_spreader_number >= 3200 ? 4 : 3; // log10(3200) = 3.51, round up to 4
+ *depth_out = logk;
+
+ int w;
+ if (max_super_spreader_number <= 100) {
+ w = max_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width
+ } else {
+ w = max_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous
+ }
+ if (w < 40) {
+ w = 40;
+ }
+ *width_out = w;
+}
+
+// Allocate a sketch sized for expected_query_num super-spreaders: zeroed
+// depth x width bucket matrix, no-op exdata scheme, empty key table.
+// NOTE(review): malloc/calloc results unchecked — assumes abort-on-OOM; confirm.
+struct spread_sketch *spread_sketch_new(int expected_query_num) {
+ struct spread_sketch *pthis = malloc(sizeof(struct spread_sketch));
+ get_parameter_recommendation(expected_query_num, &pthis->depth, &pthis->width);
+
+ pthis->buckets = calloc(pthis->depth * pthis->width, sizeof(struct bucket));
+ // BUG FIX: min_level_per_row was left uninitialized (struct comes from malloc);
+ // any future use or free of it would hit an indeterminate pointer.
+ pthis->min_level_per_row = NULL;
+ pthis->scheme.new_fn = default_new_fn;
+ pthis->scheme.free_fn = default_free_fn;
+ pthis->scheme.merge_fn = default_merge_fn;
+ pthis->scheme.reset_fn = default_reset_fn;
+ pthis->scheme.copy_fn = default_copy_fn;
+ pthis->table = smart_ptr_table_new();
+ pthis->table->scheme = pthis->scheme;
+
+ return pthis;
+}
+
+// Insert/refresh `key` with the level derived from hash_identifier.
+// Returns 1 if the key now occupies (or already occupied and was refreshed in)
+// at least one bucket, 0 if it was rejected everywhere.
+int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t hash_identifier, void *arg) {// todo: entry, item
+    // uint64_t hash_identifier = XXH3_64bits_withSeed(identifier, identifier_length, 171);
+    // BUG FIX: __builtin_clzll(0) is undefined behavior, and cube.c really passes 0 for
+    // its "dummy" add. Map a zero hash to level 0, which can never evict anything (an
+    // empty bucket already counts as level 0) — matching the documented intent that a
+    // level-0 value "can never win an update".
+    uint32_t level = hash_identifier == 0 ? 0 : (uint32_t)__builtin_clzll(hash_identifier) + 1;
+
+    // https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
+    // Two hash functions h1(x), h2(x) simulate additional hashes gi(x) = h1(x) + i*h2(x).
+    // Assuming the 64-bit xxhash output is well mixed, split it into two 32-bit halves.
+    uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_length, 171);
+    uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
+    uint32_t hash_x2 = (uint32_t) hash_x_tmp;
+
+    bool in_sketch = false;
+    for (int i = 0; i < ss->depth; i++) {
+        uint32_t hash_x = hash_x1 + i * hash_x2;
+        int bucket_idx = (hash_x % ss->width) + i * ss->width;
+        struct bucket *bucket = &ss->buckets[bucket_idx];
+
+        if (bucket->content != NULL && key_equal(bucket->content->key, bucket->content->key_len, key, key_length)) {
+            // Same key already resident: keep the maximum observed level.
+            if (bucket->level < level) {
+                bucket->level = level;
+            }
+            in_sketch = true;
+        } else {
+            uint32_t true_level = bucket->content == NULL ? 0 : bucket->level;
+
+            if (true_level < level) {
+                // BUG FIX: the bucket may be empty (content == NULL); the old code
+                // unconditionally dereferenced content_old->key here and crashed.
+                if (bucket->content != NULL) {
+                    smart_ptr_table_release(ss->table, bucket->content->key, bucket->content->key_len);
+                }
+                bucket->content = smart_ptr_table_get(ss->table, key, key_length, arg);
+                bucket->level = level;
+
+                in_sketch = true;
+            }
+        }
+    }
+
+    return in_sketch ? 1 : 0;
+}
+
+// Release the key table, the bucket array, and the sketch itself.
+void spread_sketch_free(struct spread_sketch *ss) {
+ smart_ptr_table_free(ss->table);
+ free(ss->buckets);
+ free(ss);
+}
+
+// Merge src into dst (same geometry required). Phase 1 merges bucket-by-bucket:
+// the higher level wins, and keys newly imported from src are registered in
+// dst's table with NULL exdata. Phase 2 walks dst's table and, for every key
+// src also tracks, copies or merges the exdata payload.
+void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *src)
+{
+ assert(dst->depth == src->depth && dst->width == src->width);
+
+ for (int i = 0; i < dst->depth * dst->width; i++) {
+ const struct bucket *bucket_src = &src->buckets[i];
+ struct bucket *bucket_dst = &dst->buckets[i];
+
+ if (bucket_src->content == NULL) {
+ continue;
+ }
+ if (bucket_dst->content == NULL) {
+ // dst bucket empty: adopt src's key (exdata deferred to phase 2).
+ bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
+ bucket_dst->level = bucket_src->level;
+ continue;
+ }
+
+ if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) {
+ if (bucket_src->level > bucket_dst->level) {
+ bucket_dst->level = bucket_src->level;
+ }
+ } else {
+ // Different keys: src evicts dst only if strictly higher level.
+ if (bucket_src->level > bucket_dst->level) {
+ smart_ptr_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len);
+ bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
+ bucket_dst->level = bucket_src->level;
+ }
+ }
+ }
+
+ const struct spread_sketch_scheme *scheme = &dst->table->scheme;
+
+ struct smart_ptr *content_dest, *content_src, *tmp;
+ HASH_ITER(hh, dst->table->entry, content_dest, tmp) {
+ HASH_FIND(hh, src->table->entry, content_dest->key, content_dest->key_len, content_src);
+ if (content_src == NULL) {
+ continue;
+ }
+
+ if (content_dest->exdata == NULL) {
+ content_dest->exdata = scheme->copy_fn(content_src->exdata);
+ } else {
+ scheme->merge_fn(content_dest->exdata, content_src->exdata);
+ }
+ }
+}
+
+// Return the exdata for `key`, or NULL if the key is not currently tracked.
+void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) {
+ struct smart_ptr *content;
+ HASH_FIND(hh, ss->table->entry, key, key_len, content);
+ if (content == NULL) {
+ return NULL;
+ }
+ return content->exdata;
+}
+
+// Soft reset: zero every bucket level and reset each tracked exdata in place.
+// Keys and bucket occupancy are deliberately kept (see the design notes above);
+// per testing this only slightly increases error versus a full clear.
+void spread_sketch_reset(struct spread_sketch *ss) {
+ for (int i = 0; i < ss->depth * ss->width; i++) {
+ struct bucket *bucket = &ss->buckets[i];
+ bucket->level = 0;
+ }
+
+ struct smart_ptr *content, *tmp;
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
+ ss->scheme.reset_fn(content->exdata);
+ }
+}
+
+// Install the exdata callback scheme on both the sketch and its key table.
+void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) {
+ ss->scheme.new_fn = new_fn;
+ ss->scheme.free_fn = free_fn;
+ ss->scheme.merge_fn = merge_fn;
+ ss->scheme.reset_fn = reset_fn;
+ ss->scheme.copy_fn = copy_fn;
+
+ ss->table->scheme = ss->scheme;
+}
+
+// Number of distinct keys currently tracked by the sketch.
+int spread_sketch_get_count(const struct spread_sketch *ss) {
+ return HASH_CNT(hh, ss->table->entry);
+}
+
+// This function faithfully lists everything stored internally, but the expectation is at
+// most expected_query_num entries, so the caller must sort the results outside.
+// The sort cannot happen here because no concrete HLL count is available at this layer.
+size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas) {
+ size_t count = 0;
+ struct smart_ptr *content, *tmp;
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
+ if (count >= n_exdatas) {
+ break;
+ }
+ exdatas[count] = content->exdata;
+ count++;
+ }
+ return count;
+}
+
+// Deep copy: fresh bucket array and key table; exdata is cloned once per unique key via
+// copy_fn (later buckets holding the same key just bump the entry's refcount, so the
+// exdata == NULL check fires only on first encounter).
+// NOTE(review): memcpy also copies src's min_level_per_row pointer into dst — the field is
+// currently unused, but confirm before it gains an owner/free path.
+struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) {
+ struct spread_sketch *dst = malloc(sizeof(struct spread_sketch));
+ memcpy(dst, src, sizeof(struct spread_sketch));
+
+ dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket));
+ dst->table = smart_ptr_table_new();
+
+ for (int i = 0; i < dst->depth * dst->width; i++) {
+ if (src->buckets[i].content == NULL) {
+ continue;
+ }
+ dst->buckets[i].content = smart_ptr_table_get(dst->table, src->buckets[i].content->key, src->buckets[i].content->key_len, NULL);
+ dst->buckets[i].level = src->buckets[i].level;
+ if (dst->buckets[i].content->exdata == NULL) {
+ dst->buckets[i].content->exdata = src->scheme.copy_fn(src->buckets[i].content->exdata);
+ }
+ }
+ return dst;
+}
diff --git a/src/tags/spread_sketch.h b/src/tags/spread_sketch.h
new file mode 100644
index 0000000..f50cae4
--- /dev/null
+++ b/src/tags/spread_sketch.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h> /* BUG FIX: uint64_t is used in the spread_sketch_add prototype below */
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+#include "exdata.h"
+
+
+struct spread_sketch;
+
+// The spread sketch always stores more values than expected_query_num; expected_query_num
+// is a hint used to choose the sketch parameters (depth/width).
+struct spread_sketch *spread_sketch_new(int expected_query_num);
+
+void spread_sketch_free(struct spread_sketch *ss);
+
+void spread_sketch_reset(struct spread_sketch *ss);
+
+// Returns 1 if the key was added/refreshed in the sketch, 0 otherwise.
+int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t hash_identifier, void *arg);
+void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn);
+
+void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len);
+
+// get the number of keys tracked by the spread sketch
+int spread_sketch_get_count(const struct spread_sketch *ss);
+
+size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas);
+
+void spread_sketch_merge(struct spread_sketch *dest, const struct spread_sketch *src);
+
+struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src);
+
+// for test
+// void spread_sketch_one_point_query(const struct spread_sketch *ss, const char *key, size_t key_len, int *level_out, void **exdata_out);
+
+#ifdef __cplusplus
+}
+#endif