diff options
| -rw-r--r-- | include/fieldstat/fieldstat.h | 10 | ||||
| -rw-r--r-- | src/cells/spread_sketch.c | 241 | ||||
| -rw-r--r-- | src/cells/spread_sketch.h | 27 | ||||
| -rw-r--r-- | test/test_fuzz_test.cpp | 2 |
4 files changed, 170 insertions, 110 deletions
diff --git a/include/fieldstat/fieldstat.h b/include/fieldstat/fieldstat.h index 0f77ec4..9bbdb1f 100644 --- a/include/fieldstat/fieldstat.h +++ b/include/fieldstat/fieldstat.h @@ -71,16 +71,6 @@ int fieldstat_cube_create(struct fieldstat *instance, const struct field *cube_d int fieldstat_cube_set_sampling(struct fieldstat *instance, int cube_id, enum sampling_mode mode, int max_n_cell, int primary_metric_id); /* - @brief Change the topk cube primary metric id. When fieldstat_counter_add or fieldstat_counter_set are called on the primary metric, the topk record of such cell will be updated. - the default primary metric id is 0. - @return FS_OK, FS_ERR_NULL_HANDLER or FS_ERR_INVALID_CUBE_ID. - FS_ERR_INVALID_METRIC_ID when the metric is not registered to instance. - FS_ERR_INVALID_PARAM when the cube is not a topk sampling cube, or the metric is not a counter. - -*/ -int fieldstat_cube_set_primary_metric(struct fieldstat *instance, int cube_id, int metric_id); - -/* * @brief Delete the cube of cube_id. All the cells and metrics are deleted. The cube_id may be reused by other new cubes. Increase the corresponding cube_version by 1. * @return FS_OK, FS_ERR_NULL_HANDLER or FS_ERR_INVALID_CUBE_ID */ diff --git a/src/cells/spread_sketch.c b/src/cells/spread_sketch.c index 3e9b11e..4746170 100644 --- a/src/cells/spread_sketch.c +++ b/src/cells/spread_sketch.c @@ -12,38 +12,11 @@ #include "hll_common.h" #include "exdata.h" -// todo:把primary metric 记到sketch 里,且使用特殊的st Hyperloglog -// todo: topk 也是这样 -/* -primary metric:认为primary metric 的precision 设置是无效的,在创建spread sketch 时,就已经设置了hll。 -这样做的好处:如果不这样,而是在set primary时,那么要提供接口“替换所有hll”,这使得set primary metric 接口的职责变成了:1. 将ss 中的hll 删除。2. 将cell 中的metric结构替换 3. 
读取配置并产生新的方法。太复杂了。 -实际上,关键在于,set primary metric 的时候,并没有register metric,所以create cube 和set primary metric 一定是两个接口,而create cube 的时候,明明应该已经创建了hll。 -方案2: -修改set primary metric 的逻辑,一个cube上只能调用一次,且必须调用一次。(选用这个) ------------------------------------------------------------------------------------------------ -关于添加hll 的过程: -当是primary metric 的时候,不操作metric。而是走一个一般的spread sketch流程。query时采用新逻辑。 +// TODO: 适配swarm kv 时的问题: +// 需要返回一个double,正常spread sketch 一定返回值(key 不在也更新hll),所以这里的查询怎么样比较好? -在swarm kv 里,spread sketch 需要支持serialize 操作。也就是最好提供一个serialize方法。但是这个东西不用怎么测,实际上不会出现。 -*/ - -/* -把primary metric 加到 spread sketch 的问题。确认一些实现上不舒服的地方 - -对于comprehensive 和 topk 的cube,创建cube 时,cube 获得了创建primary metric 的全部参数,但是spread sketch内部的precision 是在fieldstat_cube_set_primary_metric 时确定的。准备提供一个spread_sketch_change_configuration() 接口,在fieldstat_cube_set_primary_metric 时,比较默认的precision与primary metric 的precision的值,如果不同,调用此接口,删除所有的Hyperloglog 并重新创建。 - -fieldstat输出时,产生的Hyperloglog结果是一个序列化的base64字符串。需要提供一个spread_sketch_query_as_base64接口,用于输出spread sketch 内部的Hyperloglog结构。因为spread sketch 内的sketch每行都创建了一个Hyperloglog,所以primary metric 的base64查询,可能对应最多 r 个不同的查询结果,我会先遍历调用hll_count,查询计数最小的行数,再把对应最小计数的Hyperloglog 序列化。 - -swarmkv 内需要spread_sketch_serialize接口,因为两遍的spread sketch必须一样,所以多增加一个exdata_serialize 回调。deserialize 时,需要额外传入一组回调函数,用来对spread sketch初始化,也可以不传入(分为spread_skech_deserialze和spread_skech_deserialze_with_exdata 两个函数),认为不存在exdata,所有新的exdata 是NULL。 -*/ -/* -1. 修改set primary metric 为 set sampling -2. 增加一个list key 和一堆根据key 查base64之类的接口 -3. 
serialize 不用实现带exdata的情况 -*/ - struct entry { int ref_count; void *exdata; @@ -89,8 +62,7 @@ struct spread_sketch { struct bucket *buckets; struct entry_table *table; - int level_cnt[65]; // 64: the level range is [1,65] // count every level number。 - uint32_t level_min; // the minimum level in the sketch + int level0_cnt; // used to filter out dummy adding }; static void *default_new_fn(void *arg) { @@ -109,11 +81,19 @@ static void *default_copy_fn(void *exdata) { return exdata; } +struct spread_sketch_scheme DEFAULT_SCHEME = { + .new_fn = default_new_fn, + .free_fn = default_free_fn, + .merge_fn = default_merge_fn, + .reset_fn = default_reset_fn, + .copy_fn = default_copy_fn +}; + uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long long t) { /* return f(t), the actual level of bucket, which satisfy: 1. d 2^f(t)/dt is constants - 2. f(t0 + T) = 1 + 2. f(t0 + 2T) = 0 3. f((t0) = L ) */ struct bucket *bucket = &ss->buckets[bucket_idx]; @@ -126,15 +106,15 @@ uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long lon long long t0 = bucket->last_update_ms; assert(t >= t0); - if (t - t0 >= T) { - return 1; + if (t - t0 >= 2 * T) { + return 0; } if (L <= 1) { - return L; + return t - t0 >= T ? 
0 : L; } long long tmp_exp = 1 << L; - double a = ((double)(2 - tmp_exp)) / ((double)T); + double a = ((double)(1 - tmp_exp)) / (2.0 * T); double b = (double)(tmp_exp); return (uint32_t)(log2(a * ((double)(t-t0)) + b) + 0.5); @@ -153,6 +133,15 @@ static inline char *key_dup(const char *key, size_t key_len) { return ret; } +struct entry *smart_ptr_table_find(struct entry_table *table, const char *key, size_t key_len) { + struct entry *ret; + HASH_FIND(hh, table->entry, key, key_len, ret); + if (ret == NULL || ret->dying) { + return NULL; + } + return ret; +} + struct entry *smart_ptr_table_get(struct entry_table *table, const char *key, size_t key_len, void *arg) { struct entry *ret; HASH_FIND(hh, table->entry, key, key_len, ret); @@ -211,11 +200,7 @@ struct entry_table *smart_ptr_table_new() { struct entry_table *table = malloc(sizeof(struct entry_table)); table->entry = NULL; - table->scheme.new_fn = default_new_fn; - table->scheme.free_fn = default_free_fn; - table->scheme.merge_fn = default_merge_fn; - table->scheme.reset_fn = default_reset_fn; - table->scheme.copy_fn = default_copy_fn; + table->scheme = DEFAULT_SCHEME; return table; } @@ -232,11 +217,7 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec pthis->reset_time = now; pthis->buckets = calloc(depth * width, sizeof(struct bucket)); - pthis->scheme.new_fn = default_new_fn; - pthis->scheme.free_fn = default_free_fn; - pthis->scheme.merge_fn = default_merge_fn; - pthis->scheme.reset_fn = default_reset_fn; - pthis->scheme.copy_fn = default_copy_fn; + pthis->scheme = DEFAULT_SCHEME; pthis->table = smart_ptr_table_new(); pthis->table->scheme = pthis->scheme; @@ -244,9 +225,7 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec pthis->buckets[i].sthll_register = hll_create_register(precision); } - pthis->level_min = 0; - memset(pthis->level_cnt, 0, sizeof(pthis->level_cnt)); - pthis->level_cnt[0] = pthis->depth * pthis->width; + pthis->level0_cnt = 
pthis->depth * pthis->width; return pthis; } @@ -265,34 +244,18 @@ void move_registers_forward(struct spread_sketch *ss, const struct timeval *now) } } -void min_level_state_update(struct spread_sketch *ss, uint32_t level_old, uint32_t level_new) { - ss->level_cnt[level_old]--; - ss->level_cnt[level_new]++; - while (ss->level_cnt[ss->level_min] == 0) { - ss->level_min++; - } -} - // return 0 if not added, return 1 if added int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now) { uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1; long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000; - - if (level <= ss->level_min) { - if (item_hash == DUMMY_ITEM_HASH) { // TODO: 这个地方的逻辑对不上,其实返回1应该是“在sketch中”就返回1,为了get exdata 的问题,增加了一种特殊情况。 - // 正常来说,应该是在任何时候,都调用add_sketch,不区分primary or not,并走同样的逻辑,在不包含于sketch时不get。 - // todo: 不对,如果hll 在内部,那么hll 的更新总是要完成,这里只是区别了要更新hll 和不更新hll 的情况 - return 0; // DUMMY_ITEM_HASH is the case of "recording a key" instead of update hll. Just return 0 to inform the key is not added to the sketch - } - struct entry *content; - HASH_FIND(hh, ss->table->entry, key, key_length, content); - if (content != NULL && !content->dying) { - return 1; - } else { + if (item_hash == DUMMY_ITEM_HASH) { + if (ss->level0_cnt == 0) { return 0; } - } + assert(ss->level0_cnt>0); + } + // https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf // A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x) // Assuming that the 128-bit xxhash function is perfect, we can view it as a combination of two 64-bit hash functions. 
@@ -312,7 +275,9 @@ int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key bucket->last_update_ms = now_ms; if (bucket->level < level) { - min_level_state_update(ss, bucket->level, level); + if (bucket->level == 0) { + ss->level0_cnt--; + } bucket->level = level; } in_sketch = true; @@ -320,7 +285,10 @@ int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key uint32_t true_level = bucket->content == NULL ? 0: cal_true_level(ss, bucket_idx, now_ms); if (level > true_level) { - min_level_state_update(ss, true_level, level); + if (bucket->level == 0) { + ss->level0_cnt--; + } + // printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, true_level, level, i, hash_x % ss->width); const struct entry *content_old = bucket->content; if (content_old != NULL) { @@ -408,20 +376,21 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * } hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision); + if (bucket_dst->level == 0 && bucket_src->level > 0) { + dst->level0_cnt--; + } if (bucket_dst->content == NULL) { bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); bucket_dst->level = bucket_src->level; bucket_dst->last_update_ms = bucket_src->last_update_ms; - min_level_state_update(dst, 0, bucket_src->level); continue; } bucket_dst->content->dying = false; if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) { if (bucket_src->level > bucket_dst->level) { - min_level_state_update(dst, bucket_dst->level, bucket_src->level); bucket_dst->level = bucket_src->level; } if (bucket_src->last_update_ms > bucket_dst->last_update_ms) { @@ -432,7 +401,6 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * uint32_t true_level_dst = 
cal_true_level(dst, i, bucket_dst->last_update_ms); if (true_level_src > true_level_dst) { - min_level_state_update(dst, bucket_dst->level, bucket_src->level); smart_ptr_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len); bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); bucket_dst->last_update_ms = bucket_src->last_update_ms; @@ -459,9 +427,8 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * } void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) { - struct entry *content; - HASH_FIND(hh, ss->table->entry, key, key_len, content); - if (content == NULL || content->dying) { + const struct entry *content = smart_ptr_table_find(ss->table, key, key_len); + if (content == NULL) { return NULL; } return content->exdata; @@ -480,9 +447,7 @@ void spread_sketch_reset(struct spread_sketch *ss) { content->dying = true; } - memset(ss->level_cnt, 0, sizeof(ss->level_cnt)); - ss->level_cnt[0] = ss->depth * ss->width; - ss->level_min = 0; + ss->level0_cnt = ss->depth * ss->width; } void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) { @@ -540,10 +505,12 @@ void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_ (*key_lens)[count] = content->key_len; count++; } + + *n_keys = count; } double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len) { - if (spread_sketch_get0_exdata(ss, key, key_len) == NULL) { + if (smart_ptr_table_find(ss->table, key, key_len) == NULL) { return -1; } @@ -577,7 +544,7 @@ char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) { struct spread_sketch *dst = malloc(sizeof(struct 
spread_sketch)); - memcpy(dst, src, sizeof(struct spread_sketch)); // copy the depth, width, level_cnt, level_min + memcpy(dst, src, sizeof(struct spread_sketch)); dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket)); dst->table = smart_ptr_table_new(); @@ -634,6 +601,110 @@ void spread_sketch_change_precision(struct spread_sketch *ss, unsigned char prec ss->precision = precision; } +void spread_sketch_serialize(const struct spread_sketch *ss, char **blob, size_t *blob_sz) +{ + /* + format: + struct spread_sketch(including useless pointers) + struct bucket * depth * width + int64_t * depth * width for keylen + key(close-knit, key_len, key, key_len, key, ..., in order of bucket index) + */ + // get serialize size + + size_t sz = 0; + sz += sizeof(struct spread_sketch); + sz += ss->depth * ss->width * (sizeof(struct bucket) + hll_size(ss->precision) + sizeof(int64_t)); + for (int i = 0; i < ss->depth * ss->width; i++) { + if (ss->buckets[i].content != NULL) { + sz += ss->buckets[i].content->key_len; + } + } + + char *buffer = malloc(sz); + *blob = buffer; + *blob_sz = sz; + + memcpy(buffer, ss, sizeof(struct spread_sketch)); + buffer += sizeof(struct spread_sketch); + for (int i = 0; i < ss->depth * ss->width; i++) { + const struct bucket *bucket = &ss->buckets[i]; + memcpy(buffer, bucket, sizeof(struct bucket)); + buffer += sizeof(struct bucket); + memcpy(buffer, bucket->sthll_register, hll_size(ss->precision)); + buffer += hll_size(ss->precision); + } + for (int i = 0; i < ss->depth * ss->width; i++) { + int64_t key_len; + if (ss->buckets[i].content != NULL) { + key_len = ss->buckets[i].content->key_len; + } else { + key_len = 0; + } + memcpy(buffer, &key_len, sizeof(int64_t)); + buffer += sizeof(int64_t); + + if (key_len > 0) { + memcpy(buffer, ss->buckets[i].content->key, key_len); + buffer += key_len; + } + } +} + +struct spread_sketch *spread_sketch_deserialize(const char *blob, size_t blob_sz) +{ + struct spread_sketch *ss = 
malloc(sizeof(struct spread_sketch)); + memcpy(ss, blob, sizeof(struct spread_sketch)); + blob += sizeof(struct spread_sketch); + ss->table = smart_ptr_table_new(); + ss->buckets = calloc(ss->depth * ss->width, sizeof(struct bucket)); + ss->scheme = DEFAULT_SCHEME; + + for (int i = 0; i < ss->depth * ss->width; i++) { + struct bucket *bucket = &ss->buckets[i]; + memcpy(bucket, blob, sizeof(struct bucket)); + blob += sizeof(struct bucket); + + bucket->sthll_register = hll_create_register(ss->precision); + memcpy(bucket->sthll_register, blob, hll_size(ss->precision)); + blob += hll_size(ss->precision); + } + + for (int i = 0; i < ss->depth * ss->width; i++) { + int64_t key_len; + memcpy(&key_len, blob, sizeof(int64_t)); + blob += sizeof(int64_t); + const char *key = blob; + blob += key_len; + + struct entry *content = smart_ptr_table_get(ss->table, key, key_len, NULL); + ss->buckets[i].content = content; + } + + return ss; +} + +void spread_sketch_merge_blob(struct spread_sketch *dst, const char *blob, size_t blob_sz) +{ + struct spread_sketch *src = spread_sketch_deserialize(blob, blob_sz); + spread_sketch_merge(dst, src); + spread_sketch_free(src); +} + +size_t spread_sketch_calculate_memory_usage(const struct spread_sketch *ss) +{ + size_t ret = 0; + ret += sizeof(struct spread_sketch); + + size_t bucket_size = sizeof(struct bucket) + hll_size(ss->precision); + printf("every bucket size: %zu\n", bucket_size); + ret += ss->depth * ss->width * bucket_size; -// todo: serialize(swarm kv) -// todo: swarm kv 上实现reset 和copy。
\ No newline at end of file + printf("the number of content: %u\n", HASH_COUNT(ss->table->entry)); + struct entry *content, *tmp; + HASH_ITER(hh, ss->table->entry, content, tmp) { + ret += sizeof(struct entry); + ret += content->key_len; + // assume the exdata is NULL + } + return ret; +}
\ No newline at end of file diff --git a/src/cells/spread_sketch.h b/src/cells/spread_sketch.h index b1c7a34..ec275ac 100644 --- a/src/cells/spread_sketch.h +++ b/src/cells/spread_sketch.h @@ -15,38 +15,37 @@ extern "C"{ struct spread_sketch; +// If time_window_ms is set to zero, the spread sketch will not slide or decay; in that case, `now` can be any value struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char precision, int time_window_ms, struct timeval now); - void spread_sketch_free(struct spread_sketch *ss); - -void spread_sketch_reset(struct spread_sketch *ss); +void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now); int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now); -void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); - -// get the number of cells in the heavy keeper +// get the number of keys stored in spread sketch int spread_sketch_get_count(const struct spread_sketch *ss); - +// list all the keys in spread sketch. The caller should free the two arrays, but must not free the individual key strings in the array (they are references to the internal data structure) +// Example: char **key; size_t *key_len; size_t n_keys; spread_sketch_list_keys(ss, &key, &key_len, &n_keys); free(key); free(key_len); void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys); +// query the cardinality (or fanout) of a key in spread sketch. 
+// Even though the spread sketch algorithm does not inherently require a key to exist, when querying a key that is not present in the spread sketch, `spread_sketch_get_cardinality` will return -1. double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len); +// query a hyperloglog's base64 serialization. The serialization format is [1,precision,register...], which is then base64-encoded char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *key, size_t key_len); void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len); void spread_sketch_merge(struct spread_sketch *dest, const struct spread_sketch *src); - struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src); +void spread_sketch_serialize(const struct spread_sketch *ss, char **blob, size_t *blob_sz); +struct spread_sketch *spread_sketch_deserialize(const char *blob, size_t blob_sz); +void spread_sketch_merge_blob(struct spread_sketch *dst, const char *blob, size_t blob_sz); +void spread_sketch_reset(struct spread_sketch *ss); void spread_sketch_get_parameter(const struct spread_sketch *ss, int *depth_out, int *width_out, unsigned char *precision_out, int *time_window_ms_out); -/* // spread sketch always stores more values than expected_super_spreader_number; expected_super_spreader_number is a hint to set spread sketch parameters properly -*/ void spread_sketch_get_parameter_recommendation(int expected_super_spreader_number, int *depth_out, int *width_out, unsigned char *precision_out); - - -// for test -// void spread_sketch_one_point_query(const struct spread_sketch *ss, const char *key, size_t key_len, int *level_out, void **exdata_out); +size_t spread_sketch_calculate_memory_usage(const struct spread_sketch *ss); #ifdef __cplusplus } diff --git a/test/test_fuzz_test.cpp b/test/test_fuzz_test.cpp index aa3487a..0fa8241 100644 --- a/test/test_fuzz_test.cpp +++ b/test/test_fuzz_test.cpp @@ -475,7 +475,7 @@ 
TEST(Fuzz_test, many_instance_random_flow_unregister_calibrate_reset_fork_merge_ est_total += value_est; true_total += value_true; } - EXPECT_LE(abs(est_total - true_total) / true_total, 0.25); + EXPECT_LE(abs(est_total - true_total) / true_total, 0.2); // printf("spreadsketch Mean ratio e: %f\n", abs(est_total - true_total) / true_total); for (size_t j = 0; j < cell_num; j++) { |
