diff options
Diffstat (limited to 'src/cells/spread_sketch.c')
| -rw-r--r-- | src/cells/spread_sketch.c | 241 |
1 files changed, 156 insertions, 85 deletions
diff --git a/src/cells/spread_sketch.c b/src/cells/spread_sketch.c index 3e9b11e..4746170 100644 --- a/src/cells/spread_sketch.c +++ b/src/cells/spread_sketch.c @@ -12,38 +12,11 @@ #include "hll_common.h" #include "exdata.h" -// todo:把primary metric 记到sketch 里,且使用特殊的st Hyperloglog -// todo: topk 也是这样 -/* -primary metric:认为primary metric 的precision 设置是无效的,在创建spread sketch 时,就已经设置了hll。 -这样做的好处:如果不这样,而是在set primary时,那么要提供接口“替换所有hll”,这使得set primary metric 接口的职责变成了:1. 将ss 中的hll 删除。2. 将cell 中的metric结构替换 3. 读取配置并产生新的方法。太复杂了。 -实际上,关键在于,set primary metric 的时候,并没有register metric,所以create cube 和set primary metric 一定是两个接口,而create cube 的时候,明明应该已经创建了hll。 -方案2: -修改set primary metric 的逻辑,一个cube上只能调用一次,且必须调用一次。(选用这个) ------------------------------------------------------------------------------------------------ -关于添加hll 的过程: -当是primary metric 的时候,不操作metric。而是走一个一般的spread sketch流程。query时采用新逻辑。 +// TODO: 适配swarm kv 时的问题: +// 需要返回一个double,正常spread sketch 一定返回值(key 不在也更新hll),所以这里的查询怎么样比较好? -在swarm kv 里,spread sketch 需要支持serialize 操作。也就是最好提供一个serialize方法。但是这个东西不用怎么测,实际上不会出现。 -*/ - -/* -把primary metric 加到 spread sketch 的问题。确认一些实现上不舒服的地方 - -对于comprehensive 和 topk 的cube,创建cube 时,cube 获得了创建primary metric 的全部参数,但是spread sketch内部的precision 是在fieldstat_cube_set_primary_metric 时确定的。准备提供一个spread_sketch_change_configuration() 接口,在fieldstat_cube_set_primary_metric 时,比较默认的precision与primary metric 的precision的值,如果不同,调用此接口,删除所有的Hyperloglog 并重新创建。 - -fieldstat输出时,产生的Hyperloglog结果是一个序列化的base64字符串。需要提供一个spread_sketch_query_as_base64接口,用于输出spread sketch 内部的Hyperloglog结构。因为spread sketch 内的sketch每行都创建了一个Hyperloglog,所以primary metric 的base64查询,可能对应最多 r 个不同的查询结果,我会先遍历调用hll_count,查询计数最小的行数,再把对应最小计数的Hyperloglog 序列化。 - -swarmkv 内需要spread_sketch_serialize接口,因为两遍的spread sketch必须一样,所以多增加一个exdata_serialize 回调。deserialize 时,需要额外传入一组回调函数,用来对spread sketch初始化,也可以不传入(分为spread_skech_deserialze和spread_skech_deserialze_with_exdata 两个函数),认为不存在exdata,所有新的exdata 是NULL。 -*/ -/* -1. 修改set primary metric 为 set sampling -2. 增加一个list key 和一堆根据key 查base64之类的接口 -3. serialize 不用实现带exdata的情况 -*/ - struct entry { int ref_count; void *exdata; @@ -89,8 +62,7 @@ struct spread_sketch { struct bucket *buckets; struct entry_table *table; - int level_cnt[65]; // 64: the level range is [1,65] // count every level number。 - uint32_t level_min; // the minimum level in the sketch + int level0_cnt; // used to filter out dummy adding }; static void *default_new_fn(void *arg) { @@ -109,11 +81,19 @@ static void *default_copy_fn(void *exdata) { return exdata; } +struct spread_sketch_scheme DEFAULT_SCHEME = { + .new_fn = default_new_fn, + .free_fn = default_free_fn, + .merge_fn = default_merge_fn, + .reset_fn = default_reset_fn, + .copy_fn = default_copy_fn +}; + uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long long t) { /* return f(t), the actual level of bucket, which satisfy: 1. d 2^f(t)/dt is constants - 2. f(t0 + T) = 1 + 2. f(t0 + 2T) = 0 3. f((t0) = L ) */ struct bucket *bucket = &ss->buckets[bucket_idx]; @@ -126,15 +106,15 @@ uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long lon long long t0 = bucket->last_update_ms; assert(t >= t0); - if (t - t0 >= T) { - return 1; + if (t - t0 >= 2 * T) { + return 0; } if (L <= 1) { - return L; + return t - t0 >= T ? 0 : L; } long long tmp_exp = 1 << L; - double a = ((double)(2 - tmp_exp)) / ((double)T); + double a = ((double)(1 - tmp_exp)) / (2.0 * T); double b = (double)(tmp_exp); return (uint32_t)(log2(a * ((double)(t-t0)) + b) + 0.5); @@ -153,6 +133,15 @@ static inline char *key_dup(const char *key, size_t key_len) { return ret; } +struct entry *smart_ptr_table_find(struct entry_table *table, const char *key, size_t key_len) { + struct entry *ret; + HASH_FIND(hh, table->entry, key, key_len, ret); + if (ret == NULL || ret->dying) { + return NULL; + } + return ret; +} + struct entry *smart_ptr_table_get(struct entry_table *table, const char *key, size_t key_len, void *arg) { struct entry *ret; HASH_FIND(hh, table->entry, key, key_len, ret); @@ -211,11 +200,7 @@ struct entry_table *smart_ptr_table_new() { struct entry_table *table = malloc(sizeof(struct entry_table)); table->entry = NULL; - table->scheme.new_fn = default_new_fn; - table->scheme.free_fn = default_free_fn; - table->scheme.merge_fn = default_merge_fn; - table->scheme.reset_fn = default_reset_fn; - table->scheme.copy_fn = default_copy_fn; + table->scheme = DEFAULT_SCHEME; return table; } @@ -232,11 +217,7 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec pthis->reset_time = now; pthis->buckets = calloc(depth * width, sizeof(struct bucket)); - pthis->scheme.new_fn = default_new_fn; - pthis->scheme.free_fn = default_free_fn; - pthis->scheme.merge_fn = default_merge_fn; - pthis->scheme.reset_fn = default_reset_fn; - pthis->scheme.copy_fn = default_copy_fn; + pthis->scheme = DEFAULT_SCHEME; pthis->table = smart_ptr_table_new(); pthis->table->scheme = pthis->scheme; @@ -244,9 +225,7 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec pthis->buckets[i].sthll_register = hll_create_register(precision); } - pthis->level_min = 0; - memset(pthis->level_cnt, 0, sizeof(pthis->level_cnt)); - pthis->level_cnt[0] = pthis->depth * pthis->width; + pthis->level0_cnt = pthis->depth * pthis->width; return pthis; } @@ -265,34 +244,18 @@ void move_registers_forward(struct spread_sketch *ss, const struct timeval *now) } } -void min_level_state_update(struct spread_sketch *ss, uint32_t level_old, uint32_t level_new) { - ss->level_cnt[level_old]--; - ss->level_cnt[level_new]++; - while (ss->level_cnt[ss->level_min] == 0) { - ss->level_min++; - } -} - // return 0 if not added, return 1 if added int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now) { uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1; long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000; - - if (level <= ss->level_min) { - if (item_hash == DUMMY_ITEM_HASH) { // TODO: 这个地方的逻辑对不上,其实返回1应该是“在sketch中”就返回1,为了get exdata 的问题,增加了一种特殊情况。 - // 正常来说,应该是在任何时候,都调用add_sketch,不区分primary or not,并走同样的逻辑,在不包含于sketch时不get。 - // todo: 不对,如果hll 在内部,那么hll 的更新总是要完成,这里只是区别了要更新hll 和不更新hll 的情况 - return 0; // DUMMY_ITEM_HASH is the case of "recording a key" instead of update hll. Just return 0 to inform the key is not added to the sketch - } - struct entry *content; - HASH_FIND(hh, ss->table->entry, key, key_length, content); - if (content != NULL && !content->dying) { - return 1; - } else { + if (item_hash == DUMMY_ITEM_HASH) { + if (ss->level0_cnt == 0) { return 0; } - } + assert(ss->level0_cnt>0); + } + // https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf // A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x) // Assuming that the 128-bit xxhash function is perfect, we can view it as a combination of two 64-bit hash functions. @@ -312,7 +275,9 @@ int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key bucket->last_update_ms = now_ms; if (bucket->level < level) { - min_level_state_update(ss, bucket->level, level); + if (bucket->level == 0) { + ss->level0_cnt--; + } bucket->level = level; } in_sketch = true; @@ -320,7 +285,10 @@ int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key uint32_t true_level = bucket->content == NULL ? 0: cal_true_level(ss, bucket_idx, now_ms); if (level > true_level) { - min_level_state_update(ss, true_level, level); + if (bucket->level == 0) { + ss->level0_cnt--; + } + // printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, true_level, level, i, hash_x % ss->width); const struct entry *content_old = bucket->content; if (content_old != NULL) { @@ -408,20 +376,21 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * } hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision); + if (bucket_dst->level == 0 && bucket_src->level > 0) { + dst->level0_cnt--; + } if (bucket_dst->content == NULL) { bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); bucket_dst->level = bucket_src->level; bucket_dst->last_update_ms = bucket_src->last_update_ms; - min_level_state_update(dst, 0, bucket_src->level); continue; } bucket_dst->content->dying = false; if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) { if (bucket_src->level > bucket_dst->level) { - min_level_state_update(dst, bucket_dst->level, bucket_src->level); bucket_dst->level = bucket_src->level; } if (bucket_src->last_update_ms > bucket_dst->last_update_ms) { @@ -432,7 +401,6 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * uint32_t true_level_dst = cal_true_level(dst, i, bucket_dst->last_update_ms); if (true_level_src > true_level_dst) { - min_level_state_update(dst, bucket_dst->level, bucket_src->level); smart_ptr_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len); bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); bucket_dst->last_update_ms = bucket_src->last_update_ms; @@ -459,9 +427,8 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * } void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) { - struct entry *content; - HASH_FIND(hh, ss->table->entry, key, key_len, content); - if (content == NULL || content->dying) { + const struct entry *content = smart_ptr_table_find(ss->table, key, key_len); + if (content == NULL) { return NULL; } return content->exdata; @@ -480,9 +447,7 @@ void spread_sketch_reset(struct spread_sketch *ss) { content->dying = true; } - memset(ss->level_cnt, 0, sizeof(ss->level_cnt)); - ss->level_cnt[0] = ss->depth * ss->width; - ss->level_min = 0; + ss->level0_cnt = ss->depth * ss->width; } void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) { @@ -540,10 +505,12 @@ void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_ (*key_lens)[count] = content->key_len; count++; } + + *n_keys = count; } double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len) { - if (spread_sketch_get0_exdata(ss, key, key_len) == NULL) { + if (smart_ptr_table_find(ss->table, key, key_len) == NULL) { return -1; } @@ -577,7 +544,7 @@ char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) { struct spread_sketch *dst = malloc(sizeof(struct spread_sketch)); - memcpy(dst, src, sizeof(struct spread_sketch)); // copy the depth, width, level_cnt, level_min + memcpy(dst, src, sizeof(struct spread_sketch)); dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket)); dst->table = smart_ptr_table_new(); @@ -634,6 +601,110 @@ void spread_sketch_change_precision(struct spread_sketch *ss, unsigned char prec ss->precision = precision; } +void spread_sketch_serialize(const struct spread_sketch *ss, char **blob, size_t *blob_sz) +{ + /* + format: + struct spread_sketch(including useless pointers) + struct bucket * depth * width + int64_t * depth * width for keylen + key(close-knit, key_len, key, key_len, key, ..., in order of bucket index) + */ + // get serialize size + + size_t sz = 0; + sz += sizeof(struct spread_sketch); + sz += ss->depth * ss->width * (sizeof(struct bucket) + hll_size(ss->precision) + sizeof(int64_t)); + for (int i = 0; i < ss->depth * ss->width; i++) { + if (ss->buckets[i].content != NULL) { + sz += ss->buckets[i].content->key_len; + } + } + + char *buffer = malloc(sz); + *blob = buffer; + *blob_sz = sz; + + memcpy(buffer, ss, sizeof(struct spread_sketch)); + buffer += sizeof(struct spread_sketch); + for (int i = 0; i < ss->depth * ss->width; i++) { + const struct bucket *bucket = &ss->buckets[i]; + memcpy(buffer, bucket, sizeof(struct bucket)); + buffer += sizeof(struct bucket); + memcpy(buffer, bucket->sthll_register, hll_size(ss->precision)); + buffer += hll_size(ss->precision); + } + for (int i = 0; i < ss->depth * ss->width; i++) { + int64_t key_len; + if (ss->buckets[i].content != NULL) { + key_len = ss->buckets[i].content->key_len; + } else { + key_len = 0; + } + memcpy(buffer, &key_len, sizeof(int64_t)); + buffer += sizeof(int64_t); + + if (key_len > 0) { + memcpy(buffer, ss->buckets[i].content->key, key_len); + buffer += key_len; + } + } +} + +struct spread_sketch *spread_sketch_deserialize(const char *blob, size_t blob_sz) +{ + struct spread_sketch *ss = malloc(sizeof(struct spread_sketch)); + memcpy(ss, blob, sizeof(struct spread_sketch)); + blob += sizeof(struct spread_sketch); + ss->table = smart_ptr_table_new(); + ss->buckets = calloc(ss->depth * ss->width, sizeof(struct bucket)); + ss->scheme = DEFAULT_SCHEME; + + for (int i = 0; i < ss->depth * ss->width; i++) { + struct bucket *bucket = &ss->buckets[i]; + memcpy(bucket, blob, sizeof(struct bucket)); + blob += sizeof(struct bucket); + + bucket->sthll_register = hll_create_register(ss->precision); + memcpy(bucket->sthll_register, blob, hll_size(ss->precision)); + blob += hll_size(ss->precision); + } + + for (int i = 0; i < ss->depth * ss->width; i++) { + int64_t key_len; + memcpy(&key_len, blob, sizeof(int64_t)); + blob += sizeof(int64_t); + const char *key = blob; + blob += key_len; + + struct entry *content = smart_ptr_table_get(ss->table, key, key_len, NULL); + ss->buckets[i].content = content; + } + + return ss; +} + +void spread_sketch_merge_blob(struct spread_sketch *dst, const char *blob, size_t blob_sz) +{ + struct spread_sketch *src = spread_sketch_deserialize(blob, blob_sz); + spread_sketch_merge(dst, src); + spread_sketch_free(src); +} + +size_t spread_sketch_calculate_memory_usage(const struct spread_sketch *ss) +{ + size_t ret = 0; + ret += sizeof(struct spread_sketch); + + size_t bucket_size = sizeof(struct bucket) + hll_size(ss->precision); + printf("every bucket size: %zu\n", bucket_size); + ret += ss->depth * ss->width * bucket_size; -// todo: serialize(swarm kv) -// todo: swarm kv 上实现reset 和copy。
\ No newline at end of file + printf("the number of content: %u\n", HASH_COUNT(ss->table->entry)); + struct entry *content, *tmp; + HASH_ITER(hh, ss->table->entry, content, tmp) { + ret += sizeof(struct entry); + ret += content->key_len; + // assume the exdata is NULL + } + return ret; +}
\ No newline at end of file |
