diff options
| author | chenzizhan <[email protected]> | 2024-07-17 10:35:24 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2024-07-17 10:35:24 +0800 |
| commit | 6595cbbde1280b6c7d3c445697e39aa18fa9741f (patch) | |
| tree | 0fe9af32b13926b6aa8682337b500012ddecafca /src/cells | |
| parent | c488da1f8346baf8d5a0260da9c5934c8dfdfbef (diff) | |
primary metric in spreadsketch/heavykeeper
Diffstat (limited to 'src/cells')
| -rw-r--r-- | src/cells/hll_common.c | 35 | ||||
| -rw-r--r-- | src/cells/hll_common.h | 1 | ||||
| -rw-r--r-- | src/cells/spread_sketch.c | 100 | ||||
| -rw-r--r-- | src/cells/spread_sketch.h | 25 |
4 files changed, 118 insertions, 43 deletions
diff --git a/src/cells/hll_common.c b/src/cells/hll_common.c index 7017242..47dc113 100644 --- a/src/cells/hll_common.c +++ b/src/cells/hll_common.c @@ -2,6 +2,8 @@ #include "hll_common.h" #include "crdt_utils.h" +#include "base64/b64.h" + #include <string.h> #include <math.h> #include <assert.h> @@ -298,7 +300,7 @@ void hll_advance_reset_index(long long *reset_idx, long long reset_reg_count, un long long hll_get_reset_register_count(unsigned char precision, long long time_window_ms, struct timeval now, struct timeval *reset_time) { long long reset_time_slot_us=RESET_TIME_SLOT_US(time_window_ms, precision); - long long delta_us=timeval_delta_us(*reset_time, now); + long long delta_us=timeval_delta_us((*reset_time), now); if(delta_us < reset_time_slot_us) return 0; @@ -349,3 +351,34 @@ int hll_should_slide(unsigned char precision, long long time_window_ms, struct t return 1; return 0; } + +void hll_encode_into_base64(const uint32_t *registers, unsigned char precision, char **blob, size_t *blob_sz) +{ + size_t sz=0, offset=0; + size_t num_reg = NUM_REG(precision); + size_t words = INT_CEIL(num_reg, REG_PER_WORD); + + sz = hll_size(precision) + 2; // [version][precision][data..., + 2 for version and precision + unsigned char *buffer = ALLOC(unsigned char, sz); + + const unsigned char version = 1; + memcpy(buffer+offset, &version, 1); + offset += 1; + + memcpy(buffer+offset, &precision, 1); + offset += 1; + + for (int i = 0; i < words; i++) { + uint32_t word = registers[i]; + buffer[offset++] = word >> 24; + buffer[offset++] = word >> 16; + buffer[offset++] = word >> 8; + buffer[offset++] = word; + } + + char *enc = b64_encode(buffer, sz); + free(buffer); + *blob_sz = strlen(enc); + *blob = enc; + return; +}
\ No newline at end of file diff --git a/src/cells/hll_common.h b/src/cells/hll_common.h index a110153..b62530a 100644 --- a/src/cells/hll_common.h +++ b/src/cells/hll_common.h @@ -21,6 +21,7 @@ void hll_reset_registers(uint32_t *registers, unsigned char precision, long long long long hll_get_reset_register_count(unsigned char precision, long long time_window_ms, struct timeval now, struct timeval *reset_time); void hll_advance_reset_index(long long *reset_idx, long long reset_reg_count, unsigned char precision); void hll_merge(uint32_t *dst_registers, const uint32_t *src_registers, unsigned char precision); +void hll_encode_into_base64(const uint32_t *registers, unsigned char precision, char **blob, size_t *blob_sz); #ifdef __cplusplus } diff --git a/src/cells/spread_sketch.c b/src/cells/spread_sketch.c index 96436e3..fd1c06d 100644 --- a/src/cells/spread_sketch.c +++ b/src/cells/spread_sketch.c @@ -109,7 +109,6 @@ static void *default_copy_fn(void *exdata) { return exdata; } - uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long long t) { /* return f(t), the actual level of bucket, which satisfy: @@ -117,7 +116,11 @@ uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long lon 2. f(t0 + T) = 1 3. f((t0) = L ) */ - struct bucket *bucket = &ss->buckets[bucket_idx]; + struct bucket *bucket = &ss->buckets[bucket_idx]; + if (ss->time_window_ms == 0) { + return bucket->level; + } + unsigned L = bucket->level; long long T = ss->time_window_ms; long long t0 = bucket->last_update_ms; @@ -248,7 +251,11 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec return pthis; } -void move_registers_forward(struct spread_sketch *ss, struct timeval *now) { +void move_registers_forward(struct spread_sketch *ss, const struct timeval *now) { + if (ss->time_window_ms == 0) { + return; + } + long long reset_reg_count = hll_get_reset_register_count(ss->precision, ss->time_window_ms, *now, &ss->reset_time); if (reset_reg_count > 0) { for (int i = 0; i < ss->depth * ss->width; i++) { @@ -267,12 +274,14 @@ void min_level_state_update(struct spread_sketch *ss, uint32_t level_old, uint32 } // return 0 if not added, return 1 if added -int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, , struct timeval now) { +int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now) { uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1; long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000; if (level <= ss->level_min) { - if (item_hash == DUMMY_ITEM_HASH) { + if (item_hash == DUMMY_ITEM_HASH) { // TODO: 这个地方的逻辑对不上,其实返回1应该是“在sketch中”就返回1,为了get exdata 的问题,增加了一种特殊情况。 + // 正常来说,应该是在任何时候,都调用add_sketch,不区分primary or not,并走同样的逻辑,在不包含于sketch时不get。 + // todo: 不对,如果hll 在内部,那么hll 的更新总是要完成,这里只是区别了要更新hll 和不更新hll 的情况 return 0; // DUMMY_ITEM_HASH is the case of "recording a key" instead of update hll. Just return 0 to inform the key is not added to the sketch } @@ -311,7 +320,7 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng uint32_t true_level = bucket->content == NULL ? 0: cal_true_level(ss, bucket_idx, now_ms); if (level > true_level) { - min_level_state_update(ss, old_level, level); + min_level_state_update(ss, true_level, level); // printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, true_level, level, i, hash_x % ss->width); const struct entry *content_old = bucket->content; if (content_old != NULL) { @@ -332,6 +341,11 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng return in_sketch ? 1 : 0; } +int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now) { + uint64_t hash = XXH3_64bits(item, item_len); + return spread_sketch_add_hash(ss, key, key_length, hash, arg, now); +} + double spread_sketch_point_query(const struct spread_sketch *ss, const char *key, size_t key_length) { uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_length, 171); uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32); @@ -350,7 +364,7 @@ double spread_sketch_point_query(const struct spread_sketch *ss, const char *key return count_min; } -struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, struct timeval *now) { +struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, const struct timeval *now) { struct spread_sketch *duplicate = malloc(sizeof(struct spread_sketch)); duplicate->depth = ss->depth; duplicate->width = ss->width; @@ -367,17 +381,6 @@ struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, struct return duplicate; } -double spread_sketch_query(const struct spread_sketch *ss, const char *key, size_t key_length, struct timeval now) { - if (hll_should_slide(ss->precision, ss->time_window_ms, now, ss->reset_time)) { - struct spread_sketch *duplicate = duplicate_and_step(ss, &now); - double ret = spread_sketch_point_query(duplicate, key, key_length); - spread_sketch_free(duplicate); - return ret; - } else { - return spread_sketch_point_query(ss, key, key_length); - } -} - void spread_sketch_free(struct spread_sketch *ss) { smart_ptr_table_free(ss->table); for (int i = 0; i < ss->depth * ss->width; i++) { @@ -401,12 +404,13 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * continue; } + hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision); + if (bucket_dst->content == NULL) { bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); bucket_dst->level = bucket_src->level; bucket_dst->last_update_ms = bucket_src->last_update_ms; - bucket_dst->sthll_register = hll_duplicate(bucket_src->sthll_register, dst->precision); min_level_state_update(dst, 0, bucket_src->level); continue; } @@ -431,8 +435,6 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * bucket_dst->last_update_ms = bucket_src->last_update_ms; } } - - hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision); } // for exdata @@ -518,8 +520,53 @@ size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t return count; } -double spread_sketch_point_query +void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys) { + size_t count_max = HASH_COUNT(ss->table->entry); + size_t count = 0; + + *keys = malloc(count_max * sizeof(char *)); + *key_lens = malloc(count_max * sizeof(size_t)); + + struct entry *content, *tmp; + HASH_ITER(hh, ss->table->entry, content, tmp) { + if (content->dying) { + continue; + } + + (*keys)[count] = content->key; + (*key_lens)[count] = content->key_len; + count++; + } +} + +double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len) { + double est = spread_sketch_point_query(ss, key, key_len); + return est; +} +char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *key, size_t key_len) { + uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_len, 171); + uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32); + uint32_t hash_x2 = (uint32_t) hash_x_tmp; + uint32_t *register_ret = NULL; + double min_cardinality = (double)INT32_MAX; + + for (int i = 0; i < ss->depth; i++) { + uint32_t hash_x = hash_x1 + i * hash_x2; + int bucket_idx = (hash_x % ss->width) + i * ss->width; + + double est = hll_count(ss->buckets[bucket_idx].sthll_register, ss->precision, ss->reset_idx, ss->time_window_ms); + if (est < min_cardinality) { + min_cardinality = est; + register_ret = ss->buckets[bucket_idx].sthll_register; + } + } + + char *enc; + size_t enc_len; + hll_encode_into_base64(register_ret, ss->precision, &enc, &enc_len); + return enc; +} struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) { struct spread_sketch *dst = malloc(sizeof(struct spread_sketch)); @@ -549,10 +596,10 @@ void spread_sketch_get_parameter_recommendation(int expected_super_spreader_numb *depth_out = logk; int w; - if (max_super_spreader_number <= 100) { - w = max_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width + if (expected_super_spreader_number <= 100) { + w = expected_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width } else { - w = max_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous + w = expected_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous } if (w < 40) { w = 40; @@ -578,9 +625,6 @@ void spread_sketch_change_precision(struct spread_sketch *ss, unsigned char prec ss->precision = precision; } -void hyperloglog_serialize_for_networking(const struct hyperloglog *h, char **blob, size_t *blob_sz) { - hyperloglog_serialize_for_networking(data->hll, blob, blob_size); -} // todo: serialize(swarm kv) // todo: swarm kv 上实现reset 和copy。
\ No newline at end of file diff --git a/src/cells/spread_sketch.h b/src/cells/spread_sketch.h index baec128..b1c7a34 100644 --- a/src/cells/spread_sketch.h +++ b/src/cells/spread_sketch.h @@ -1,41 +1,38 @@ #pragma once -#include <stddef.h> + #ifdef __cplusplus extern "C"{ #endif #include "exdata.h" +#include <stdint.h> //uint64_t +#include <sys/time.h> // struct timeval +#include <stddef.h> #define DUMMY_ITEM_HASH (1ULL<<63) // level(left most zeros) = 0 struct spread_sketch; - struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char precision, int time_window_ms, struct timeval now); - void spread_sketch_free(struct spread_sketch *ss); void spread_sketch_reset(struct spread_sketch *ss); -int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now); -// void spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg); -// TODO: 增加add_hash 接口 -void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); +int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now); +int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now); -void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len); +void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); // get the number of cells in the heavy keeper int spread_sketch_get_count(const struct spread_sketch *ss); -size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas); -// 一系列对于spread sketch的查询 -// size_t spread_sketch_list_keys(const struct spread_sketch *ss, const char *keys[], size_t n_keys); -// const uint32_t spread_sketch_query_register(ss, const char *key, size_t key_len); - -// size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas); +void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys); +double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len); +char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *key, size_t key_len); +void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len); void spread_sketch_merge(struct spread_sketch *dest, const struct spread_sketch *src); |
