summary | refs | log | tree | commit | diff
path: root/src/cube.c
diff options
context:
space:
mode:
author: chenzizhan <[email protected]> 2024-07-17 10:35:24 +0800
committer: chenzizhan <[email protected]> 2024-07-17 10:35:24 +0800
commit: 6595cbbde1280b6c7d3c445697e39aa18fa9741f (patch)
tree: 0fe9af32b13926b6aa8682337b500012ddecafca /src/cube.c
parent: c488da1f8346baf8d5a0260da9c5934c8dfdfbef (diff)
primary metric in spreadsketch/heavykeeper
Diffstat (limited to 'src/cube.c')
-rw-r--r-- src/cube.c 134
1 files changed, 114 insertions, 20 deletions
diff --git a/src/cube.c b/src/cube.c
index 6069cdd..e8231d3 100644
--- a/src/cube.c
+++ b/src/cube.c
@@ -18,6 +18,7 @@
#define DEFAULT_N_METRIC 32
#define DEFAULT_N_CUBE 64
+static const struct timeval DUMMY_TIME_VAL = {0, 0};
struct exdata_new_args {
const struct field *cell_dimensions;
@@ -531,10 +532,13 @@ struct cube *cube_new(const struct field *dimensions, size_t n_dimensions, enum
cube->table = hash_table_new(max_n_cell);
hash_table_set_exdata_schema(cube->table, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
break;
- case SAMPLING_MODE_TOP_CARDINALITY:
- cube->spread_sketch = spread_sketch_new(max_n_cell);
+ case SAMPLING_MODE_TOP_CARDINALITY: {
+ int width, depth;
+ unsigned char precision;
+ spread_sketch_get_parameter_recommendation(max_n_cell, &depth, &width, &precision);
+ cube->spread_sketch = spread_sketch_new(depth, width, precision, 0, DUMMY_TIME_VAL);
spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
- break;
+ break; }
default:
assert(0);
break;
@@ -674,16 +678,11 @@ struct cell *get_cell_in_spread_sketch_cube(struct cube *cube, const struct fiel
if (cube->primary_metric_id != metric_id) {
cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
if (cell_data == NULL) {
- int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args);
+ int tmp_ret = spread_sketch_add_hash(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args, DUMMY_TIME_VAL);
if (tmp_ret == 1) {
cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
}
}
- } else {
- int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, item_hash, (void *)&args);
- if (tmp_ret == 1) {
- cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
- }
}
free(key);
@@ -827,10 +826,20 @@ int cube_hll_add(struct cube *cube, int metric_id, const struct field *dimension
return FS_ERR_INVALID_METRIC_ID;
}
- uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used
if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) {
- hash = XXH3_64bits(key, key_len);
+ char *dimension_as_string;
+ size_t dimension_string_len;
+ field_array_to_key(dimensions, n_dimensions, &dimension_as_string, &dimension_string_len);
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimensions;
+
+ int tmp_ret = spread_sketch_add(cube->spread_sketch, dimension_as_string, dimension_string_len, key, key_len, (void *)&args, DUMMY_TIME_VAL);
+ free(dimension_as_string);
+ return tmp_ret == 1 ? FS_OK : FS_ERR_TOO_MANY_CELLS;
}
+
struct cell *cell_data = NULL;
switch (cube->sampling_mode) {
case SAMPLING_MODE_COMPREHENSIVE: {
@@ -840,7 +849,7 @@ int cube_hll_add(struct cube *cube, int metric_id, const struct field *dimension
cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, metric_id);
break;}
case SAMPLING_MODE_TOP_CARDINALITY: {
- cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, metric_id);
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, metric_id);
break;}
default:
assert(0);
@@ -881,8 +890,20 @@ int cube_hll_add_field(struct cube *cube, int metric_id, const struct field *dim
uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used
if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) {
+ char *key;
+ size_t key_len;
+ field_array_to_key(dimensions, n_dimensions, &key, &key_len);
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimensions;
hash = field_array_to_hash(tags_key, n_tag_key);
+
+ int tmp_ret = spread_sketch_add_hash(cube->spread_sketch, key, key_len, hash, (void *)&args, DUMMY_TIME_VAL);
+ free(key);
+ return tmp_ret == 1 ? FS_OK : FS_ERR_TOO_MANY_CELLS;
}
+
struct cell *cell_data = NULL;
switch (cube->sampling_mode) {
case SAMPLING_MODE_COMPREHENSIVE: {
@@ -920,6 +941,24 @@ int cube_counter_incrby(struct cube *cube, int metric_id, const struct field *di
if (manifest == NULL || manifest->type != METRIC_TYPE_COUNTER) {
return FS_ERR_INVALID_METRIC_ID;
}
+
+ if (cube->primary_metric_id == metric_id && cube->sampling_mode == SAMPLING_MODE_TOPK) {
+ if (increment <= 0) {
+ return FS_ERR_INVALID_PARAM;
+ }
+
+ char *key;
+ size_t key_len;
+ field_array_to_key(dimensions, n_dimensions, &key, &key_len);
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimensions;
+
+ int tmp_ret = heavy_keeper_add(cube->heavykeeper, key, key_len, increment, (void *)&args);
+ free(key);
+ return tmp_ret == 1 ? FS_OK : FS_ERR_TOO_MANY_CELLS;
+ }
struct cell *cell_data = NULL;
switch (cube->sampling_mode) {
@@ -1061,10 +1100,13 @@ struct cube *cube_fork(const struct cube *cube) {
ret->table = hash_table_new(cube->max_n_cell);
hash_table_set_exdata_schema(ret->table, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
break;
- case SAMPLING_MODE_TOP_CARDINALITY:
- ret->spread_sketch = spread_sketch_new(cube->max_n_cell);
+ case SAMPLING_MODE_TOP_CARDINALITY: {
+ int width, depth, dummy_time;
+ unsigned char precision;
+ spread_sketch_get_parameter(cube->spread_sketch, &depth, &width, &precision, &dummy_time);
+ ret->spread_sketch = spread_sketch_new(depth, width, precision, 0, DUMMY_TIME_VAL);
spread_sketch_set_exdata_schema(ret->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
- break;
+ break;}
default:
assert(0);
break;
@@ -1114,6 +1156,10 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
return;
}
+ char **spread_sketch_keys = NULL;
+ size_t *spread_sketch_keys_lens = NULL;
+ long long *heavy_keeper_counts = NULL;
+
struct cell **cell_datas = (struct cell **)malloc(sizeof(struct cell *) * n_cell_tmp);
switch (cube->sampling_mode) {
case SAMPLING_MODE_COMPREHENSIVE:
@@ -1122,8 +1168,12 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
case SAMPLING_MODE_TOPK:
heavy_keeper_list(cube->heavykeeper, (void **)cell_datas, n_cell_tmp);
break;
- case SAMPLING_MODE_TOP_CARDINALITY:
- spread_sketch_list(cube->spread_sketch, (void **)cell_datas, n_cell_tmp);
+ case SAMPLING_MODE_TOP_CARDINALITY: {
+ spread_sketch_list_keys(cube->spread_sketch, &spread_sketch_keys, &spread_sketch_keys_lens, &n_cell_tmp);
+ for (int i = 0; i < n_cell_tmp; i++) {
+ cell_datas[i] = spread_sketch_get0_exdata(cube->spread_sketch, spread_sketch_keys[i], spread_sketch_keys_lens[i]);
+ }
+ }
break;
default:
assert(0);
@@ -1134,7 +1184,7 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
struct tmp_sorted_data_spread_sketch_cell *tmp_sorted_data = (struct tmp_sorted_data_spread_sketch_cell *)malloc(sizeof(struct tmp_sorted_data_spread_sketch_cell) * n_cell_tmp);
for (int i = 0; i < n_cell_tmp; i++) {
tmp_sorted_data[i].data = cell_datas[i];
- tmp_sorted_data[i].hll_value = metric_hll_get(cell_datas[i]->slots[cube->primary_metric_id]);
+ tmp_sorted_data[i].hll_value = spread_sketch_get_cardinality(cube->spread_sketch, spread_sketch_keys[i], spread_sketch_keys_lens[i]);
}
qsort(tmp_sorted_data, n_cell_tmp, sizeof(struct tmp_sorted_data_spread_sketch_cell), compare_tmp_sorted_data_spread_sketch_cell);
@@ -1162,6 +1212,9 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
}
free(cell_datas);
+ free(heavy_keeper_counts);
+ free(spread_sketch_keys);
+ free(spread_sketch_keys_lens);
}
const struct cell *get_cell_by_tag_list(const struct cube *cube, const struct field_list *fields)
@@ -1211,6 +1264,20 @@ const struct metric *get_metric_by_tag_list(const struct cube *cube, const struc
int cube_counter_get(const struct cube *cube, int metric_id, const struct field_list *fields, long long *value)
{
+ if (cube->sampling_mode == SAMPLING_MODE_TOPK && cube->primary_metric_id == metric_id) {
+ char *dimension_in_string;
+ size_t dimension_string_len;
+ field_array_to_key(fields->field, fields->n_field, &dimension_in_string, &dimension_string_len);
+
+ long long count = 0;
+ void *exdata_dummy = NULL;
+ heavy_keeper_one_point_query(cube->heavykeeper, dimension_in_string, dimension_string_len, &count, &exdata_dummy);
+ *value = count;
+
+ free(dimension_in_string);
+ return count == 0 ? FS_ERR_INVALID_TAG : FS_OK;
+ }
+
int ret;
const struct metric *metric = get_metric_by_tag_list(cube, fields, metric_id, &ret);
if (ret != FS_OK) {
@@ -1226,6 +1293,18 @@ int cube_counter_get(const struct cube *cube, int metric_id, const struct field_
int cube_hll_get(const struct cube *cube, int metric_id, const struct field_list *fields, double *value)
{
+ if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) {
+ char *dimension_in_string;
+ size_t dimension_string_len;
+ field_array_to_key(fields->field, fields->n_field, &dimension_in_string, &dimension_string_len);
+
+ double hll_value = spread_sketch_get_cardinality(cube->spread_sketch, dimension_in_string, dimension_string_len);
+ *value = hll_value;
+
+ free(dimension_in_string);
+ return FS_OK;
+ }
+
int ret;
const struct metric *metric = get_metric_by_tag_list(cube, fields, metric_id, &ret);
if (ret != FS_OK) {
@@ -1268,7 +1347,17 @@ int cube_histogram_count_le_value(const struct cube *cube, int metric_id, const
return FS_OK;
}
-int cube_get_serialization(const struct cube *cube, int metric_id, const struct field_list *fields, char **blob, size_t *blob_size) {
+int cube_get_serialization_as_base64(const struct cube *cube, int metric_id, const struct field_list *fields, char **blob, size_t *blob_size) {
+ if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) {
+ char *dimension_in_string;
+ size_t dimension_string_len;
+ field_array_to_key(fields->field, fields->n_field, &dimension_in_string, &dimension_string_len);
+
+ *blob = spread_sketch_get_hll_base64_serialization(cube->spread_sketch, dimension_in_string, dimension_string_len);
+ *blob_size = strlen(*blob);
+ return FS_OK;
+ }
+
int ret;
const struct metric *metric = get_metric_by_tag_list(cube, fields, metric_id, &ret);
if (ret != FS_OK) {
@@ -1304,8 +1393,13 @@ void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list *
return;
}
- *metric_id_out = (int *)malloc(sizeof(int) * cell_data->next_index);
+ *metric_id_out = (int *)malloc(sizeof(int) * cell_data->next_index + 1); // +1: for primary metric
int n_metric = 0;
+ if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY || cube->sampling_mode == SAMPLING_MODE_TOPK) { // primary metric is not stored in cell_data
+ (*metric_id_out)[n_metric] = cube->primary_metric_id;
+ n_metric++;
+ }
+
for (int i = 0; i < cell_data->next_index; i++) {
if (cell_data->slots[i] != NULL) {
(*metric_id_out)[n_metric] = i;