summaryrefslogtreecommitdiff
path: root/src/cube.c
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2024-07-11 16:14:09 +0800
committerchenzizhan <[email protected]>2024-07-11 16:14:09 +0800
commit5dc3d8a96bb203abc1ee050cd0c884f2ab989dba (patch)
tree38f5bc67522843c1cca51a30e413e4f8f9d1834e /src/cube.c
parent677f337e195e3b9b6e416109df8d51c14da2791b (diff)
spread sketch merge, reset
Diffstat (limited to 'src/cube.c')
-rw-r--r--src/cube.c295
1 files changed, 241 insertions, 54 deletions
diff --git a/src/cube.c b/src/cube.c
index 47c9065..a01b775 100644
--- a/src/cube.c
+++ b/src/cube.c
@@ -302,7 +302,7 @@ int cube_manager_find(const struct cube_manager *pthis, const struct field *cube
{
char key_stack[4096];
char *key = key_stack;
- int key_len = field_array_to_key_safe(cube_dimensions, n_dimension, key, sizeof(key));
+ int key_len = field_array_to_key_safe(cube_dimensions, n_dimension, key, sizeof(key_stack));
bool free_key = false;
if (key_len < 0) { // very unlikely to happen
char *key_heap;
@@ -575,6 +575,7 @@ struct cube *cube_new(const struct field *dimensions, size_t n_dimensions, enum
case SAMPLING_MODE_SPREADSKETCH:
cube->spread_sketch = spread_sketch_new(max_n_cell);
spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
+ break;
default:
assert(0);
break;
@@ -633,10 +634,10 @@ void cube_set_primary_metric(struct cube *cube, int metric_id) {
cube->primary_metric_id = metric_id;
}
-struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t n_dimension, long long increment, int metric_id) {
- char key_stack[4096];
+struct cell *get_cell_in_comprehensive_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension) {
+ char key_stack[4096] = {0};
char *key = key_stack;
- int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key));
+ int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack));
bool free_key = false;
if (key_len < 0) { // very unlikely to happen
char *key_heap;
@@ -652,51 +653,100 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t
args.n_dimensions = n_dimension;
struct cell *cell_data = NULL;
- switch (cube->sampling_mode) {
- case SAMPLING_MODE_TOPK: {
- if (cube->primary_metric_id != metric_id) {
- cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
- if (cell_data == NULL) {
- int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, 0, (void *)&args);
- if (tmp_ret == 1) {
- cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
- }
- }
- } else {
- // heavy_keeper_add should be called anyway, to let the topk record update.
- int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, increment, (void *)&args);
- if (tmp_ret == 1) {
- cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
- }
+ assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE);
+
+ cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
+ if (cell_data == NULL) {
+ int tmp_ret = hash_table_add(cube->comprehensive, key, key_len, (void *)&args);
+ if (tmp_ret == 1) {
+ cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
}
- break;}
- case SAMPLING_MODE_COMPREHENSIVE: {
- cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
+ }
+
+ if (free_key) {
+ free(key);
+ }
+ return cell_data;
+}
+
+struct cell *get_cell_in_topk_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension, long long increment, int metric_id) { // find or create the cell keyed by `dimensions` in a top-K (heavy keeper) cube; returns NULL when no cell could be obtained
+ char key_stack[4096] = {0};
+ char *key = key_stack;
+ int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack));
+ bool free_key = false;
+ if (key_len < 0) { // very unlikely to happen
+ char *key_heap;
+ size_t key_len_tmp;
+ field_array_to_key_endeavor(dimensions, n_dimension, &key_heap, &key_len_tmp); // fallback: the key comes back through key_heap and is freed at the end of this function
+ key = key_heap;
+ key_len = key_len_tmp;
+ free_key = true;
+ }
+
+ struct exdata_new_args args; // handed to heavy_keeper_add as its last argument
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimension;
+
+ struct cell *cell_data = NULL;
+ assert(cube->sampling_mode == SAMPLING_MODE_TOPK);
+ if (cube->primary_metric_id != metric_id) { // FIXME: TODO: I tried replacing this get-then-add logic with a plain add followed by a check of the return value, but the fuzz test exposed a problem with the special return codes.
+ cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
if (cell_data == NULL) {
- int tmp_ret = hash_table_add(cube->comprehensive, key, key_len, (void *)&args);
+ int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, 0, (void *)&args); // non-primary metric: the key is added with a count of 0
if (tmp_ret == 1) {
- cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
+ cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
}
}
- break;}
- case SAMPLING_MODE_SPREADSKETCH: {
- if (cube->primary_metric_id != metric_id) {
- cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
- // todo: spread sketch 没办法支持dummy 场景。首先,我不能让所有metric 都走spread sketch流程,
- // 因为正常来说,用level=0 的hashy 做数值,没有任何意义,肯定都更新不了,只是在刚开始的时候,起一个记录key 的作用。
- // 而,如果像是topk 那样,给count=0 的一席之地,那么存在问题:spread sketch本身不对记录的个数有限制,所以什么时候停止记录呢?这样的设计也太麻烦了。
- // 之前跟老板讨论的时候,给了两个方案,方案1:做一个buffer,如果get exdata0 get 不到,则往buffer 中的cell 里写,等到来了primary以后,把cell 送进去。
- // 方案2:简单略去第一轮添加时的情况。这会造成很少量的误差。不过,实际上这个操作不是逐包而是会话开始结束时来一次,所以误差也不会太小。必须是让贺岚风能第一个操作primary metric。
+ } else {
+ // heavy_keeper_add should be called anyway, to let the topk record update.
+ int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, increment, (void *)&args);
+ if (tmp_ret == 1) { // the cell is fetched only when the add returned 1; otherwise cell_data stays NULL
+ cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
}
+ }
+
+ if (free_key) {
+ free(key);
+ }
+ return cell_data;
+}
+
+struct cell *get_cell_in_spread_sketch_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension, uint64_t item_hash, int metric_id) {
+ char key_stack[4096] = {0};
+ char *key = key_stack;
+ int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack));
+ bool free_key = false;
+ if (key_len < 0) { // very unlikely to happen
+ char *key_heap;
+ size_t key_len_tmp;
+ field_array_to_key_endeavor(dimensions, n_dimension, &key_heap, &key_len_tmp);
+ key = key_heap;
+ key_len = key_len_tmp;
+ free_key = true;
+ }
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimension;
+
+ struct cell *cell_data = NULL;
+ assert(cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH);
+
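+ // todo: spread sketch currently supports dummy (non-primary) updates by sending them through the sketch as well; "full rows" could be used to cut down on this computation, but adding low-level entries still goes through the same flow, unlike heavy keeper, which simply does a hash-table lookup.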
+ if (cube->primary_metric_id != metric_id) {
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
if (cell_data == NULL) {
- int tmp_ret = spread_sketch_add(cube->spread_sketch, tag_in_string, tag_len, 0, (void *)&args);
+ int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args);
if (tmp_ret == 1) {
- cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
}
}
- break;}
- }
-
+ } else {
+ int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, item_hash, (void *)&args);
+ if (tmp_ret == 1) {
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
+ }
+ }
if (free_key) {
free(key);
@@ -704,14 +754,30 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t
return cell_data;
}
+
int cube_histogram_record(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value) {
assert(manifest->type == METRIC_TYPE_HISTOGRAM);
assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id));
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
+
struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
int ret = metric_histogram_record(metric, value);
@@ -723,19 +789,37 @@ int cube_histogram_record(struct cube *cube, const struct metric_manifest *manif
int cube_hll_add(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const char *key, size_t key_len) { // add the item `key` to the HLL metric `manifest` in the cell identified by `dimensions`
assert(manifest->type == METRIC_TYPE_HLL);
- assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id));
+ assert(cube->sampling_mode != SAMPLING_MODE_TOPK || cube->primary_metric_id != manifest->id); // an HLL metric must not be the primary metric of a top-K cube
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used
+ if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id == manifest->id) {
+ hash = XXH3_64bits(key, key_len); // item hash passed to the spread sketch when this metric is the primary one
+ }
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) { // pick the cell lookup that matches the cube's sampling mode
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) { // no cell could be found or created
return FS_ERR_TOO_MANY_CELLS;
}
- struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
+ struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
metric_hll_add(metric, key, key_len);
return FS_OK;
}
-uint64_t tags2hash(const struct field *field, size_t n_dimensions) {
+uint64_t field_array_to_hash(const struct field *field, size_t n_dimensions) {
XXH3_state_t state = {0};
XXH3_64bits_reset(&state);
@@ -751,27 +835,65 @@ uint64_t tags2hash(const struct field *field, size_t n_dimensions) {
return XXH3_64bits_digest(&state);
}
-int cube_hll_add_tag(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key)
+int cube_hll_add_field(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key) // add the hash of the field array `tags_key` to the HLL metric `manifest` in the cell identified by `dimensions`
{
assert(manifest->type == METRIC_TYPE_HLL);
assert(cube->sampling_mode != SAMPLING_MODE_TOPK || (cube->primary_metric_id != manifest->id)); // an HLL metric must not be the primary metric of a top-K cube
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used
+ if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id == manifest->id) {
+ hash = field_array_to_hash(tags_key, n_tag_key); // needed up front: the spread sketch lookup below takes the item hash
+ }
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) { // pick the cell lookup that matches the cube's sampling mode
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) { // no cell could be found or created
return FS_ERR_TOO_MANY_CELLS;
}
struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
-
- uint64_t hash = tags2hash(tags_key, n_tag_key);
+
+ if (hash == 0) { // hash is not calculated yet. 0 doubles as the "not computed" marker, so a hash that was computed above and happens to be 0 is simply computed again here.
+ hash = field_array_to_hash(tags_key, n_tag_key);
+ }
metric_hll_add_hash(metric, hash);
return FS_OK;
}
int cube_counter_incrby(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long increment) {
assert(manifest->type == METRIC_TYPE_COUNTER);
- assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id || increment >= 0));
+ assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE ||
+ (cube->sampling_mode == SAMPLING_MODE_TOPK && (cube->primary_metric_id != manifest->id || increment >= 0)) ||
+ (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id != manifest->id)
+ );
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, increment, manifest->id);
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, increment, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
+
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
@@ -786,7 +908,21 @@ int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest,
assert(manifest->type == METRIC_TYPE_COUNTER);
assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id));
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
@@ -809,6 +945,9 @@ struct cube *cube_copy(const struct cube *cube)
case SAMPLING_MODE_COMPREHENSIVE:
cube_dup->comprehensive = hash_table_copy(cube->comprehensive);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ cube_dup->spread_sketch = spread_sketch_copy(cube->spread_sketch);
+ break;
default:
assert(0);
break;
@@ -829,6 +968,9 @@ void cube_merge(struct cube *dest, const struct cube *src)
case SAMPLING_MODE_COMPREHENSIVE:
hash_table_merge(dest->comprehensive, src->comprehensive);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ spread_sketch_merge(dest->spread_sketch, src->spread_sketch);
+ break;
default:
assert(0);
break;
@@ -842,6 +984,21 @@ struct cube *cube_fork(const struct cube *cube) {
return ret;
}
+struct tmp_sorted_data_spread_sketch_cell { // temporary (sort key, cell) pair used by cube_get_cells to rank spread sketch cells
+ double hll_value; // sort key: filled from metric_hll_get() on the cell's primary metric
+ struct cell *data; // the cell this entry stands for
+};
+static int compare_tmp_sorted_data_spread_sketch_cell(const void *a, const void *b) { // sort in descending order of hll_value; used as the qsort comparator
+ const struct tmp_sorted_data_spread_sketch_cell *aa = (const struct tmp_sorted_data_spread_sketch_cell *)a;
+ const struct tmp_sorted_data_spread_sketch_cell *bb = (const struct tmp_sorted_data_spread_sketch_cell *)b;
+ if (aa->hll_value < bb->hll_value) { // a has the smaller value, so it is ordered after b
+ return 1;
+ } else if (aa->hll_value > bb->hll_value) { // a has the larger value, so it is ordered before b
+ return -1;
+ } else {
+ return 0;
+ }
+}
void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions, size_t *n_cell)
{
size_t n_cell_tmp = 0;
@@ -852,6 +1009,9 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
case SAMPLING_MODE_TOPK:
n_cell_tmp = heavy_keeper_get_count(cube->topk);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ n_cell_tmp = spread_sketch_get_count(cube->spread_sketch);
+ break;
default:
assert(0);
}
@@ -870,14 +1030,34 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
case SAMPLING_MODE_TOPK:
heavy_keeper_list(cube->topk, (void **)cell_datas, n_cell_tmp);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ spread_sketch_list(cube->spread_sketch, (void **)cell_datas, n_cell_tmp);
+ break;
default:
assert(0);
}
+ // spread sketch often stores more than max_n_cell. So sort out the top max_n_cell cells.
+ if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && n_cell_tmp > cube->max_n_cell) {
+ struct tmp_sorted_data_spread_sketch_cell *tmp_sorted_data = (struct tmp_sorted_data_spread_sketch_cell *)malloc(sizeof(struct tmp_sorted_data_spread_sketch_cell) * n_cell_tmp);
+ for (int i = 0; i < n_cell_tmp; i++) {
+ tmp_sorted_data[i].data = cell_datas[i];
+ tmp_sorted_data[i].hll_value = metric_hll_get(cell_datas[i]->metrics[cube->primary_metric_id]);
+ }
+ qsort(tmp_sorted_data, n_cell_tmp, sizeof(struct tmp_sorted_data_spread_sketch_cell), compare_tmp_sorted_data_spread_sketch_cell);
+
+ free(cell_datas);
+ cell_datas = (struct cell **)malloc(sizeof(struct cell *) * cube->max_n_cell);
+ for (int i = 0; i < cube->max_n_cell; i++) {
+ cell_datas[i] = tmp_sorted_data[i].data;
+ }
+ n_cell_tmp = cube->max_n_cell;
+ free(tmp_sorted_data);
+ }
+
struct field_list *tag_list_ret = (struct field_list *)malloc(sizeof(struct field_list) * n_cell_tmp);
*cell_dimensions = tag_list_ret;
*n_cell = n_cell_tmp;
-
for (int i = 0; i < n_cell_tmp; i++) {
struct cell *cell_data = cell_datas[i];
struct field_list *tag_list_tmp = &tag_list_ret[i];
@@ -907,6 +1087,9 @@ const struct cell *get_cell_by_tag_list(const struct cube *cube, const struct fi
case SAMPLING_MODE_COMPREHENSIVE:
ret = hash_table_get0_exdata(cube->comprehensive, tag_in_string, tag_len);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ ret = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
+ break;
default:
assert(0);
return NULL;
@@ -1018,7 +1201,7 @@ int cube_get_cell_count(const struct cube *cube) {
}
}
-void cube_get_cells_used_by_metric(const struct cube *cube, const struct field_list *fields, int **metric_id_out, size_t *n_metric_out) {
+void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list *fields, int **metric_id_out, size_t *n_metric_out) {
const struct cell *cell_data = get_cell_by_tag_list(cube, fields);
if (cell_data == NULL) {
*metric_id_out = NULL;
@@ -1051,4 +1234,8 @@ struct field_list *cube_get_identifier(const struct cube *cube) {
return tag_list;
-} \ No newline at end of file
+}
+
+enum sampling_mode cube_get_sampling_mode(const struct cube *cube) { // accessor: returns the sampling mode stored in the cube
+ return cube->sampling_mode;
+}