diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/cube.c | 295 | ||||
| -rw-r--r-- | src/cube.h | 5 | ||||
| -rw-r--r-- | src/fieldstat.c | 8 | ||||
| -rw-r--r-- | src/tags/spread_sketch.c | 93 | ||||
| -rw-r--r-- | src/tags/spread_sketch.h | 3 |
5 files changed, 309 insertions, 95 deletions
@@ -302,7 +302,7 @@ int cube_manager_find(const struct cube_manager *pthis, const struct field *cube { char key_stack[4096]; char *key = key_stack; - int key_len = field_array_to_key_safe(cube_dimensions, n_dimension, key, sizeof(key)); + int key_len = field_array_to_key_safe(cube_dimensions, n_dimension, key, sizeof(key_stack)); bool free_key = false; if (key_len < 0) { // very unlikely to happen char *key_heap; @@ -575,6 +575,7 @@ struct cube *cube_new(const struct field *dimensions, size_t n_dimensions, enum case SAMPLING_MODE_SPREADSKETCH: cube->spread_sketch = spread_sketch_new(max_n_cell); spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i); + break; default: assert(0); break; @@ -633,10 +634,10 @@ void cube_set_primary_metric(struct cube *cube, int metric_id) { cube->primary_metric_id = metric_id; } -struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t n_dimension, long long increment, int metric_id) { - char key_stack[4096]; +struct cell *get_cell_in_comprehensive_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension) { + char key_stack[4096] = {0}; char *key = key_stack; - int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key)); + int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack)); bool free_key = false; if (key_len < 0) { // very unlikely to happen char *key_heap; @@ -652,51 +653,100 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t args.n_dimensions = n_dimension; struct cell *cell_data = NULL; - switch (cube->sampling_mode) { - case SAMPLING_MODE_TOPK: { - if (cube->primary_metric_id != metric_id) { - cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); - if (cell_data == NULL) { - int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, 0, (void *)&args); - if (tmp_ret == 1) { - cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); - } - } - } else { - // heavy_keeper_add should be called anyway, to let the topk record update. - int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, increment, (void *)&args); - if (tmp_ret == 1) { - cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); - } + assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE); + + cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len); + if (cell_data == NULL) { + int tmp_ret = hash_table_add(cube->comprehensive, key, key_len, (void *)&args); + if (tmp_ret == 1) { + cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len); } - break;} - case SAMPLING_MODE_COMPREHENSIVE: { - cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len); + } + + if (free_key) { + free(key); + } + return cell_data; +} + +struct cell *get_cell_in_topk_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension, long long increment, int metric_id) { + char key_stack[4096] = {0}; + char *key = key_stack; + int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack)); + bool free_key = false; + if (key_len < 0) { // very unlikely to happen + char *key_heap; + size_t key_len_tmp; + field_array_to_key_endeavor(dimensions, n_dimension, &key_heap, &key_len_tmp); + key = key_heap; + key_len = key_len_tmp; + free_key = true; + } + + struct exdata_new_args args; + args.cell_dimensions = dimensions; + args.n_dimensions = n_dimension; + + struct cell *cell_data = NULL; + assert(cube->sampling_mode == SAMPLING_MODE_TOPK); + if (cube->primary_metric_id != metric_id) { // FIXME: TODO: 我想把这个先get 再add 的逻辑直接改成add然后看返回值,结果在fuzz test 中的特殊码返回值里发现了问题。 + cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); if (cell_data == NULL) { - int tmp_ret = hash_table_add(cube->comprehensive, key, key_len, (void *)&args); + int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, 0, (void *)&args); if (tmp_ret == 1) { - cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len); + cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); } } - break;} - case SAMPLING_MODE_SPREADSKETCH: { - if (cube->primary_metric_id != metric_id) { - cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len); - // todo: spread sketch 没办法支持dummy 场景。首先,我不能让所有metric 都走spread sketch流程, - // 因为正常来说,用level=0 的hashy 做数值,没有任何意义,肯定都更新不了,只是在刚开始的时候,起一个记录key 的作用。 - // 而,如果像是topk 那样,给count=0 的一席之地,那么存在问题:spread sketch本身不对记录的个数有限制,所以什么时候停止记录呢?这样的设计也太麻烦了。 - // 之前跟老板讨论的时候,给了两个方案,方案1:做一个buffer,如果get exdata0 get 不到,则往buffer 中的cell 里写,等到来了primary以后,把cell 送进去。 - // 方案2:简单略去第一轮添加时的情况。这会造成很少量的误差。不过,实际上这个操作不是逐包而是会话开始结束时来一次,所以误差也不会太小。必须是让贺岚风能第一个操作primary metric。 + } else { + // heavy_keeper_add should be called anyway, to let the topk record update. + int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, increment, (void *)&args); + if (tmp_ret == 1) { + cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len); } + } + + if (free_key) { + free(key); + } + return cell_data; +} + +struct cell *get_cell_in_spread_sketch_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension, uint64_t item_hash, int metric_id) { + char key_stack[4096] = {0}; + char *key = key_stack; + int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack)); + bool free_key = false; + if (key_len < 0) { // very unlikely to happen + char *key_heap; + size_t key_len_tmp; + field_array_to_key_endeavor(dimensions, n_dimension, &key_heap, &key_len_tmp); + key = key_heap; + key_len = key_len_tmp; + free_key = true; + } + + struct exdata_new_args args; + args.cell_dimensions = dimensions; + args.n_dimensions = n_dimension; + + struct cell *cell_data = NULL; + assert(cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH); + + // todo: spread sketch 现在支持dummy 的方式是让他们也走sketch,可以用“满行”来减少这种计算,但确实加入level 低的内容,会走相同的流程,不像heavy keeper 一样就简单的查哈希表。 + if (cube->primary_metric_id != metric_id) { + cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len); if (cell_data == NULL) { - int tmp_ret = spread_sketch_add(cube->spread_sketch, tag_in_string, tag_len, 0, (void *)&args); + int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args); if (tmp_ret == 1) { - cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len); + cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len); } } - break;} - } - + } else { + int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, item_hash, (void *)&args); + if (tmp_ret == 1) { + cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len); + } + } if (free_key) { free(key); @@ -704,14 +754,30 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t return cell_data; } + int cube_histogram_record(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value) { assert(manifest->type == METRIC_TYPE_HISTOGRAM); assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id)); - struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id); + struct cell *cell_data = NULL; + switch (cube->sampling_mode) { + case SAMPLING_MODE_COMPREHENSIVE: { + cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions); + break;} + case SAMPLING_MODE_TOPK: { + cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + case SAMPLING_MODE_SPREADSKETCH: { + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + default: + assert(0); + break; + } if (cell_data == NULL) { return FS_ERR_TOO_MANY_CELLS; } + struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data); int ret = metric_histogram_record(metric, value); @@ -723,19 +789,37 @@ int cube_histogram_record(struct cube *cube, const struct metric_manifest *manif int cube_hll_add(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const char *key, size_t key_len) { assert(manifest->type == METRIC_TYPE_HLL); - assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id)); + assert(cube->sampling_mode != SAMPLING_MODE_TOPK || cube->primary_metric_id != manifest->id); - struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id); + uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used + if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id == manifest->id) { + hash = XXH3_64bits(key, key_len); + } + struct cell *cell_data = NULL; + switch (cube->sampling_mode) { + case SAMPLING_MODE_COMPREHENSIVE: { + cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions); + break;} + case SAMPLING_MODE_TOPK: { + cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + case SAMPLING_MODE_SPREADSKETCH: { + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, manifest->id); + break;} + default: + assert(0); + break; + } if (cell_data == NULL) { return FS_ERR_TOO_MANY_CELLS; } - struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data); + struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data); metric_hll_add(metric, key, key_len); return FS_OK; } -uint64_t tags2hash(const struct field *field, size_t n_dimensions) { +uint64_t field_array_to_hash(const struct field *field, size_t n_dimensions) { XXH3_state_t state = {0}; XXH3_64bits_reset(&state); @@ -751,27 +835,65 @@ uint64_t tags2hash(const struct field *field, size_t n_dimensions) { return XXH3_64bits_digest(&state); } -int cube_hll_add_tag(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key) +int cube_hll_add_field(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key) { assert(manifest->type == METRIC_TYPE_HLL); assert(cube->sampling_mode != SAMPLING_MODE_TOPK || (cube->primary_metric_id != manifest->id)); - struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id); + uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used + if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id == manifest->id) { + hash = field_array_to_hash(tags_key, n_tag_key); + } + struct cell *cell_data = NULL; + switch (cube->sampling_mode) { + case SAMPLING_MODE_COMPREHENSIVE: { + cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions); + break;} + case SAMPLING_MODE_TOPK: { + cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + case SAMPLING_MODE_SPREADSKETCH: { + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, manifest->id); + break;} + default: + assert(0); + break; + } if (cell_data == NULL) { return FS_ERR_TOO_MANY_CELLS; } struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data); - - uint64_t hash = tags2hash(tags_key, n_tag_key); + + if (hash == 0) { // hash is not calculated yet. + hash = field_array_to_hash(tags_key, n_tag_key); + } metric_hll_add_hash(metric, hash); return FS_OK; } int cube_counter_incrby(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long increment) { assert(manifest->type == METRIC_TYPE_COUNTER); - assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id || increment >= 0)); + assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || + (cube->sampling_mode == SAMPLING_MODE_TOPK && (cube->primary_metric_id != manifest->id || increment >= 0)) || + (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id != manifest->id) + ); - struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, increment, manifest->id); + struct cell *cell_data = NULL; + switch (cube->sampling_mode) { + case SAMPLING_MODE_COMPREHENSIVE: { + cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions); + break;} + case SAMPLING_MODE_TOPK: { + cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, increment, manifest->id); + break;} + case SAMPLING_MODE_SPREADSKETCH: { + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + default: + assert(0); + break; + } + if (cell_data == NULL) { return FS_ERR_TOO_MANY_CELLS; } @@ -786,7 +908,21 @@ int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest, assert(manifest->type == METRIC_TYPE_COUNTER); assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id)); - struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id); + struct cell *cell_data = NULL; + switch (cube->sampling_mode) { + case SAMPLING_MODE_COMPREHENSIVE: { + cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions); + break;} + case SAMPLING_MODE_TOPK: { + cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + case SAMPLING_MODE_SPREADSKETCH: { + cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id); + break;} + default: + assert(0); + break; + } if (cell_data == NULL) { return FS_ERR_TOO_MANY_CELLS; } @@ -809,6 +945,9 @@ struct cube *cube_copy(const struct cube *cube) case SAMPLING_MODE_COMPREHENSIVE: cube_dup->comprehensive = hash_table_copy(cube->comprehensive); break; + case SAMPLING_MODE_SPREADSKETCH: + cube_dup->spread_sketch = spread_sketch_copy(cube->spread_sketch); + break; default: assert(0); break; @@ -829,6 +968,9 @@ void cube_merge(struct cube *dest, const struct cube *src) case SAMPLING_MODE_COMPREHENSIVE: hash_table_merge(dest->comprehensive, src->comprehensive); break; + case SAMPLING_MODE_SPREADSKETCH: + spread_sketch_merge(dest->spread_sketch, src->spread_sketch); + break; default: assert(0); break; @@ -842,6 +984,21 @@ struct cube *cube_fork(const struct cube *cube) { return ret; } +struct tmp_sorted_data_spread_sketch_cell { + double hll_value; + struct cell *data; +}; +static int compare_tmp_sorted_data_spread_sketch_cell(const void *a, const void *b) { // sort in descending order + const struct tmp_sorted_data_spread_sketch_cell *aa = (const struct tmp_sorted_data_spread_sketch_cell *)a; + const struct tmp_sorted_data_spread_sketch_cell *bb = (const struct tmp_sorted_data_spread_sketch_cell *)b; + if (aa->hll_value < bb->hll_value) { + return 1; + } else if (aa->hll_value > bb->hll_value) { + return -1; + } else { + return 0; + } +} void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions, size_t *n_cell) { size_t n_cell_tmp = 0; @@ -852,6 +1009,9 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions case SAMPLING_MODE_TOPK: n_cell_tmp = heavy_keeper_get_count(cube->topk); break; + case SAMPLING_MODE_SPREADSKETCH: + n_cell_tmp = spread_sketch_get_count(cube->spread_sketch); + break; default: assert(0); } @@ -870,14 +1030,34 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions case SAMPLING_MODE_TOPK: heavy_keeper_list(cube->topk, (void **)cell_datas, n_cell_tmp); break; + case SAMPLING_MODE_SPREADSKETCH: + spread_sketch_list(cube->spread_sketch, (void **)cell_datas, n_cell_tmp); + break; default: assert(0); } + // spread sketch often stores more than max_n_cell. So sort out the top max_n_cell cells. + if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && n_cell_tmp > cube->max_n_cell) { + struct tmp_sorted_data_spread_sketch_cell *tmp_sorted_data = (struct tmp_sorted_data_spread_sketch_cell *)malloc(sizeof(struct tmp_sorted_data_spread_sketch_cell) * n_cell_tmp); + for (int i = 0; i < n_cell_tmp; i++) { + tmp_sorted_data[i].data = cell_datas[i]; + tmp_sorted_data[i].hll_value = metric_hll_get(cell_datas[i]->metrics[cube->primary_metric_id]); + } + qsort(tmp_sorted_data, n_cell_tmp, sizeof(struct tmp_sorted_data_spread_sketch_cell), compare_tmp_sorted_data_spread_sketch_cell); + + free(cell_datas); + cell_datas = (struct cell **)malloc(sizeof(struct cell *) * cube->max_n_cell); + for (int i = 0; i < cube->max_n_cell; i++) { + cell_datas[i] = tmp_sorted_data[i].data; + } + n_cell_tmp = cube->max_n_cell; + free(tmp_sorted_data); + } + struct field_list *tag_list_ret = (struct field_list *)malloc(sizeof(struct field_list) * n_cell_tmp); *cell_dimensions = tag_list_ret; *n_cell = n_cell_tmp; - for (int i = 0; i < n_cell_tmp; i++) { struct cell *cell_data = cell_datas[i]; struct field_list *tag_list_tmp = &tag_list_ret[i]; @@ -907,6 +1087,9 @@ const struct cell *get_cell_by_tag_list(const struct cube *cube, const struct fi case SAMPLING_MODE_COMPREHENSIVE: ret = hash_table_get0_exdata(cube->comprehensive, tag_in_string, tag_len); break; + case SAMPLING_MODE_SPREADSKETCH: + ret = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len); + break; default: assert(0); return NULL; @@ -1018,7 +1201,7 @@ int cube_get_cell_count(const struct cube *cube) { } } -void cube_get_cells_used_by_metric(const struct cube *cube, const struct field_list *fields, int **metric_id_out, size_t *n_metric_out) { +void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list *fields, int **metric_id_out, size_t *n_metric_out) { const struct cell *cell_data = get_cell_by_tag_list(cube, fields); if (cell_data == NULL) { *metric_id_out = NULL; @@ -1051,4 +1234,8 @@ struct field_list *cube_get_identifier(const struct cube *cube) { return tag_list; -}
\ No newline at end of file +} + +enum sampling_mode cube_get_sampling_mode(const struct cube *cube) { + return cube->sampling_mode; +} @@ -22,7 +22,7 @@ struct cube *cube_fork(const struct cube *cube); // only copy the cube configura int cube_histogram_record(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value); int cube_hll_add(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const char *key, size_t key_len); -int cube_hll_add_tag(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key); +int cube_hll_add_field(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key); int cube_counter_incrby(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long increment); int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value); @@ -33,8 +33,9 @@ int cube_histogram_count_le_value(const struct cube *cube, int metric_id, const int cube_get_serialization(const struct cube *cube, int metric_id, const struct field_list *dimensions, char **blob, size_t *blob_size); int cube_get_cell_count(const struct cube *cube); +enum sampling_mode cube_get_sampling_mode(const struct cube *cube); void cube_get_cells(const struct cube *cube, struct field_list **tag_list, size_t *n_cell); -void cube_get_cells_used_by_metric(const struct cube *cube, const struct field_list *dimensions, int **metric_id_out, size_t *n_metric_out); +void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list *dimensions, int **metric_id_out, size_t *n_metric_out); void cube_set_primary_metric(struct cube *cube, int metric_id); struct field_list *cube_get_identifier(const struct cube *cube); diff --git a/src/fieldstat.c b/src/fieldstat.c index abf3e91..b847bd5 100644 --- a/src/fieldstat.c +++ b/src/fieldstat.c @@ -204,7 +204,9 @@ int fieldstat_cube_set_primary_metric(struct fieldstat *instance, int cube_id, i if (manifest == NULL) { return FS_ERR_INVALID_METRIC_ID; } - if (manifest->type != METRIC_TYPE_COUNTER) { + if (cube_get_sampling_mode(cube) == SAMPLING_MODE_COMPREHENSIVE || + (cube_get_sampling_mode(cube) == SAMPLING_MODE_TOPK && manifest->type != METRIC_TYPE_COUNTER) || + (cube_get_sampling_mode(cube) == SAMPLING_MODE_SPREADSKETCH && manifest->type != METRIC_TYPE_HLL)) { return FS_ERR_INVALID_PARAM; } @@ -351,7 +353,7 @@ int fieldstat_hll_add_field(struct fieldstat *instance, int cube_id, int metric_ return FS_ERR_INVALID_METRIC_ID; } - return cube_hll_add_tag(cube, manifest, cell_dimensions, n_dimensions, item, item_len); + return cube_hll_add_field(cube, manifest, cell_dimensions, n_dimensions, item, item_len); } // cppcheck-suppress [constParameterPointer, unmatchedSuppression] @@ -589,5 +591,5 @@ int fieldstat_find_cube(const struct fieldstat *instance, const struct field *cu void fieldstat_get_metric_in_cell(const struct fieldstat *instance, int cube_id, const struct field_list *cell_dimensions, int **metric_id_out, size_t *n_metric_out) { const struct cube *cube = cube_manager_get_cube_by_id(instance->cube_manager, cube_id); - return cube_get_cells_used_by_metric(cube, cell_dimensions, metric_id_out, n_metric_out); + return cube_get_metrics_in_cell(cube, cell_dimensions, metric_id_out, n_metric_out); }
\ No newline at end of file diff --git a/src/tags/spread_sketch.c b/src/tags/spread_sketch.c index c654e89..598ef9b 100644 --- a/src/tags/spread_sketch.c +++ b/src/tags/spread_sketch.c @@ -28,10 +28,11 @@ 会让cell 的操作变得麻烦,无法借用老的cell manager 流程,get exdata 会得到一个exdata 的数组(可能多个),而非单独的一个,要对多个cell综合起来当一个cell 看。。修改量非常小,但是确实会影响代码本身的整洁度。 */ -struct smart_ptr { // todo:entry + +struct entry { int ref_count; void *exdata; - + bool dying; char *key; size_t key_len; UT_hash_handle hh; @@ -45,14 +46,14 @@ struct spread_sketch_scheme { exdata_copy_cb copy_fn; }; -struct smart_ptr_table { // TODO: entry table - struct smart_ptr *entry; +struct entry_table { + struct entry *entry; struct spread_sketch_scheme scheme; }; struct bucket { - struct smart_ptr *content; + struct entry *content; uint32_t level; }; @@ -63,7 +64,7 @@ struct spread_sketch { struct spread_sketch_scheme scheme; struct bucket *buckets; - struct smart_ptr_table *table; + struct entry_table *table; uint32_t *min_level_per_row; // TODO: 先看看性能吧, 之后再写。用来记录每行最小的level,从而跳过行数。对于64位的level,维持一个计数,额外使用64 r的空间,当一个最小位数的level 计数到0时,更新最小level。 // TODO: 对比heavy keeper,不仅仅是跳过的问题,heavykeeper 无论什么情况,在输入0的时候都不会走sketch 更新。 @@ -91,19 +92,22 @@ static inline bool key_equal(const char *key1, size_t key1_len, const char *key2 return memcmp(key1, key2, key1_len) == 0; } static inline char *key_dup(const char *key, size_t key_len) { - char *ret = malloc(key_len); + char *ret = malloc(key_len+1); memcpy(ret, key, key_len); + ret[key_len] = '\0'; return ret; } -struct smart_ptr *smart_ptr_table_get(struct smart_ptr_table *table, const char *key, size_t key_len, void *arg) { - struct smart_ptr *ret; +struct entry *smart_ptr_table_get(struct entry_table *table, const char *key, size_t key_len, void *arg) { + struct entry *ret; HASH_FIND(hh, table->entry, key, key_len, ret); if (ret != NULL) { + ret->dying = false; ret->ref_count++; } else { - ret = malloc(sizeof(struct smart_ptr)); + ret = malloc(sizeof(struct entry)); + ret->dying = false; ret->ref_count = 1; ret->key = key_dup(key, key_len); ret->key_len = key_len; @@ -118,8 +122,8 @@ struct smart_ptr *smart_ptr_table_get(struct smart_ptr_table *table, const char return ret; } -int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size_t key_len) { - struct smart_ptr *ret; +int smart_ptr_table_release(struct entry_table *table, const char *key, size_t key_len) { + struct entry *ret; HASH_FIND(hh, table->entry, key, key_len, ret); if (ret == NULL) { return -1; @@ -127,6 +131,7 @@ int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size ret->ref_count--; if (ret->ref_count == 0) { + // printf("release %s\n", key); HASH_DEL(table->entry, ret); table->scheme.free_fn(ret->exdata); free(ret->key); @@ -136,8 +141,8 @@ int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size return 0; } -void smart_ptr_table_free(struct smart_ptr_table *table) { - struct smart_ptr *current, *tmp; +void smart_ptr_table_free(struct entry_table *table) { + struct entry *current, *tmp; HASH_ITER(hh, table->entry, current, tmp) { HASH_DEL(table->entry, current); table->scheme.free_fn(current->exdata); @@ -147,8 +152,8 @@ void smart_ptr_table_free(struct smart_ptr_table *table) { free(table); } -struct smart_ptr_table *smart_ptr_table_new() { - struct smart_ptr_table *table = malloc(sizeof(struct smart_ptr_table)); +struct entry_table *smart_ptr_table_new() { + struct entry_table *table = malloc(sizeof(struct entry_table)); table->entry = NULL; table->scheme.new_fn = default_new_fn; @@ -194,9 +199,10 @@ struct spread_sketch *spread_sketch_new(int expected_query_num) { } // return 0 if not added, return 1 if added -int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t hash_identifier, void *arg) {// todo: entry, item +int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg) { // uint64_t hash_identifier = XXH3_64bits_withSeed(identifier, identifier_length, 171); - uint32_t level = (uint32_t)__builtin_clzll(hash_identifier) + 1; + uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1; + // printf("spread_sketch_add key %s, level %u\n", key, level); // https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf // A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x) @@ -212,17 +218,22 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng struct bucket *bucket = &ss->buckets[bucket_idx]; if (bucket->content != NULL && key_equal(bucket->content->key, bucket->content->key_len, key, key_length)) { + bucket->content->dying = false; + if (bucket->level < level) { bucket->level = level; } in_sketch = true; } else { - uint32_t true_level = bucket->content == NULL ? 0: bucket->level; - - if (true_level < level) { - const struct smart_ptr *content_old = bucket->content; - smart_ptr_table_release(ss->table, content_old->key, content_old->key_len); - struct smart_ptr *content_new = smart_ptr_table_get(ss->table, key, key_length, arg); + uint32_t old_level = bucket->content == NULL ? 0: bucket->level; + + if (level > old_level) { + // printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, old_level, level, i, hash_x % ss->width); + const struct entry *content_old = bucket->content; + if (content_old != NULL) { + smart_ptr_table_release(ss->table, content_old->key, content_old->key_len); + } + struct entry *content_new = smart_ptr_table_get(ss->table, key, key_length, arg); bucket->content = content_new; bucket->level = level; @@ -248,7 +259,7 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * const struct bucket *bucket_src = &src->buckets[i]; struct bucket *bucket_dst = &dst->buckets[i]; - if (bucket_src->content == NULL) { + if (bucket_src->content == NULL || bucket_src->content->dying) { continue; } if (bucket_dst->content == NULL) { @@ -256,6 +267,7 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * bucket_dst->level = bucket_src->level; continue; } + bucket_dst->content->dying = false; if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) { if (bucket_src->level > bucket_dst->level) { @@ -272,10 +284,10 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * const struct spread_sketch_scheme *scheme = &dst->table->scheme; - struct smart_ptr *content_dest, *content_src, *tmp; + struct entry *content_dest, *content_src, *tmp; HASH_ITER(hh, dst->table->entry, content_dest, tmp) { HASH_FIND(hh, src->table->entry, content_dest->key, content_dest->key_len, content_src); - if (content_src == NULL) { + if (content_src == NULL || content_src->dying) { continue; } @@ -288,9 +300,9 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * } void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) { - struct smart_ptr *content; + struct entry *content; HASH_FIND(hh, ss->table->entry, key, key_len, content); - if (content == NULL) { + if (content == NULL || content->dying) { return NULL; } return content->exdata; @@ -302,9 +314,10 @@ void spread_sketch_reset(struct spread_sketch *ss) { bucket->level = 0; } - struct smart_ptr *content, *tmp; + struct entry *content, *tmp; HASH_ITER(hh, ss->table->entry, content, tmp) { ss->scheme.reset_fn(content->exdata); + content->dying = true; } } @@ -319,15 +332,24 @@ void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new } int spread_sketch_get_count(const struct spread_sketch *ss) { - return HASH_CNT(hh, ss->table->entry); + int cnt = 0; + struct entry *content, *tmp; + HASH_ITER(hh, ss->table->entry, content, tmp) { + if (!content->dying) { + cnt++; + } + } + + return cnt; } -// 这个函数还是会忠实的把内部所有内容都拿出来,但是预期是最多expected_query_num 个,所以在外层要做一个排序。 -// 这个排序在这里面没法做,因为没有具体的hll计数。 size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas) { size_t count = 0; - struct smart_ptr *content, *tmp; + struct entry *content, *tmp; HASH_ITER(hh, ss->table->entry, content, tmp) { + if (content->dying) { + continue; + } if (count >= n_exdatas) { break; } @@ -343,9 +365,10 @@ struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) { dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket)); dst->table = smart_ptr_table_new(); + spread_sketch_set_exdata_schema(dst, src->scheme.new_fn, src->scheme.free_fn, src->scheme.merge_fn, src->scheme.reset_fn, src->scheme.copy_fn); for (int i = 0; i < dst->depth * dst->width; i++) { - if (src->buckets[i].content == NULL) { + if (src->buckets[i].content == NULL || src->buckets[i].content->dying) { continue; } dst->buckets[i].content = smart_ptr_table_get(dst->table, src->buckets[i].content->key, src->buckets[i].content->key_len, NULL); diff --git a/src/tags/spread_sketch.h b/src/tags/spread_sketch.h index f50cae4..9717238 100644 --- a/src/tags/spread_sketch.h +++ b/src/tags/spread_sketch.h @@ -8,6 +8,7 @@ extern "C"{ #include "exdata.h" +#define DUMMY_ITEM_HASH (1ULL<<63) // level(left most zeros) = 0 struct spread_sketch; @@ -18,7 +19,7 @@ void spread_sketch_free(struct spread_sketch *ss); void spread_sketch_reset(struct spread_sketch *ss); -int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t hash_identifier, void *arg); +int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg); void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len); |
