From 6595cbbde1280b6c7d3c445697e39aa18fa9741f Mon Sep 17 00:00:00 2001 From: chenzizhan Date: Wed, 17 Jul 2024 10:35:24 +0800 Subject: primary metric in spreadsketch/heavykeeper --- include/fieldstat/fieldstat.h | 2 +- src/cells/hll_common.c | 35 ++++++++++- src/cells/hll_common.h | 1 + src/cells/spread_sketch.c | 100 ++++++++++++++++++++++--------- src/cells/spread_sketch.h | 25 ++++---- src/cube.c | 134 +++++++++++++++++++++++++++++++++++------- src/cube.h | 2 +- src/exporter/cjson_exporter.c | 2 +- src/fieldstat.c | 4 +- src/metrics/hyperloglog.c | 33 ++--------- src/metrics/hyperloglog.h | 2 +- src/metrics/metric.c | 2 +- test/test_exporter_json.cpp | 2 +- test/test_merge.cpp | 9 +-- test/test_metric_counter.cpp | 7 --- test/test_metric_hll.cpp | 4 +- 16 files changed, 251 insertions(+), 113 deletions(-) diff --git a/include/fieldstat/fieldstat.h b/include/fieldstat/fieldstat.h index 6ab1764..b5fef15 100644 --- a/include/fieldstat/fieldstat.h +++ b/include/fieldstat/fieldstat.h @@ -234,7 +234,7 @@ long long fieldstat_histogram_value_at_percentile(const struct fieldstat *instan long long fieldstat_histogram_count_le_value(const struct fieldstat *instance, int cube_id, const struct field_list *cell_dimensions, int metric_id, long long value); // get the base 64 encoded string of the serialized blob of a cell -void fieldstat_get_serialized_blob(const struct fieldstat *instance, int cube_id, int metric_id, const struct field_list *cell_dimensions, char **blob, size_t *blob_size); +void fieldstat_metric_get_serialization_as_base64(const struct fieldstat *instance, int cube_id, int metric_id, const struct field_list *cell_dimensions, char **blob, size_t *blob_size); void fieldstat_tag_list_arr_free(struct field_list *tag_list, size_t n_cell); diff --git a/src/cells/hll_common.c b/src/cells/hll_common.c index 7017242..47dc113 100644 --- a/src/cells/hll_common.c +++ b/src/cells/hll_common.c @@ -2,6 +2,8 @@ #include "hll_common.h" #include "crdt_utils.h" +#include "base64/b64.h" + #include #include #include @@ -298,7 +300,7 @@ void hll_advance_reset_index(long long *reset_idx, long long reset_reg_count, un long long hll_get_reset_register_count(unsigned char precision, long long time_window_ms, struct timeval now, struct timeval *reset_time) { long long reset_time_slot_us=RESET_TIME_SLOT_US(time_window_ms, precision); - long long delta_us=timeval_delta_us(*reset_time, now); + long long delta_us=timeval_delta_us((*reset_time), now); if(delta_us < reset_time_slot_us) return 0; @@ -349,3 +351,34 @@ int hll_should_slide(unsigned char precision, long long time_window_ms, struct t return 1; return 0; } + +void hll_encode_into_base64(const uint32_t *registers, unsigned char precision, char **blob, size_t *blob_sz) +{ + size_t sz=0, offset=0; + size_t num_reg = NUM_REG(precision); + size_t words = INT_CEIL(num_reg, REG_PER_WORD); + + sz = hll_size(precision) + 2; // [version][precision][data..., + 2 for version and precision + unsigned char *buffer = ALLOC(unsigned char, sz); + + const unsigned char version = 1; + memcpy(buffer+offset, &version, 1); + offset += 1; + + memcpy(buffer+offset, &precision, 1); + offset += 1; + + for (int i = 0; i < words; i++) { + uint32_t word = registers[i]; + buffer[offset++] = word >> 24; + buffer[offset++] = word >> 16; + buffer[offset++] = word >> 8; + buffer[offset++] = word; + } + + char *enc = b64_encode(buffer, sz); + free(buffer); + *blob_sz = strlen(enc); + *blob = enc; + return; +} \ No newline at end of file diff --git a/src/cells/hll_common.h b/src/cells/hll_common.h index a110153..b62530a 100644 --- a/src/cells/hll_common.h +++ b/src/cells/hll_common.h @@ -21,6 +21,7 @@ void hll_reset_registers(uint32_t *registers, unsigned char precision, long long long long hll_get_reset_register_count(unsigned char precision, long long time_window_ms, struct timeval now, struct timeval *reset_time); void hll_advance_reset_index(long long *reset_idx, long long reset_reg_count, unsigned char precision); void hll_merge(uint32_t *dst_registers, const uint32_t *src_registers, unsigned char precision); +void hll_encode_into_base64(const uint32_t *registers, unsigned char precision, char **blob, size_t *blob_sz); #ifdef __cplusplus } diff --git a/src/cells/spread_sketch.c b/src/cells/spread_sketch.c index 96436e3..fd1c06d 100644 --- a/src/cells/spread_sketch.c +++ b/src/cells/spread_sketch.c @@ -109,7 +109,6 @@ static void *default_copy_fn(void *exdata) { return exdata; } - uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long long t) { /* return f(t), the actual level of bucket, which satisfy: @@ -117,7 +116,11 @@ uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long lon 2. f(t0 + T) = 1 3. f((t0) = L ) */ - struct bucket *bucket = &ss->buckets[bucket_idx]; + struct bucket *bucket = &ss->buckets[bucket_idx]; + if (ss->time_window_ms == 0) { + return bucket->level; + } + unsigned L = bucket->level; long long T = ss->time_window_ms; long long t0 = bucket->last_update_ms; @@ -248,7 +251,11 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec return pthis; } -void move_registers_forward(struct spread_sketch *ss, struct timeval *now) { +void move_registers_forward(struct spread_sketch *ss, const struct timeval *now) { + if (ss->time_window_ms == 0) { + return; + } + long long reset_reg_count = hll_get_reset_register_count(ss->precision, ss->time_window_ms, *now, &ss->reset_time); if (reset_reg_count > 0) { for (int i = 0; i < ss->depth * ss->width; i++) { @@ -267,12 +274,14 @@ void min_level_state_update(struct spread_sketch *ss, uint32_t level_old, uint32 } // return 0 if not added, return 1 if added -int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, , struct timeval now) { +int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now) { uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1; long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000; if (level <= ss->level_min) { - if (item_hash == DUMMY_ITEM_HASH) { + if (item_hash == DUMMY_ITEM_HASH) { // TODO: 这个地方的逻辑对不上,其实返回1应该是“在sketch中”就返回1,为了get exdata 的问题,增加了一种特殊情况。 + // 正常来说,应该是在任何时候,都调用add_sketch,不区分primary or not,并走同样的逻辑,在不包含于sketch时不get。 + // todo: 不对,如果hll 在内部,那么hll 的更新总是要完成,这里只是区别了要更新hll 和不更新hll 的情况 return 0; // DUMMY_ITEM_HASH is the case of "recording a key" instead of update hll. Just return 0 to inform the key is not added to the sketch } @@ -311,7 +320,7 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng uint32_t true_level = bucket->content == NULL ? 0: cal_true_level(ss, bucket_idx, now_ms); if (level > true_level) { - min_level_state_update(ss, old_level, level); + min_level_state_update(ss, true_level, level); // printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, true_level, level, i, hash_x % ss->width); const struct entry *content_old = bucket->content; if (content_old != NULL) { @@ -332,6 +341,11 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng return in_sketch ? 1 : 0; } +int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now) { + uint64_t hash = XXH3_64bits(item, item_len); + return spread_sketch_add_hash(ss, key, key_length, hash, arg, now); +} + double spread_sketch_point_query(const struct spread_sketch *ss, const char *key, size_t key_length) { uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_length, 171); uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32); @@ -350,7 +364,7 @@ double spread_sketch_point_query(const struct spread_sketch *ss, const char *key return count_min; } -struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, struct timeval *now) { +struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, const struct timeval *now) { struct spread_sketch *duplicate = malloc(sizeof(struct spread_sketch)); duplicate->depth = ss->depth; duplicate->width = ss->width; @@ -367,17 +381,6 @@ struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, struct return duplicate; } -double spread_sketch_query(const struct spread_sketch *ss, const char *key, size_t key_length, struct timeval now) { - if (hll_should_slide(ss->precision, ss->time_window_ms, now, ss->reset_time)) { - struct spread_sketch *duplicate = duplicate_and_step(ss, &now); - double ret = spread_sketch_point_query(duplicate, key, key_length); - spread_sketch_free(duplicate); - return ret; - } else { - return spread_sketch_point_query(ss, key, key_length); - } -} - void spread_sketch_free(struct spread_sketch *ss) { smart_ptr_table_free(ss->table); for (int i = 0; i < ss->depth * ss->width; i++) { @@ -401,12 +404,13 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * continue; } + hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision); + if (bucket_dst->content == NULL) { bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); bucket_dst->level = bucket_src->level; bucket_dst->last_update_ms = bucket_src->last_update_ms; - bucket_dst->sthll_register = hll_duplicate(bucket_src->sthll_register, dst->precision); min_level_state_update(dst, 0, bucket_src->level); continue; } @@ -431,8 +435,6 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * bucket_dst->last_update_ms = bucket_src->last_update_ms; } } - - hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision); } // for exdata @@ -518,8 +520,53 @@ size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t return count; } -double spread_sketch_point_query +void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys) { + size_t count_max = HASH_COUNT(ss->table->entry); + size_t count = 0; + + *keys = malloc(count_max * sizeof(char *)); + *key_lens = malloc(count_max * sizeof(size_t)); + + struct entry *content, *tmp; + HASH_ITER(hh, ss->table->entry, content, tmp) { + if (content->dying) { + continue; + } + + (*keys)[count] = content->key; + (*key_lens)[count] = content->key_len; + count++; + } +} + +double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len) { + double est = spread_sketch_point_query(ss, key, key_len); + return est; +} +char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *key, size_t key_len) { + uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_len, 171); + uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32); + uint32_t hash_x2 = (uint32_t) hash_x_tmp; + uint32_t *register_ret = NULL; + double min_cardinality = (double)INT32_MAX; + + for (int i = 0; i < ss->depth; i++) { + uint32_t hash_x = hash_x1 + i * hash_x2; + int bucket_idx = (hash_x % ss->width) + i * ss->width; + + double est = hll_count(ss->buckets[bucket_idx].sthll_register, ss->precision, ss->reset_idx, ss->time_window_ms); + if (est < min_cardinality) { + min_cardinality = est; + register_ret = ss->buckets[bucket_idx].sthll_register; + } + } + + char *enc; + size_t enc_len; + hll_encode_into_base64(register_ret, ss->precision, &enc, &enc_len); + return enc; +} struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) { struct spread_sketch *dst = malloc(sizeof(struct spread_sketch)); @@ -549,10 +596,10 @@ void spread_sketch_get_parameter_recommendation(int expected_super_spreader_numb *depth_out = logk; int w; - if (max_super_spreader_number <= 100) { - w = max_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width + if (expected_super_spreader_number <= 100) { + w = expected_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width } else { - w = max_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous + w = expected_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous } if (w < 40) { w = 40; @@ -578,9 +625,6 @@ void spread_sketch_change_precision(struct spread_sketch *ss, unsigned char prec ss->precision = precision; } -void hyperloglog_serialize_for_networking(const struct hyperloglog *h, char **blob, size_t *blob_sz) { - hyperloglog_serialize_for_networking(data->hll, blob, blob_size); -} // todo: serialize(swarm kv) // todo: swarm kv 上实现reset 和copy。 \ No newline at end of file diff --git a/src/cells/spread_sketch.h b/src/cells/spread_sketch.h index baec128..b1c7a34 100644 --- a/src/cells/spread_sketch.h +++ b/src/cells/spread_sketch.h @@ -1,41 +1,38 @@ #pragma once -#include + #ifdef __cplusplus extern "C"{ #endif #include "exdata.h" +#include //uint64_t +#include // struct timeval +#include #define DUMMY_ITEM_HASH (1ULL<<63) // level(left most zeros) = 0 struct spread_sketch; - struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char precision, int time_window_ms, struct timeval now); - void spread_sketch_free(struct spread_sketch *ss); void spread_sketch_reset(struct spread_sketch *ss); -int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now); -// void spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg); -// TODO: 增加add_hash 接口 -void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); +int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now); +int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now); -void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len); +void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); // get the number of cells in the heavy keeper int spread_sketch_get_count(const struct spread_sketch *ss); -size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas); -// 一系列对于spread sketch的查询 -// size_t spread_sketch_list_keys(const struct spread_sketch *ss, const char *keys[], size_t n_keys); -// const uint32_t spread_sketch_query_register(ss, const char *key, size_t key_len); - -// size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas); +void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys); +double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len); +char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *key, size_t key_len); +void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len); void spread_sketch_merge(struct spread_sketch *dest, const struct spread_sketch *src); diff --git a/src/cube.c b/src/cube.c index 6069cdd..e8231d3 100644 --- a/src/cube.c +++ b/src/cube.c @@ -18,6 +18,7 @@ #define DEFAULT_N_METRIC 32 #define DEFAULT_N_CUBE 64 +static const struct timeval DUMMY_TIME_VAL = {0, 0}; struct exdata_new_args { const struct field *cell_dimensions; @@ -531,10 +532,13 @@ struct cube *cube_new(const struct field *dimensions, size_t n_dimensions, enum cube->table = hash_table_new(max_n_cell); hash_table_set_exdata_schema(cube->table, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i); break; - case SAMPLING_MODE_TOP_CARDINALITY: - cube->spread_sketch = spread_sketch_new(max_n_cell); + case SAMPLING_MODE_TOP_CARDINALITY: { + int width, depth; + unsigned char precision; + spread_sketch_get_parameter_recommendation(max_n_cell, &depth, &width, &precision); + cube->spread_sketch = spread_sketch_new(depth, width, precision, 0, DUMMY_TIME_VAL); spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i); - break; + break; } default: assert(0); break; @@ -674,16 +678,11 @@ struct cell *get_cell_in_spread_sketch_cube(struct cube *cube, const struct fiel if (cube->primary_metric_id != metric_id) { cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len); if (cell_data == NULL) { - int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args); + int tmp_ret = spread_sketch_add_hash(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args, DUMMY_TIME_VAL); if (tmp_ret == 1) { cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len); } } - } else { - int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, item_hash, (void *)&args); - if (tmp_ret == 1) { - cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len); - } } free(key); @@ -827,10 +826,20 @@ int cube_hll_add(struct cube *cube, int metric_id, const struct field *dimension return FS_ERR_INVALID_METRIC_ID; } - uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) { - hash = XXH3_64bits(key, key_len); + char *dimension_as_string; + size_t dimension_string_len; + field_array_to_key(dimensions, n_dimensions, &dimension_as_string, &dimension_string_len); + + struct exdata_new_args args; + args.cell_dimensions = dimensions; + args.n_dimensions = n_dimensions; + + int tmp_ret = spread_sketch_add(cube->spread_sketch, dimension_as_string, dimension_string_len, key, key_len, (void *)&args, DUMMY_TIME_VAL); + free(dimension_as_string); + return tmp_ret == 1 ? FS_OK : FS_ERR_TOO_MANY_CELLS; } + struct cell *cell_data = NULL; switch (cube->sampling_mode) { case SAMPLING_MODE_COMPREHENSIVE: { @@ -840,7 +849,7 @@ int cube_hll_add(struct cube *cube, int metric_id, const struct field *dimension cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, metric_id); break;} case SAMPLING_MODE_TOP_CARDINALITY: { - cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, metric_id); + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, metric_id); break;} default: assert(0); @@ -881,8 +890,20 @@ int cube_hll_add_field(struct cube *cube, int metric_id, const struct field *dim uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) { + char *key; + size_t key_len; + field_array_to_key(dimensions, n_dimensions, &key, &key_len); + + struct exdata_new_args args; + args.cell_dimensions = dimensions; + args.n_dimensions = n_dimensions; hash = field_array_to_hash(tags_key, n_tag_key); + + int tmp_ret = spread_sketch_add_hash(cube->spread_sketch, key, key_len, hash, (void *)&args, DUMMY_TIME_VAL); + free(key); + return tmp_ret == 1 ? FS_OK : FS_ERR_TOO_MANY_CELLS; } + struct cell *cell_data = NULL; switch (cube->sampling_mode) { case SAMPLING_MODE_COMPREHENSIVE: { @@ -920,6 +941,24 @@ int cube_counter_incrby(struct cube *cube, int metric_id, const struct field *di if (manifest == NULL || manifest->type != METRIC_TYPE_COUNTER) { return FS_ERR_INVALID_METRIC_ID; } + + if (cube->primary_metric_id == metric_id && cube->sampling_mode == SAMPLING_MODE_TOPK) { + if (increment <= 0) { + return FS_ERR_INVALID_PARAM; + } + + char *key; + size_t key_len; + field_array_to_key(dimensions, n_dimensions, &key, &key_len); + + struct exdata_new_args args; + args.cell_dimensions = dimensions; + args.n_dimensions = n_dimensions; + + int tmp_ret = heavy_keeper_add(cube->heavykeeper, key, key_len, increment, (void *)&args); + free(key); + return tmp_ret == 1 ? FS_OK : FS_ERR_TOO_MANY_CELLS; + } struct cell *cell_data = NULL; switch (cube->sampling_mode) { @@ -1061,10 +1100,13 @@ struct cube *cube_fork(const struct cube *cube) { ret->table = hash_table_new(cube->max_n_cell); hash_table_set_exdata_schema(ret->table, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i); break; - case SAMPLING_MODE_TOP_CARDINALITY: - ret->spread_sketch = spread_sketch_new(cube->max_n_cell); + case SAMPLING_MODE_TOP_CARDINALITY: { + int width, depth, dummy_time; + unsigned char precision; + spread_sketch_get_parameter(cube->spread_sketch, &depth, &width, &precision, &dummy_time); + ret->spread_sketch = spread_sketch_new(depth, width, precision, 0, DUMMY_TIME_VAL); spread_sketch_set_exdata_schema(ret->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i); - break; + break;} default: assert(0); break; @@ -1114,6 +1156,10 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions return; } + char **spread_sketch_keys = NULL; + size_t *spread_sketch_keys_lens = NULL; + long long *heavy_keeper_counts = NULL; + struct cell **cell_datas = (struct cell **)malloc(sizeof(struct cell *) * n_cell_tmp); switch (cube->sampling_mode) { case SAMPLING_MODE_COMPREHENSIVE: @@ -1122,8 +1168,12 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions case SAMPLING_MODE_TOPK: heavy_keeper_list(cube->heavykeeper, (void **)cell_datas, n_cell_tmp); break; - case SAMPLING_MODE_TOP_CARDINALITY: - spread_sketch_list(cube->spread_sketch, (void **)cell_datas, n_cell_tmp); + case SAMPLING_MODE_TOP_CARDINALITY: { + spread_sketch_list_keys(cube->spread_sketch, &spread_sketch_keys, &spread_sketch_keys_lens, &n_cell_tmp); + for (int i = 0; i < n_cell_tmp; i++) { + cell_datas[i] = spread_sketch_get0_exdata(cube->spread_sketch, spread_sketch_keys[i], spread_sketch_keys_lens[i]); + } + } break; default: assert(0); @@ -1134,7 +1184,7 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions struct tmp_sorted_data_spread_sketch_cell *tmp_sorted_data = (struct tmp_sorted_data_spread_sketch_cell *)malloc(sizeof(struct tmp_sorted_data_spread_sketch_cell) * n_cell_tmp); for (int i = 0; i < n_cell_tmp; i++) { tmp_sorted_data[i].data = cell_datas[i]; - tmp_sorted_data[i].hll_value = metric_hll_get(cell_datas[i]->slots[cube->primary_metric_id]); + tmp_sorted_data[i].hll_value = spread_sketch_get_cardinality(cube->spread_sketch, spread_sketch_keys[i], spread_sketch_keys_lens[i]); } qsort(tmp_sorted_data, n_cell_tmp, sizeof(struct tmp_sorted_data_spread_sketch_cell), compare_tmp_sorted_data_spread_sketch_cell); @@ -1162,6 +1212,9 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions } free(cell_datas); + free(heavy_keeper_counts); + free(spread_sketch_keys); + free(spread_sketch_keys_lens); } const struct cell *get_cell_by_tag_list(const struct cube *cube, const struct field_list *fields) @@ -1211,6 +1264,20 @@ const struct metric *get_metric_by_tag_list(const struct cube *cube, const struc int cube_counter_get(const struct cube *cube, int metric_id, const struct field_list *fields, long long *value) { + if (cube->sampling_mode == SAMPLING_MODE_TOPK && cube->primary_metric_id == metric_id) { + char *dimension_in_string; + size_t dimension_string_len; + field_array_to_key(fields->field, fields->n_field, &dimension_in_string, &dimension_string_len); + + long long count = 0; + void *exdata_dummy = NULL; + heavy_keeper_one_point_query(cube->heavykeeper, dimension_in_string, dimension_string_len, &count, &exdata_dummy); + *value = count; + + free(dimension_in_string); + return count == 0 ? FS_ERR_INVALID_TAG : FS_OK; + } + int ret; const struct metric *metric = get_metric_by_tag_list(cube, fields, metric_id, &ret); if (ret != FS_OK) { @@ -1226,6 +1293,18 @@ int cube_counter_get(const struct cube *cube, int metric_id, const struct field_ int cube_hll_get(const struct cube *cube, int metric_id, const struct field_list *fields, double *value) { + if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) { + char *dimension_in_string; + size_t dimension_string_len; + field_array_to_key(fields->field, fields->n_field, &dimension_in_string, &dimension_string_len); + + double hll_value = spread_sketch_get_cardinality(cube->spread_sketch, dimension_in_string, dimension_string_len); + *value = hll_value; + + free(dimension_in_string); + return FS_OK; + } + int ret; const struct metric *metric = get_metric_by_tag_list(cube, fields, metric_id, &ret); if (ret != FS_OK) { @@ -1268,7 +1347,17 @@ int cube_histogram_count_le_value(const struct cube *cube, int metric_id, const return FS_OK; } -int cube_get_serialization(const struct cube *cube, int metric_id, const struct field_list *fields, char **blob, size_t *blob_size) { +int cube_get_serialization_as_base64(const struct cube *cube, int metric_id, const struct field_list *fields, char **blob, size_t *blob_size) { + if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) { + char *dimension_in_string; + size_t dimension_string_len; + field_array_to_key(fields->field, fields->n_field, &dimension_in_string, &dimension_string_len); + + *blob = spread_sketch_get_hll_base64_serialization(cube->spread_sketch, dimension_in_string, dimension_string_len); + *blob_size = strlen(*blob); + return FS_OK; + } + int ret; const struct metric *metric = get_metric_by_tag_list(cube, fields, metric_id, &ret); if (ret != FS_OK) { @@ -1304,8 +1393,13 @@ void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list * return; } - *metric_id_out = (int *)malloc(sizeof(int) * cell_data->next_index); + *metric_id_out = (int *)malloc(sizeof(int) * cell_data->next_index + 1); // +1: for primary metric int n_metric = 0; + if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY || cube->sampling_mode == SAMPLING_MODE_TOPK) { // primary metric is not stored in cell_data + (*metric_id_out)[n_metric] = cube->primary_metric_id; + n_metric++; + } + for (int i = 0; i < cell_data->next_index; i++) { if (cell_data->slots[i] != NULL) { (*metric_id_out)[n_metric] = i; diff --git a/src/cube.h b/src/cube.h index 8b6db1c..134ede8 100644 --- a/src/cube.h +++ b/src/cube.h @@ -35,7 +35,7 @@ int cube_counter_get(const struct cube *cube, int metric_id, const struct field_ int cube_hll_get(const struct cube *cube, int metric_id, const struct field_list *dimensions, double *value); int cube_histogram_value_at_percentile(const struct cube *cube, int metric_id, const struct field_list *dimensions, double percentile, long long *value); int cube_histogram_count_le_value(const struct cube *cube, int metric_id, const struct field_list *dimensions, long long value, long long *count); -int cube_get_serialization(const struct cube *cube, int metric_id, const struct field_list *dimensions, char **blob, size_t *blob_size); +int cube_get_serialization_as_base64(const struct cube *cube, int metric_id, const struct field_list *dimensions, char **blob, size_t *blob_size); int cube_get_cell_count(const struct cube *cube); enum sampling_mode cube_get_sampling_mode(const struct cube *cube); diff --git a/src/exporter/cjson_exporter.c b/src/exporter/cjson_exporter.c index 1606df5..6a52c6a 100644 --- a/src/exporter/cjson_exporter.c +++ b/src/exporter/cjson_exporter.c @@ -542,7 +542,7 @@ struct export_kv_pair *cell_query_with_iter(const struct cell_iter *iter, int me if (type == METRIC_TYPE_HLL || type == METRIC_TYPE_HISTOGRAM) { char *value; size_t len; - fieldstat_get_serialized_blob(iter->instance, cube_id, metric_id, &iter->tag_list[iter->curr_cell_idx], &value, &len); + fieldstat_metric_get_serialization_as_base64(iter->instance, cube_id, metric_id, &iter->tag_list[iter->curr_cell_idx], &value, &len); if (value == NULL) { return NULL; } diff --git a/src/fieldstat.c b/src/fieldstat.c index 12c482b..4f4582b 100644 --- a/src/fieldstat.c +++ b/src/fieldstat.c @@ -306,7 +306,7 @@ long long fieldstat_histogram_count_le_value(const struct fieldstat *instance, i return count; } -void fieldstat_get_serialized_blob(const struct fieldstat *instance, int cube_id, int metric_id, const struct field_list *cell_dimensions, char **blob, size_t *blob_size) +void fieldstat_metric_get_serialization_as_base64(const struct fieldstat *instance, int cube_id, int metric_id, const struct field_list *cell_dimensions, char **blob, size_t *blob_size) { const struct cube *cube = cube_manager_get_cube_by_id(instance->cube_manager, cube_id); if (cube == NULL) { @@ -315,7 +315,7 @@ void fieldstat_get_serialized_blob(const struct fieldstat *instance, int cube_id return; } - cube_get_serialization(cube, metric_id, cell_dimensions, blob, blob_size); + cube_get_serialization_as_base64(cube, metric_id, cell_dimensions, blob, blob_size); } void fieldstat_tag_list_arr_free(struct field_list *tag_list, size_t n_cell) diff --git a/src/metrics/hyperloglog.c b/src/metrics/hyperloglog.c index b13a2a2..ea05887 100644 --- a/src/metrics/hyperloglog.c +++ b/src/metrics/hyperloglog.c @@ -14,6 +14,8 @@ #include #include "base64/b64.h" +#include "hll_common.h" + const size_t BLOB_HDR_SIZE= offsetof(struct hyperloglog, registers); @@ -171,36 +173,9 @@ struct hyperloglog *hyperloglog_deserialize(const char *blob, size_t blob_sz) return h; } -void hyperloglog_serialize_for_networking(const struct hyperloglog *h, char **blob, size_t *blob_sz) +void hyperloglog_serialize_into_base64(const struct hyperloglog *h, char **blob, size_t *blob_sz) { - size_t sz=0, offset=0; - size_t num_reg = NUM_REG(h->cfg.precision); - size_t words = INT_CEIL(num_reg, REG_PER_WORD); - - sz = hyperloglog_serialized_size(h) + 1; // [precision][version][data... - unsigned char *buffer = ALLOC(unsigned char, sz); - - const unsigned char version = 1; - memcpy(buffer+offset, &version, 1); - offset += 1; - - unsigned char precision = h->cfg.precision; - memcpy(buffer+offset, &precision, 1); - offset += 1; - - for (int i = 0; i < words; i++) { - uint32_t word = h->registers[i]; - buffer[offset++] = word >> 24; - buffer[offset++] = word >> 16; - buffer[offset++] = word >> 8; - buffer[offset++] = word; - } - - char *enc = b64_encode(buffer, sz); - free(buffer); - *blob_sz = strlen(enc); - *blob = enc; - return; + hll_encode_into_base64(h->registers, h->cfg.precision, blob, blob_sz); } void hyperloglog_merge_blob(struct hyperloglog *dest, const char *blob, size_t blob_sz) diff --git a/src/metrics/hyperloglog.h b/src/metrics/hyperloglog.h index b0e6db5..b1b2330 100644 --- a/src/metrics/hyperloglog.h +++ b/src/metrics/hyperloglog.h @@ -49,7 +49,7 @@ void hyperloglog_reset(struct hyperloglog *h); double hyperloglog_count(const struct hyperloglog *h); size_t hyperloglog_serialized_size(const struct hyperloglog *h); void hyperloglog_serialize(const struct hyperloglog *h, char **blob, size_t *blob_sz); -void hyperloglog_serialize_for_networking(const struct hyperloglog *h, char **blob, size_t *blob_sz); +void hyperloglog_serialize_into_base64(const struct hyperloglog *h, char **blob, size_t *blob_sz); struct hyperloglog *hyperloglog_deserialize(const char *blob, size_t blob_sz); int hyperloglog_merge(struct hyperloglog *dest, const struct hyperloglog *src); void hyperloglog_merge_blob(struct hyperloglog *dest, const char *blob, size_t blob_sz); diff --git a/src/metrics/metric.c b/src/metrics/metric.c index 42f9c94..4b0c290 100644 --- a/src/metrics/metric.c +++ b/src/metrics/metric.c @@ -297,7 +297,7 @@ void metric_serialize(const struct metric *pthis, char **blob, size_t *blob_size struct metric_data *data = pthis->data; enum metric_type type = pthis->type; if (type == METRIC_TYPE_HLL) { - hyperloglog_serialize_for_networking(data->hll, blob, blob_size); + hyperloglog_serialize_into_base64(data->hll, blob, blob_size); return; } if (type == METRIC_TYPE_HISTOGRAM) { diff --git a/test/test_exporter_json.cpp b/test/test_exporter_json.cpp index 53dbabc..45a05cf 100644 --- a/test/test_exporter_json.cpp +++ b/test/test_exporter_json.cpp @@ -84,7 +84,7 @@ void test_check_if_metric_gauge_correct(cJSON *metric_obj, const char *name) { char *blob_gauge_benchmark = NULL; size_t size_dummy = 0; - hyperloglog_serialize_for_networking(g_hll_standard, &blob_gauge_benchmark, &size_dummy); + hyperloglog_serialize_into_base64(g_hll_standard, &blob_gauge_benchmark, &size_dummy); cJSON *gauge_obj = cJSON_GetObjectItem(metric_obj, name); EXPECT_NE(gauge_obj, nullptr); diff --git a/test/test_merge.cpp b/test/test_merge.cpp index 011ad6d..6e8f802 100644 --- a/test/test_merge.cpp +++ b/test/test_merge.cpp @@ -120,8 +120,8 @@ TEST(unit_test_merge, new_cell_on_existing_cube_and_metric_comprehensive) int n_cube; fieldstat_get_cubes(instance_dest, &cube_id_dest, &n_cube); EXPECT_TRUE(n_cube == 1); - free(cube_id_dest); EXPECT_STREQ(fieldstat_get_metric_name(instance_dest, cube_id_dest[0], 0), "metric_name"); + free(cube_id_dest); long long measure = merge_test_fieldstat_counter_get(instance, cube_id, metric_id); EXPECT_EQ(measure, 10086); @@ -255,8 +255,8 @@ TEST(unit_test_merge, new_cell_on_existing_cube_and_metric_topk) int n_cube; fieldstat_get_cubes(instance_dest, &cube_id_dest, &n_cube); EXPECT_TRUE(n_cube == 1); - free(cube_id_dest); EXPECT_STREQ(fieldstat_get_metric_name(instance_dest,cube_id_dest[0], 0), "metric_name"); + free(cube_id_dest); long long measure = merge_test_fieldstat_counter_get(instance, cube_id, metric_id); EXPECT_EQ(measure, 10086); @@ -288,8 +288,8 @@ TEST(unit_test_merge, merge_existing_cell_on_existing_cube_and_metric_topk) fieldstat_get_cubes(instance_dest, &cube_id_dest, &n_cube); EXPECT_TRUE(n_cube == 1); int ret_cube_id = cube_id_dest[0]; - free(cube_id_dest); EXPECT_STREQ(fieldstat_get_metric_name(instance_dest,cube_id_dest[0], 0), "metric_name"); + free(cube_id_dest); struct field_list *tag_list = NULL; size_t n_cell = 0; @@ -597,6 +597,7 @@ TEST(unit_test_merge, new_too_many_cells_on_one_metric_given_source_cube_reset_a fieldstat_tag_list_arr_free(tag_list, n_cell); } + TEST(unit_test_merge, gen_dest_full_all_src_inserted_given_src_flows_larger_spreadsketch) { int K = 100; SpreadSketchZipfGenerator flow_generator(1.0, K); // exactly the number of cells, so there will be almost all(in case of hash collision happen) cells added successfully @@ -712,7 +713,7 @@ TEST(unit_test_merge, merge_accuracy_test_gen_dest_full_some_inserted_and_some_m int main(int argc, char *argv[]) { testing::InitGoogleTest(&argc, argv); - // testing::GTEST_FLAG(filter) = "*spreadsketch"; + // testing::GTEST_FLAG(filter) = "*gen_dest_full_all_src_inserted_given_src_flows_larger_spreadsketch"; return RUN_ALL_TESTS(); } \ No newline at end of file diff --git a/test/test_metric_counter.cpp b/test/test_metric_counter.cpp index c6774f9..d8ce16c 100644 --- a/test/test_metric_counter.cpp +++ b/test/test_metric_counter.cpp @@ -202,13 +202,6 @@ TEST(metric_test_counter, add_and_query_on_dummy_cell_of_topk) long long measure = my_fieldstat_counter_get(instance, cube_id, metric_id); EXPECT_EQ(measure, 1); - int *metric_ids_in_cell = NULL; - size_t n_metric_in_cell = 0; - fieldstat_get_metric_in_cell(instance, cube_id, &TEST_TAG_LIST_INT, &metric_ids_in_cell, &n_metric_in_cell); - EXPECT_EQ(n_metric_in_cell, 1); - EXPECT_EQ(metric_ids_in_cell[0], metric_id); - free(metric_ids_in_cell); - fieldstat_free(instance); } diff --git a/test/test_metric_hll.cpp b/test/test_metric_hll.cpp index 6d8be58..204c762 100644 --- a/test/test_metric_hll.cpp +++ b/test/test_metric_hll.cpp @@ -138,7 +138,7 @@ TEST(metric_test_hll, serialize_with_b64_and_query) char *blob = NULL; size_t blob_len = 0; - fieldstat_get_serialized_blob(instance, 0, 0, &TEST_TAG_LIST_INT, &blob, &blob_len); + fieldstat_metric_get_serialization_as_base64(instance, 0, 0, &TEST_TAG_LIST_INT, &blob, &blob_len); size_t dec_size = 0; unsigned char *dec = b64_decode_ex(blob, blob_len, &dec_size); @@ -183,7 +183,7 @@ TEST(metric_test_hll, serialize_with_b64_and_query_with_python_api) char *blob = NULL; size_t blob_len = 0; - fieldstat_get_serialized_blob(instance, 0, 0, &TEST_TAG_LIST_INT, &blob, &blob_len); + fieldstat_metric_get_serialization_as_base64(instance, 0, 0, &TEST_TAG_LIST_INT, &blob, &blob_len); bool flag = fieldstat_is_hll(blob); EXPECT_EQ(flag, true); -- cgit v1.2.3