diff options
Diffstat (limited to 'src/cube.c')
| -rw-r--r-- | src/cube.c | 295 |
1 files changed, 241 insertions, 54 deletions
@@ -302,7 +302,7 @@ int cube_manager_find(const struct cube_manager *pthis, const struct field *cube { char key_stack[4096]; char *key = key_stack; - int key_len = field_array_to_key_safe(cube_dimensions, n_dimension, key, sizeof(key)); + int key_len = field_array_to_key_safe(cube_dimensions, n_dimension, key, sizeof(key_stack)); bool free_key = false; if (key_len < 0) { // very unlikely to happen char *key_heap; @@ -575,6 +575,7 @@ struct cube *cube_new(const struct field *dimensions, size_t n_dimensions, enum case SAMPLING_MODE_SPREADSKETCH: cube->spread_sketch = spread_sketch_new(max_n_cell); spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i); + break; default: assert(0); break; @@ -633,10 +634,10 @@ void cube_set_primary_metric(struct cube *cube, int metric_id) { cube->primary_metric_id = metric_id; } -struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t n_dimension, long long increment, int metric_id) { - char key_stack[4096]; +struct cell *get_cell_in_comprehensive_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension) { + char key_stack[4096] = {0}; char *key = key_stack; - int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key)); + int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack)); bool free_key = false; if (key_len < 0) { // very unlikely to happen char *key_heap; @@ -652,51 +653,100 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t args.n_dimensions = n_dimension; struct cell *cell_data = NULL; - switch (cube->sampling_mode) { - case SAMPLING_MODE_TOPK: { - if (cube->primary_metric_id != metric_id) { - cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); - if (cell_data == NULL) { - int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, 0, (void *)&args); - if (tmp_ret == 1) { - cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); - } - } - } else { - // heavy_keeper_add should be called anyway, to let the topk record update. - int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, increment, (void *)&args); - if (tmp_ret == 1) { - cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); - } + assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE); + + cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len); + if (cell_data == NULL) { + int tmp_ret = hash_table_add(cube->comprehensive, key, key_len, (void *)&args); + if (tmp_ret == 1) { + cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len); } - break;} - case SAMPLING_MODE_COMPREHENSIVE: { - cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len); + } + + if (free_key) { + free(key); + } + return cell_data; +} + +struct cell *get_cell_in_topk_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension, long long increment, int metric_id) { + char key_stack[4096] = {0}; + char *key = key_stack; + int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack)); + bool free_key = false; + if (key_len < 0) { // very unlikely to happen + char *key_heap; + size_t key_len_tmp; + field_array_to_key_endeavor(dimensions, n_dimension, &key_heap, &key_len_tmp); + key = key_heap; + key_len = key_len_tmp; + free_key = true; + } + + struct exdata_new_args args; + args.cell_dimensions = dimensions; + args.n_dimensions = n_dimension; + + struct cell *cell_data = NULL; + assert(cube->sampling_mode == SAMPLING_MODE_TOPK); + if (cube->primary_metric_id != metric_id) { // FIXME: TODO: 我想把这个先get 再add 的逻辑直接改成add然后看返回值,结果在fuzz test 中的特殊码返回值里发现了问题。 + cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); if (cell_data == NULL) { - int tmp_ret = hash_table_add(cube->comprehensive, key, key_len, (void *)&args); + int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, 0, (void *)&args); if (tmp_ret == 1) { - cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len); + cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); } } - break;} - case SAMPLING_MODE_SPREADSKETCH: { - if (cube->primary_metric_id != metric_id) { - cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len); - // todo: spread sketch 没办法支持dummy 场景。首先,我不能让所有metric 都走spread sketch流程, - // 因为正常来说,用level=0 的hashy 做数值,没有任何意义,肯定都更新不了,只是在刚开始的时候,起一个记录key 的作用。 - // 而,如果像是topk 那样,给count=0 的一席之地,那么存在问题:spread sketch本身不对记录的个数有限制,所以什么时候停止记录呢?这样的设计也太麻烦了。 - // 之前跟老板讨论的时候,给了两个方案,方案1:做一个buffer,如果get exdata0 get 不到,则往buffer 中的cell 里写,等到来了primary以后,把cell 送进去。 - // 方案2:简单略去第一轮添加时的情况。这会造成很少量的误差。不过,实际上这个操作不是逐包而是会话开始结束时来一次,所以误差也不会太小。必须是让贺岚风能第一个操作primary metric。 + } else { + // heavy_keeper_add should be called anyway, to let the topk record update. + int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, increment, (void *)&args); + if (tmp_ret == 1) { + cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); } + } + + if (free_key) { + free(key); + } + return cell_data; +} + +struct cell *get_cell_in_spread_sketch_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension, uint64_t item_hash, int metric_id) { + char key_stack[4096] = {0}; + char *key = key_stack; + int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack)); + bool free_key = false; + if (key_len < 0) { // very unlikely to happen + char *key_heap; + size_t key_len_tmp; + field_array_to_key_endeavor(dimensions, n_dimension, &key_heap, &key_len_tmp); + key = key_heap; + key_len = key_len_tmp; + free_key = true; + } + + struct exdata_new_args args; + args.cell_dimensions = dimensions; + args.n_dimensions = n_dimension; + + struct cell *cell_data = NULL; + assert(cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH); + + // todo: spread sketch 现在支持dummy 的方式是让他们也走sketch,可以用“满行”来减少这种计算,但确实加入level 低的内容,会走相同的流程,不像heavy keeper 一样就简单的查哈希表。 + if (cube->primary_metric_id != metric_id) { + cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len); if (cell_data == NULL) { - int tmp_ret = spread_sketch_add(cube->spread_sketch, tag_in_string, tag_len, 0, (void *)&args); + int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args); if (tmp_ret == 1) { - cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len); + cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len); } } - break;} - } - + } else { + int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, item_hash, (void *)&args); + if (tmp_ret == 1) { + cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len); + } + } if (free_key) { free(key); @@ -704,14 +754,30 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t return cell_data; } + int cube_histogram_record(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value) { assert(manifest->type == METRIC_TYPE_HISTOGRAM); assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id)); - struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id); + struct cell *cell_data = NULL; + switch (cube->sampling_mode) { + case SAMPLING_MODE_COMPREHENSIVE: { + cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions); + break;} + case SAMPLING_MODE_TOPK: { + cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + case SAMPLING_MODE_SPREADSKETCH: { + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + default: + assert(0); + break; + } if (cell_data == NULL) { return FS_ERR_TOO_MANY_CELLS; } + struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data); int ret = metric_histogram_record(metric, value); @@ -723,19 +789,37 @@ int cube_histogram_record(struct cube *cube, const struct metric_manifest *manif int cube_hll_add(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const char *key, size_t key_len) { assert(manifest->type == METRIC_TYPE_HLL); - assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id)); + assert(cube->sampling_mode != SAMPLING_MODE_TOPK || cube->primary_metric_id != manifest->id); - struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id); + uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used + if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id == manifest->id) { + hash = XXH3_64bits(key, key_len); + } + struct cell *cell_data = NULL; + switch (cube->sampling_mode) { + case SAMPLING_MODE_COMPREHENSIVE: { + cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions); + break;} + case SAMPLING_MODE_TOPK: { + cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + case SAMPLING_MODE_SPREADSKETCH: { + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, manifest->id); + break;} + default: + assert(0); + break; + } if (cell_data == NULL) { return FS_ERR_TOO_MANY_CELLS; } - struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data); + struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data); metric_hll_add(metric, key, key_len); return FS_OK; } -uint64_t tags2hash(const struct field *field, size_t n_dimensions) { +uint64_t field_array_to_hash(const struct field *field, size_t n_dimensions) { XXH3_state_t state = {0}; XXH3_64bits_reset(&state); @@ -751,27 +835,65 @@ uint64_t tags2hash(const struct field *field, size_t n_dimensions) { return XXH3_64bits_digest(&state); } -int cube_hll_add_tag(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key) +int cube_hll_add_field(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key) { assert(manifest->type == METRIC_TYPE_HLL); assert(cube->sampling_mode != SAMPLING_MODE_TOPK || (cube->primary_metric_id != manifest->id)); - struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id); + uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used + if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id == manifest->id) { + hash = field_array_to_hash(tags_key, n_tag_key); + } + struct cell *cell_data = NULL; + switch (cube->sampling_mode) { + case SAMPLING_MODE_COMPREHENSIVE: { + cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions); + break;} + case SAMPLING_MODE_TOPK: { + cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + case SAMPLING_MODE_SPREADSKETCH: { + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, manifest->id); + break;} + default: + assert(0); + break; + } if (cell_data == NULL) { return FS_ERR_TOO_MANY_CELLS; } struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data); - - uint64_t hash = tags2hash(tags_key, n_tag_key); + + if (hash == 0) { // hash is not calculated yet. + hash = field_array_to_hash(tags_key, n_tag_key); + } metric_hll_add_hash(metric, hash); return FS_OK; } int cube_counter_incrby(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long increment) { assert(manifest->type == METRIC_TYPE_COUNTER); - assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id || increment >= 0)); + assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || + (cube->sampling_mode == SAMPLING_MODE_TOPK && (cube->primary_metric_id != manifest->id || increment >= 0)) || + (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id != manifest->id) + ); - struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, increment, manifest->id); + struct cell *cell_data = NULL; + switch (cube->sampling_mode) { + case SAMPLING_MODE_COMPREHENSIVE: { + cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions); + break;} + case SAMPLING_MODE_TOPK: { + cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, increment, manifest->id); + break;} + case SAMPLING_MODE_SPREADSKETCH: { + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + default: + assert(0); + break; + } + if (cell_data == NULL) { return FS_ERR_TOO_MANY_CELLS; } @@ -786,7 +908,21 @@ int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest, assert(manifest->type == METRIC_TYPE_COUNTER); assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id)); - struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id); + struct cell *cell_data = NULL; + switch (cube->sampling_mode) { + case SAMPLING_MODE_COMPREHENSIVE: { + cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions); + break;} + case SAMPLING_MODE_TOPK: { + cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + case SAMPLING_MODE_SPREADSKETCH: { + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + default: + assert(0); + break; + } if (cell_data == NULL) { return FS_ERR_TOO_MANY_CELLS; } @@ -809,6 +945,9 @@ struct cube *cube_copy(const struct cube *cube) case SAMPLING_MODE_COMPREHENSIVE: cube_dup->comprehensive = hash_table_copy(cube->comprehensive); break; + case SAMPLING_MODE_SPREADSKETCH: + cube_dup->spread_sketch = spread_sketch_copy(cube->spread_sketch); + break; default: assert(0); break; @@ -829,6 +968,9 @@ void cube_merge(struct cube *dest, const struct cube *src) case SAMPLING_MODE_COMPREHENSIVE: hash_table_merge(dest->comprehensive, src->comprehensive); break; + case SAMPLING_MODE_SPREADSKETCH: + spread_sketch_merge(dest->spread_sketch, src->spread_sketch); + break; default: assert(0); break; @@ -842,6 +984,21 @@ struct cube *cube_fork(const struct cube *cube) { return ret; } +struct tmp_sorted_data_spread_sketch_cell { + double hll_value; + struct cell *data; +}; +static int compare_tmp_sorted_data_spread_sketch_cell(const void *a, const void *b) { // sort in descending order + const struct tmp_sorted_data_spread_sketch_cell *aa = (const struct tmp_sorted_data_spread_sketch_cell *)a; + const struct tmp_sorted_data_spread_sketch_cell *bb = (const struct tmp_sorted_data_spread_sketch_cell *)b; + if (aa->hll_value < bb->hll_value) { + return 1; + } else if (aa->hll_value > bb->hll_value) { + return -1; + } else { + return 0; + } +} void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions, size_t *n_cell) { size_t n_cell_tmp = 0; @@ -852,6 +1009,9 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions case SAMPLING_MODE_TOPK: n_cell_tmp = heavy_keeper_get_count(cube->topk); break; + case SAMPLING_MODE_SPREADSKETCH: + n_cell_tmp = spread_sketch_get_count(cube->spread_sketch); + break; default: assert(0); } @@ -870,14 +1030,34 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions case SAMPLING_MODE_TOPK: heavy_keeper_list(cube->topk, (void **)cell_datas, n_cell_tmp); break; + case SAMPLING_MODE_SPREADSKETCH: + spread_sketch_list(cube->spread_sketch, (void **)cell_datas, n_cell_tmp); + break; default: assert(0); } + // spread sketch often stores more than max_n_cell. So sort out the top max_n_cell cells. + if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && n_cell_tmp > cube->max_n_cell) { + struct tmp_sorted_data_spread_sketch_cell *tmp_sorted_data = (struct tmp_sorted_data_spread_sketch_cell *)malloc(sizeof(struct tmp_sorted_data_spread_sketch_cell) * n_cell_tmp); + for (int i = 0; i < n_cell_tmp; i++) { + tmp_sorted_data[i].data = cell_datas[i]; + tmp_sorted_data[i].hll_value = metric_hll_get(cell_datas[i]->metrics[cube->primary_metric_id]); + } + qsort(tmp_sorted_data, n_cell_tmp, sizeof(struct tmp_sorted_data_spread_sketch_cell), compare_tmp_sorted_data_spread_sketch_cell); + + free(cell_datas); + cell_datas = (struct cell **)malloc(sizeof(struct cell *) * cube->max_n_cell); + for (int i = 0; i < cube->max_n_cell; i++) { + cell_datas[i] = tmp_sorted_data[i].data; + } + n_cell_tmp = cube->max_n_cell; + free(tmp_sorted_data); + } + struct field_list *tag_list_ret = (struct field_list *)malloc(sizeof(struct field_list) * n_cell_tmp); *cell_dimensions = tag_list_ret; *n_cell = n_cell_tmp; - for (int i = 0; i < n_cell_tmp; i++) { struct cell *cell_data = cell_datas[i]; struct field_list *tag_list_tmp = &tag_list_ret[i]; @@ -907,6 +1087,9 @@ const struct cell *get_cell_by_tag_list(const struct cube *cube, const struct fi case SAMPLING_MODE_COMPREHENSIVE: ret = hash_table_get0_exdata(cube->comprehensive, tag_in_string, tag_len); break; + case SAMPLING_MODE_SPREADSKETCH: + ret = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len); + break; default: assert(0); return NULL; @@ -1018,7 +1201,7 @@ int cube_get_cell_count(const struct cube *cube) { } } -void cube_get_cells_used_by_metric(const struct cube *cube, const struct field_list *fields, int **metric_id_out, size_t *n_metric_out) { +void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list *fields, int **metric_id_out, size_t *n_metric_out) { const struct cell *cell_data = get_cell_by_tag_list(cube, fields); if (cell_data == NULL) { *metric_id_out = NULL; @@ -1051,4 +1234,8 @@ struct field_list *cube_get_identifier(const struct cube *cube) { return tag_list; -}
\ No newline at end of file +} + +enum sampling_mode cube_get_sampling_mode(const struct cube *cube) { + return cube->sampling_mode; +} |
