summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2024-07-11 16:14:09 +0800
committerchenzizhan <[email protected]>2024-07-11 16:14:09 +0800
commit5dc3d8a96bb203abc1ee050cd0c884f2ab989dba (patch)
tree38f5bc67522843c1cca51a30e413e4f8f9d1834e /src
parent677f337e195e3b9b6e416109df8d51c14da2791b (diff)
spread sketch merge, reset
Diffstat (limited to 'src')
-rw-r--r--src/cube.c295
-rw-r--r--src/cube.h5
-rw-r--r--src/fieldstat.c8
-rw-r--r--src/tags/spread_sketch.c93
-rw-r--r--src/tags/spread_sketch.h3
5 files changed, 309 insertions, 95 deletions
diff --git a/src/cube.c b/src/cube.c
index 47c9065..a01b775 100644
--- a/src/cube.c
+++ b/src/cube.c
@@ -302,7 +302,7 @@ int cube_manager_find(const struct cube_manager *pthis, const struct field *cube
{
char key_stack[4096];
char *key = key_stack;
- int key_len = field_array_to_key_safe(cube_dimensions, n_dimension, key, sizeof(key));
+ int key_len = field_array_to_key_safe(cube_dimensions, n_dimension, key, sizeof(key_stack));
bool free_key = false;
if (key_len < 0) { // very unlikely to happen
char *key_heap;
@@ -575,6 +575,7 @@ struct cube *cube_new(const struct field *dimensions, size_t n_dimensions, enum
case SAMPLING_MODE_SPREADSKETCH:
cube->spread_sketch = spread_sketch_new(max_n_cell);
spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
+ break;
default:
assert(0);
break;
@@ -633,10 +634,10 @@ void cube_set_primary_metric(struct cube *cube, int metric_id) {
cube->primary_metric_id = metric_id;
}
-struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t n_dimension, long long increment, int metric_id) {
- char key_stack[4096];
+struct cell *get_cell_in_comprehensive_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension) {
+ char key_stack[4096] = {0};
char *key = key_stack;
- int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key));
+ int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack));
bool free_key = false;
if (key_len < 0) { // very unlikely to happen
char *key_heap;
@@ -652,51 +653,100 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t
args.n_dimensions = n_dimension;
struct cell *cell_data = NULL;
- switch (cube->sampling_mode) {
- case SAMPLING_MODE_TOPK: {
- if (cube->primary_metric_id != metric_id) {
- cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
- if (cell_data == NULL) {
- int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, 0, (void *)&args);
- if (tmp_ret == 1) {
- cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
- }
- }
- } else {
- // heavy_keeper_add should be called anyway, to let the topk record update.
- int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, increment, (void *)&args);
- if (tmp_ret == 1) {
- cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
- }
+ assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE);
+
+ cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
+ if (cell_data == NULL) {
+ int tmp_ret = hash_table_add(cube->comprehensive, key, key_len, (void *)&args);
+ if (tmp_ret == 1) {
+ cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
}
- break;}
- case SAMPLING_MODE_COMPREHENSIVE: {
- cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
+ }
+
+ if (free_key) {
+ free(key);
+ }
+ return cell_data;
+}
+
+struct cell *get_cell_in_topk_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension, long long increment, int metric_id) {
+ char key_stack[4096] = {0};
+ char *key = key_stack;
+ int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack));
+ bool free_key = false;
+ if (key_len < 0) { // very unlikely to happen
+ char *key_heap;
+ size_t key_len_tmp;
+ field_array_to_key_endeavor(dimensions, n_dimension, &key_heap, &key_len_tmp);
+ key = key_heap;
+ key_len = key_len_tmp;
+ free_key = true;
+ }
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimension;
+
+ struct cell *cell_data = NULL;
+ assert(cube->sampling_mode == SAMPLING_MODE_TOPK);
+ if (cube->primary_metric_id != metric_id) { // FIXME: TODO: 我想把这个先get 再add 的逻辑直接改成add然后看返回值,结果在fuzz test 中的特殊码返回值里发现了问题。
+ cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
if (cell_data == NULL) {
- int tmp_ret = hash_table_add(cube->comprehensive, key, key_len, (void *)&args);
+ int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, 0, (void *)&args);
if (tmp_ret == 1) {
- cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
+ cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
}
}
- break;}
- case SAMPLING_MODE_SPREADSKETCH: {
- if (cube->primary_metric_id != metric_id) {
- cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
- // todo: spread sketch 没办法支持dummy 场景。首先,我不能让所有metric 都走spread sketch流程,
- // 因为正常来说,用level=0 的hashy 做数值,没有任何意义,肯定都更新不了,只是在刚开始的时候,起一个记录key 的作用。
- // 而,如果像是topk 那样,给count=0 的一席之地,那么存在问题:spread sketch本身不对记录的个数有限制,所以什么时候停止记录呢?这样的设计也太麻烦了。
- // 之前跟老板讨论的时候,给了两个方案,方案1:做一个buffer,如果get exdata0 get 不到,则往buffer 中的cell 里写,等到来了primary以后,把cell 送进去。
- // 方案2:简单略去第一轮添加时的情况。这会造成很少量的误差。不过,实际上这个操作不是逐包而是会话开始结束时来一次,所以误差也不会太小。必须是让贺岚风能第一个操作primary metric。
+ } else {
+ // heavy_keeper_add should be called anyway, to let the topk record update.
+ int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, increment, (void *)&args);
+ if (tmp_ret == 1) {
+ cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
}
+ }
+
+ if (free_key) {
+ free(key);
+ }
+ return cell_data;
+}
+
+struct cell *get_cell_in_spread_sketch_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension, uint64_t item_hash, int metric_id) {
+ char key_stack[4096] = {0};
+ char *key = key_stack;
+ int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack));
+ bool free_key = false;
+ if (key_len < 0) { // very unlikely to happen
+ char *key_heap;
+ size_t key_len_tmp;
+ field_array_to_key_endeavor(dimensions, n_dimension, &key_heap, &key_len_tmp);
+ key = key_heap;
+ key_len = key_len_tmp;
+ free_key = true;
+ }
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimension;
+
+ struct cell *cell_data = NULL;
+ assert(cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH);
+
+ // todo: spread sketch 现在支持dummy 的方式是让他们也走sketch,可以用“满行”来减少这种计算,但确实加入level 低的内容,会走相同的流程,不像heavy keeper 一样就简单的查哈希表。
+ if (cube->primary_metric_id != metric_id) {
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
if (cell_data == NULL) {
- int tmp_ret = spread_sketch_add(cube->spread_sketch, tag_in_string, tag_len, 0, (void *)&args);
+ int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args);
if (tmp_ret == 1) {
- cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
}
}
- break;}
- }
-
+ } else {
+ int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, item_hash, (void *)&args);
+ if (tmp_ret == 1) {
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
+ }
+ }
if (free_key) {
free(key);
@@ -704,14 +754,30 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t
return cell_data;
}
+
int cube_histogram_record(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value) {
assert(manifest->type == METRIC_TYPE_HISTOGRAM);
assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id));
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
+
struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
int ret = metric_histogram_record(metric, value);
@@ -723,19 +789,37 @@ int cube_histogram_record(struct cube *cube, const struct metric_manifest *manif
int cube_hll_add(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const char *key, size_t key_len) {
assert(manifest->type == METRIC_TYPE_HLL);
- assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id));
+ assert(cube->sampling_mode != SAMPLING_MODE_TOPK || cube->primary_metric_id != manifest->id);
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used
+ if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id == manifest->id) {
+ hash = XXH3_64bits(key, key_len);
+ }
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
- struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
+ struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
metric_hll_add(metric, key, key_len);
return FS_OK;
}
-uint64_t tags2hash(const struct field *field, size_t n_dimensions) {
+uint64_t field_array_to_hash(const struct field *field, size_t n_dimensions) {
XXH3_state_t state = {0};
XXH3_64bits_reset(&state);
@@ -751,27 +835,65 @@ uint64_t tags2hash(const struct field *field, size_t n_dimensions) {
return XXH3_64bits_digest(&state);
}
-int cube_hll_add_tag(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key)
+int cube_hll_add_field(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key)
{
assert(manifest->type == METRIC_TYPE_HLL);
assert(cube->sampling_mode != SAMPLING_MODE_TOPK || (cube->primary_metric_id != manifest->id));
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used
+ if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id == manifest->id) {
+ hash = field_array_to_hash(tags_key, n_tag_key);
+ }
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
-
- uint64_t hash = tags2hash(tags_key, n_tag_key);
+
+ if (hash == 0) { // hash is not calculated yet.
+ hash = field_array_to_hash(tags_key, n_tag_key);
+ }
metric_hll_add_hash(metric, hash);
return FS_OK;
}
int cube_counter_incrby(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long increment) {
assert(manifest->type == METRIC_TYPE_COUNTER);
- assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id || increment >= 0));
+ assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE ||
+ (cube->sampling_mode == SAMPLING_MODE_TOPK && (cube->primary_metric_id != manifest->id || increment >= 0)) ||
+ (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id != manifest->id)
+ );
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, increment, manifest->id);
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, increment, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
+
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
@@ -786,7 +908,21 @@ int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest,
assert(manifest->type == METRIC_TYPE_COUNTER);
assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id));
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
@@ -809,6 +945,9 @@ struct cube *cube_copy(const struct cube *cube)
case SAMPLING_MODE_COMPREHENSIVE:
cube_dup->comprehensive = hash_table_copy(cube->comprehensive);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ cube_dup->spread_sketch = spread_sketch_copy(cube->spread_sketch);
+ break;
default:
assert(0);
break;
@@ -829,6 +968,9 @@ void cube_merge(struct cube *dest, const struct cube *src)
case SAMPLING_MODE_COMPREHENSIVE:
hash_table_merge(dest->comprehensive, src->comprehensive);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ spread_sketch_merge(dest->spread_sketch, src->spread_sketch);
+ break;
default:
assert(0);
break;
@@ -842,6 +984,21 @@ struct cube *cube_fork(const struct cube *cube) {
return ret;
}
+struct tmp_sorted_data_spread_sketch_cell {
+ double hll_value;
+ struct cell *data;
+};
+static int compare_tmp_sorted_data_spread_sketch_cell(const void *a, const void *b) { // sort in descending order
+ const struct tmp_sorted_data_spread_sketch_cell *aa = (const struct tmp_sorted_data_spread_sketch_cell *)a;
+ const struct tmp_sorted_data_spread_sketch_cell *bb = (const struct tmp_sorted_data_spread_sketch_cell *)b;
+ if (aa->hll_value < bb->hll_value) {
+ return 1;
+ } else if (aa->hll_value > bb->hll_value) {
+ return -1;
+ } else {
+ return 0;
+ }
+}
void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions, size_t *n_cell)
{
size_t n_cell_tmp = 0;
@@ -852,6 +1009,9 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
case SAMPLING_MODE_TOPK:
n_cell_tmp = heavy_keeper_get_count(cube->topk);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ n_cell_tmp = spread_sketch_get_count(cube->spread_sketch);
+ break;
default:
assert(0);
}
@@ -870,14 +1030,34 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
case SAMPLING_MODE_TOPK:
heavy_keeper_list(cube->topk, (void **)cell_datas, n_cell_tmp);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ spread_sketch_list(cube->spread_sketch, (void **)cell_datas, n_cell_tmp);
+ break;
default:
assert(0);
}
+ // spread sketch often stores more than max_n_cell. So sort out the top max_n_cell cells.
+ if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && n_cell_tmp > cube->max_n_cell) {
+ struct tmp_sorted_data_spread_sketch_cell *tmp_sorted_data = (struct tmp_sorted_data_spread_sketch_cell *)malloc(sizeof(struct tmp_sorted_data_spread_sketch_cell) * n_cell_tmp);
+ for (int i = 0; i < n_cell_tmp; i++) {
+ tmp_sorted_data[i].data = cell_datas[i];
+ tmp_sorted_data[i].hll_value = metric_hll_get(cell_datas[i]->metrics[cube->primary_metric_id]);
+ }
+ qsort(tmp_sorted_data, n_cell_tmp, sizeof(struct tmp_sorted_data_spread_sketch_cell), compare_tmp_sorted_data_spread_sketch_cell);
+
+ free(cell_datas);
+ cell_datas = (struct cell **)malloc(sizeof(struct cell *) * cube->max_n_cell);
+ for (int i = 0; i < cube->max_n_cell; i++) {
+ cell_datas[i] = tmp_sorted_data[i].data;
+ }
+ n_cell_tmp = cube->max_n_cell;
+ free(tmp_sorted_data);
+ }
+
struct field_list *tag_list_ret = (struct field_list *)malloc(sizeof(struct field_list) * n_cell_tmp);
*cell_dimensions = tag_list_ret;
*n_cell = n_cell_tmp;
-
for (int i = 0; i < n_cell_tmp; i++) {
struct cell *cell_data = cell_datas[i];
struct field_list *tag_list_tmp = &tag_list_ret[i];
@@ -907,6 +1087,9 @@ const struct cell *get_cell_by_tag_list(const struct cube *cube, const struct fi
case SAMPLING_MODE_COMPREHENSIVE:
ret = hash_table_get0_exdata(cube->comprehensive, tag_in_string, tag_len);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ ret = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
+ break;
default:
assert(0);
return NULL;
@@ -1018,7 +1201,7 @@ int cube_get_cell_count(const struct cube *cube) {
}
}
-void cube_get_cells_used_by_metric(const struct cube *cube, const struct field_list *fields, int **metric_id_out, size_t *n_metric_out) {
+void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list *fields, int **metric_id_out, size_t *n_metric_out) {
const struct cell *cell_data = get_cell_by_tag_list(cube, fields);
if (cell_data == NULL) {
*metric_id_out = NULL;
@@ -1051,4 +1234,8 @@ struct field_list *cube_get_identifier(const struct cube *cube) {
return tag_list;
-} \ No newline at end of file
+}
+
+enum sampling_mode cube_get_sampling_mode(const struct cube *cube) {
+ return cube->sampling_mode;
+}
diff --git a/src/cube.h b/src/cube.h
index 2b08530..6802b2d 100644
--- a/src/cube.h
+++ b/src/cube.h
@@ -22,7 +22,7 @@ struct cube *cube_fork(const struct cube *cube); // only copy the cube configura
int cube_histogram_record(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value);
int cube_hll_add(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const char *key, size_t key_len);
-int cube_hll_add_tag(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key);
+int cube_hll_add_field(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key);
int cube_counter_incrby(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long increment);
int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value);
@@ -33,8 +33,9 @@ int cube_histogram_count_le_value(const struct cube *cube, int metric_id, const
int cube_get_serialization(const struct cube *cube, int metric_id, const struct field_list *dimensions, char **blob, size_t *blob_size);
int cube_get_cell_count(const struct cube *cube);
+enum sampling_mode cube_get_sampling_mode(const struct cube *cube);
void cube_get_cells(const struct cube *cube, struct field_list **tag_list, size_t *n_cell);
-void cube_get_cells_used_by_metric(const struct cube *cube, const struct field_list *dimensions, int **metric_id_out, size_t *n_metric_out);
+void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list *dimensions, int **metric_id_out, size_t *n_metric_out);
void cube_set_primary_metric(struct cube *cube, int metric_id);
struct field_list *cube_get_identifier(const struct cube *cube);
diff --git a/src/fieldstat.c b/src/fieldstat.c
index abf3e91..b847bd5 100644
--- a/src/fieldstat.c
+++ b/src/fieldstat.c
@@ -204,7 +204,9 @@ int fieldstat_cube_set_primary_metric(struct fieldstat *instance, int cube_id, i
if (manifest == NULL) {
return FS_ERR_INVALID_METRIC_ID;
}
- if (manifest->type != METRIC_TYPE_COUNTER) {
+ if (cube_get_sampling_mode(cube) == SAMPLING_MODE_COMPREHENSIVE ||
+ (cube_get_sampling_mode(cube) == SAMPLING_MODE_TOPK && manifest->type != METRIC_TYPE_COUNTER) ||
+ (cube_get_sampling_mode(cube) == SAMPLING_MODE_SPREADSKETCH && manifest->type != METRIC_TYPE_HLL)) {
return FS_ERR_INVALID_PARAM;
}
@@ -351,7 +353,7 @@ int fieldstat_hll_add_field(struct fieldstat *instance, int cube_id, int metric_
return FS_ERR_INVALID_METRIC_ID;
}
- return cube_hll_add_tag(cube, manifest, cell_dimensions, n_dimensions, item, item_len);
+ return cube_hll_add_field(cube, manifest, cell_dimensions, n_dimensions, item, item_len);
}
// cppcheck-suppress [constParameterPointer, unmatchedSuppression]
@@ -589,5 +591,5 @@ int fieldstat_find_cube(const struct fieldstat *instance, const struct field *cu
void fieldstat_get_metric_in_cell(const struct fieldstat *instance, int cube_id, const struct field_list *cell_dimensions, int **metric_id_out, size_t *n_metric_out)
{
const struct cube *cube = cube_manager_get_cube_by_id(instance->cube_manager, cube_id);
- return cube_get_cells_used_by_metric(cube, cell_dimensions, metric_id_out, n_metric_out);
+ return cube_get_metrics_in_cell(cube, cell_dimensions, metric_id_out, n_metric_out);
} \ No newline at end of file
diff --git a/src/tags/spread_sketch.c b/src/tags/spread_sketch.c
index c654e89..598ef9b 100644
--- a/src/tags/spread_sketch.c
+++ b/src/tags/spread_sketch.c
@@ -28,10 +28,11 @@
会让cell 的操作变得麻烦,无法借用老的cell manager 流程,get exdata 会得到一个exdata 的数组(可能多个),而非单独的一个,要对多个cell综合起来当一个cell 看。。修改量非常小,但是确实会影响代码本身的整洁度。
*/
-struct smart_ptr { // todo:entry
+
+struct entry {
int ref_count;
void *exdata;
-
+ bool dying;
char *key;
size_t key_len;
UT_hash_handle hh;
@@ -45,14 +46,14 @@ struct spread_sketch_scheme {
exdata_copy_cb copy_fn;
};
-struct smart_ptr_table { // TODO: entry table
- struct smart_ptr *entry;
+struct entry_table {
+ struct entry *entry;
struct spread_sketch_scheme scheme;
};
struct bucket {
- struct smart_ptr *content;
+ struct entry *content;
uint32_t level;
};
@@ -63,7 +64,7 @@ struct spread_sketch {
struct spread_sketch_scheme scheme;
struct bucket *buckets;
- struct smart_ptr_table *table;
+ struct entry_table *table;
uint32_t *min_level_per_row; // TODO: 先看看性能吧, 之后再写。用来记录每行最小的level,从而跳过行数。对于64位的level,维持一个计数,额外使用64 r的空间,当一个最小位数的level 计数到0时,更新最小level。
// TODO: 对比heavy keeper,不仅仅是跳过的问题,heavykeeper 无论什么情况,在输入0的时候都不会走sketch 更新。
@@ -91,19 +92,22 @@ static inline bool key_equal(const char *key1, size_t key1_len, const char *key2
return memcmp(key1, key2, key1_len) == 0;
}
static inline char *key_dup(const char *key, size_t key_len) {
- char *ret = malloc(key_len);
+ char *ret = malloc(key_len+1);
memcpy(ret, key, key_len);
+ ret[key_len] = '\0';
return ret;
}
-struct smart_ptr *smart_ptr_table_get(struct smart_ptr_table *table, const char *key, size_t key_len, void *arg) {
- struct smart_ptr *ret;
+struct entry *smart_ptr_table_get(struct entry_table *table, const char *key, size_t key_len, void *arg) {
+ struct entry *ret;
HASH_FIND(hh, table->entry, key, key_len, ret);
if (ret != NULL) {
+ ret->dying = false;
ret->ref_count++;
} else {
- ret = malloc(sizeof(struct smart_ptr));
+ ret = malloc(sizeof(struct entry));
+ ret->dying = false;
ret->ref_count = 1;
ret->key = key_dup(key, key_len);
ret->key_len = key_len;
@@ -118,8 +122,8 @@ struct smart_ptr *smart_ptr_table_get(struct smart_ptr_table *table, const char
return ret;
}
-int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size_t key_len) {
- struct smart_ptr *ret;
+int smart_ptr_table_release(struct entry_table *table, const char *key, size_t key_len) {
+ struct entry *ret;
HASH_FIND(hh, table->entry, key, key_len, ret);
if (ret == NULL) {
return -1;
@@ -127,6 +131,7 @@ int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size
ret->ref_count--;
if (ret->ref_count == 0) {
+ // printf("release %s\n", key);
HASH_DEL(table->entry, ret);
table->scheme.free_fn(ret->exdata);
free(ret->key);
@@ -136,8 +141,8 @@ int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size
return 0;
}
-void smart_ptr_table_free(struct smart_ptr_table *table) {
- struct smart_ptr *current, *tmp;
+void smart_ptr_table_free(struct entry_table *table) {
+ struct entry *current, *tmp;
HASH_ITER(hh, table->entry, current, tmp) {
HASH_DEL(table->entry, current);
table->scheme.free_fn(current->exdata);
@@ -147,8 +152,8 @@ void smart_ptr_table_free(struct smart_ptr_table *table) {
free(table);
}
-struct smart_ptr_table *smart_ptr_table_new() {
- struct smart_ptr_table *table = malloc(sizeof(struct smart_ptr_table));
+struct entry_table *smart_ptr_table_new() {
+ struct entry_table *table = malloc(sizeof(struct entry_table));
table->entry = NULL;
table->scheme.new_fn = default_new_fn;
@@ -194,9 +199,10 @@ struct spread_sketch *spread_sketch_new(int expected_query_num) {
}
// return 0 if not added, return 1 if added
-int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t hash_identifier, void *arg) {// todo: entry, item
+int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg) {
// uint64_t hash_identifier = XXH3_64bits_withSeed(identifier, identifier_length, 171);
- uint32_t level = (uint32_t)__builtin_clzll(hash_identifier) + 1;
+ uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1;
+ // printf("spread_sketch_add key %s, level %u\n", key, level);
// https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
// A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x)
@@ -212,17 +218,22 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng
struct bucket *bucket = &ss->buckets[bucket_idx];
if (bucket->content != NULL && key_equal(bucket->content->key, bucket->content->key_len, key, key_length)) {
+ bucket->content->dying = false;
+
if (bucket->level < level) {
bucket->level = level;
}
in_sketch = true;
} else {
- uint32_t true_level = bucket->content == NULL ? 0: bucket->level;
-
- if (true_level < level) {
- const struct smart_ptr *content_old = bucket->content;
- smart_ptr_table_release(ss->table, content_old->key, content_old->key_len);
- struct smart_ptr *content_new = smart_ptr_table_get(ss->table, key, key_length, arg);
+ uint32_t old_level = bucket->content == NULL ? 0: bucket->level;
+
+ if (level > old_level) {
+ // printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, old_level, level, i, hash_x % ss->width);
+ const struct entry *content_old = bucket->content;
+ if (content_old != NULL) {
+ smart_ptr_table_release(ss->table, content_old->key, content_old->key_len);
+ }
+ struct entry *content_new = smart_ptr_table_get(ss->table, key, key_length, arg);
bucket->content = content_new;
bucket->level = level;
@@ -248,7 +259,7 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
const struct bucket *bucket_src = &src->buckets[i];
struct bucket *bucket_dst = &dst->buckets[i];
- if (bucket_src->content == NULL) {
+ if (bucket_src->content == NULL || bucket_src->content->dying) {
continue;
}
if (bucket_dst->content == NULL) {
@@ -256,6 +267,7 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
bucket_dst->level = bucket_src->level;
continue;
}
+ bucket_dst->content->dying = false;
if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) {
if (bucket_src->level > bucket_dst->level) {
@@ -272,10 +284,10 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
const struct spread_sketch_scheme *scheme = &dst->table->scheme;
- struct smart_ptr *content_dest, *content_src, *tmp;
+ struct entry *content_dest, *content_src, *tmp;
HASH_ITER(hh, dst->table->entry, content_dest, tmp) {
HASH_FIND(hh, src->table->entry, content_dest->key, content_dest->key_len, content_src);
- if (content_src == NULL) {
+ if (content_src == NULL || content_src->dying) {
continue;
}
@@ -288,9 +300,9 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
}
void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) {
- struct smart_ptr *content;
+ struct entry *content;
HASH_FIND(hh, ss->table->entry, key, key_len, content);
- if (content == NULL) {
+ if (content == NULL || content->dying) {
return NULL;
}
return content->exdata;
@@ -302,9 +314,10 @@ void spread_sketch_reset(struct spread_sketch *ss) {
bucket->level = 0;
}
- struct smart_ptr *content, *tmp;
+ struct entry *content, *tmp;
HASH_ITER(hh, ss->table->entry, content, tmp) {
ss->scheme.reset_fn(content->exdata);
+ content->dying = true;
}
}
@@ -319,15 +332,24 @@ void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new
}
int spread_sketch_get_count(const struct spread_sketch *ss) {
- return HASH_CNT(hh, ss->table->entry);
+ int cnt = 0;
+ struct entry *content, *tmp;
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
+ if (!content->dying) {
+ cnt++;
+ }
+ }
+
+ return cnt;
}
-// 这个函数还是会忠实的把内部所有内容都拿出来,但是预期是最多expected_query_num 个,所以在外层要做一个排序。
-// 这个排序在这里面没法做,因为没有具体的hll计数。
size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas) {
size_t count = 0;
- struct smart_ptr *content, *tmp;
+ struct entry *content, *tmp;
HASH_ITER(hh, ss->table->entry, content, tmp) {
+ if (content->dying) {
+ continue;
+ }
if (count >= n_exdatas) {
break;
}
@@ -343,9 +365,10 @@ struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) {
dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket));
dst->table = smart_ptr_table_new();
+ spread_sketch_set_exdata_schema(dst, src->scheme.new_fn, src->scheme.free_fn, src->scheme.merge_fn, src->scheme.reset_fn, src->scheme.copy_fn);
for (int i = 0; i < dst->depth * dst->width; i++) {
- if (src->buckets[i].content == NULL) {
+ if (src->buckets[i].content == NULL || src->buckets[i].content->dying) {
continue;
}
dst->buckets[i].content = smart_ptr_table_get(dst->table, src->buckets[i].content->key, src->buckets[i].content->key_len, NULL);
diff --git a/src/tags/spread_sketch.h b/src/tags/spread_sketch.h
index f50cae4..9717238 100644
--- a/src/tags/spread_sketch.h
+++ b/src/tags/spread_sketch.h
@@ -8,6 +8,7 @@ extern "C"{
#include "exdata.h"
+#define DUMMY_ITEM_HASH (1ULL<<63) // level(left most zeros) = 0
struct spread_sketch;
@@ -18,7 +19,7 @@ void spread_sketch_free(struct spread_sketch *ss);
void spread_sketch_reset(struct spread_sketch *ss);
-int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t hash_identifier, void *arg);
+int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg);
void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn);
void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len);