summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2024-07-11 16:14:09 +0800
committerchenzizhan <[email protected]>2024-07-11 16:14:09 +0800
commit5dc3d8a96bb203abc1ee050cd0c884f2ab989dba (patch)
tree38f5bc67522843c1cca51a30e413e4f8f9d1834e
parent677f337e195e3b9b6e416109df8d51c14da2791b (diff)
spread sketch merge, reset
-rw-r--r--CMakeLists.txt2
-rw-r--r--include/fieldstat/fieldstat new.h240
-rw-r--r--src/cube.c295
-rw-r--r--src/cube.h5
-rw-r--r--src/fieldstat.c8
-rw-r--r--src/tags/spread_sketch.c93
-rw-r--r--src/tags/spread_sketch.h3
-rw-r--r--test/test_empty_tags.cpp36
-rw-r--r--test/test_fuzz_test.cpp4
-rw-r--r--test/test_merge.cpp244
-rw-r--r--test/test_metric_hll.cpp68
-rw-r--r--test/test_register_and_reset.cpp209
-rw-r--r--test/utils.cpp50
-rw-r--r--test/utils.hpp2
14 files changed, 1106 insertions, 153 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2513d1e..06c35b7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -23,7 +23,7 @@ set(CMAKE_POSITION_INDEPENDENT_CODE ON)
set(CMAKE_MACOSX_RPATH 0)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -fPIC -Wall -lm -lz --std=gnu11")
-set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -Wall)
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall")
# set(C_INCLUDE_PATH ${C_INCLUDE_PATH} /opt/MESA/include)
# set(CPLUS_INCLUDE_PATH ${CPLUS_INCLUDE_PATH} /opt/MESA/include)
diff --git a/include/fieldstat/fieldstat new.h b/include/fieldstat/fieldstat new.h
new file mode 100644
index 0000000..f9dcfda
--- /dev/null
+++ b/include/fieldstat/fieldstat new.h
@@ -0,0 +1,240 @@
+#pragma once
+#include <stdio.h>
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include <stddef.h>
+#include <stdbool.h>
+
+#define FS_OK 0
+#define FS_ERR_TOO_MANY_CELLS -1
+#define FS_ERR_NULL_HANDLER -2
+#define FS_ERR_INVALID_CUBE_ID -3
+#define FS_ERR_INVALID_METRIC_ID -4
+#define FS_ERR_INVALID_TAG -5
+#define FS_ERR_INVALID_PARAM -6
+#define FS_ERR_INVALID_KEY -7
+
+enum metric_type
+{
+ METRIC_TYPE_COUNTER, // counter metric; updated through fieldstat_counter_incrby / fieldstat_counter_set
+ METRIC_TYPE_HLL, // HLL metric (approximate count of distinct keys); updated through fieldstat_hll_add / fieldstat_hll_add_field
+ METRIC_TYPE_HISTOGRAM, // histogram metric (distribution of values); updated through fieldstat_hist_record
+};
+
+enum field_type
+{
+ TAG_INTEGER, // NOTE(review): presumably the value is stored in field.value_longlong - confirm
+ TAG_DOUBLE, // NOTE(review): presumably the value is stored in field.value_double - confirm
+ TAG_CSTRING, // NOTE(review): presumably the value is stored in field.value_str - confirm
+};
+
+enum sampling_mode {
+ SAMPLING_MODE_COMPREHENSIVE, // max_n_cell may be 0, meaning no limit; a full cube accepts no more cells
+ SAMPLING_MODE_TOPK, // requires max_n_cell > 0; when the cube is full a cell can still replace the least frequent cell
+ SAMPLING_MODE_SPREADSKETCH, // NOTE(review): not described by the fieldstat_create_cube documentation yet - document its max_n_cell rules
+};
+
+struct field {
+ const char *key; // name of the field, e.g. "TAG_KEY" in the fieldstat_create_cube example
+ enum field_type type; // NOTE(review): presumably selects which member of the union below is valid - confirm
+ union{
+ long long value_longlong;
+ double value_double;
+ const char *value_str;
+ };
+};
+
+struct fieldstat;
+struct fieldstat *fieldstat_new();
+void fieldstat_free(struct fieldstat *instance);
+// copy only registered cubes and metrics, not including cells. Used to create a new instance with the same schema.
+struct fieldstat *fieldstat_fork(const struct fieldstat *instance);
+
+/*
+ * Make the configuration of replica the same as master, no matter what the current configuration of replica is.
+ * The existing configuration of replica is kept as much as possible: cells in cubes that are the same in both will be kept, but cells in cubes that differ will be deleted.
+ * @return FS_OK or FS_ERR_NULL_HANDLER
+*/
+int fieldstat_calibrate(const struct fieldstat *master, struct fieldstat *replica);
+/*
+ * @brief add a cube to this instance. A cube represents a template with a user-defined set of cells and metrics.
+ * @param cube_dimensions: tags that are shared by all cells in this cube. This is the key of the cube. Can be NULL. Must be unique. cube_dimensions are ordered, which means that {"TAG_KEY": "123", "TAG_KEY2": "456"} and {"TAG_KEY2": "456", "TAG_KEY": "123"} map to different cubes.
+ * @param n_dimension: number of fields in cube_dimensions.
+ * @param mode: sampling mode. Refer to enum sampling_mode.
+ * @param max_n_cell: max number of samplings(cells) in each cube. When mode is TOPK, max_n_cell must be > 0, while in COMPREHENSIVE mode, max_n_cell can be 0, meaning that there is no limit.
+ * @return cube id, if success; otherwise, return FS_ERR_NULL_HANDLER, or FS_ERR_INVALID_PARAM when (max_n_cell == 0 && mode == TOPK). return FS_ERR_INVALID_KEY when the cube_dimensions is not unique.
+*/
+int fieldstat_create_cube(struct fieldstat *instance, const struct field *cube_dimensions, size_t n_dimension, enum sampling_mode mode, size_t max_n_cell);
+
+/*
+ @brief Change the topk cube primary metric id. When fieldstat_counter_incrby or fieldstat_counter_set is called on the primary metric, the topk record of that cell will be updated.
+ the default primary metric id is 0.
+ @return FS_OK, FS_ERR_NULL_HANDLER or FS_ERR_INVALID_CUBE_ID.
+ FS_ERR_INVALID_METRIC_ID when the metric is not registered to instance.
+ FS_ERR_INVALID_PARAM when the cube is not a topk sampling cube, or the metric is not a counter.
+
+*/
+int fieldstat_cube_set_primary_metric(struct fieldstat *instance, int cube_id, int metric_id);
+
+/*
+ * @brief Delete the cube of cube_id. All the cells and metrics are deleted. The cube_id may be reused by other new cubes. Increase the corresponding cube_version by 1.
+ * @return FS_OK, FS_ERR_NULL_HANDLER or FS_ERR_INVALID_CUBE_ID
+*/
+int fieldstat_destroy_cube(struct fieldstat *instance, int cube_id);
+
+/*
+ * @brief add a metric to the cube of cube_id. One metric may be associated with different cells.
+ * @param metric_name: name of the metric. Cannot be NULL. Must be unique.
+ * @return metric id>=0 if success. If failed, return FS_ERR_NULL_HANDLER, FS_ERR_INVALID_KEY(when metric_name is not unique in this cube)
+*/
+int fieldstat_register_counter(struct fieldstat *instance, int cube_id, const char *metric_name);
+
+/*
+ * @brief refer to fieldstat_register_counter.
+ * @param precision: the bigger, the larger memory consumption, while accuracy improved. Must be in [4, 18].
+ * @return metric id if success. If failed, return FS_ERR_NULL_HANDLER, FS_ERR_INVALID_KEY(when metric_name is not unique in this cube), or FS_ERR_INVALID_PARAM(if precision not in range)
+*/
+int fieldstat_register_hll(struct fieldstat *instance, int cube_id, const char *metric_name, unsigned char precision);
+
+/*
+ * @brief refer to fieldstat_register_counter.
+ * @param lowest_trackable_value: the lowest value that can be tracked (distinguishable from 0) by the histogram. Must be >= 1.
+ * @param highest_trackable_value: the highest value to be tracked by the histogram. Must be >= 2 * lowest_trackable_value.
+ * @param significant_figures: the precision of the histogram. Must be in [1, 5].
+ * @return metric id if success. If failed, return FS_ERR_NULL_HANDLER, FS_ERR_INVALID_KEY(when metric_name is not unique in this cube), or FS_ERR_INVALID_PARAM(if any of the 3 params are out of range)
+*/
+int fieldstat_register_hist(struct fieldstat *instance, int cube_id, const char *metric_name, long long lowest_trackable_value, long long highest_trackable_value, int significant_figures);
+
+/*
+ * @brief increase the value of the counter metric in the cell identified by cell_dimensions by `increment`.
+ * @param cube_id: cube id, previously returned by fieldstat_create_cube.
+ * @param metric_id: metric id, previously returned by fieldstat_register_counter.
+ * @param increment: increment of the counter metric. Can be negative.
+ * @return FS_OK if success. FS_ERR_NULL_HANDLER, FS_ERR_INVALID_CUBE_ID, FS_ERR_INVALID_METRIC_ID if fail.
+ * FS_ERR_INVALID_PARAM when cube is topk, metric is primary metric, and increment is negative.
+ * FS_ERR_TOO_MANY_CELLS when the cube is full.
+ * In comprehensive mode, no more cells can be added to a full cube.
+ * In topk mode, every increment matters, so even if the cube is full, the cell can still replace the least frequent cell.
+*/
+int fieldstat_counter_incrby(struct fieldstat *instance, int cube_id, int metric_id, const struct field *cell_dimensions, size_t n_dimensions, long long increment);
+
+/*
+ * @brief let the value of counter metric equal to value. Other annotations refer to fieldstat_counter_incrby.
+ * @return Refer to fieldstat_counter_incrby. What's more, be cautious to call this function if the metric is a primary metric of a topk cube,
+ * in such case, FS_ERR_INVALID_PARAM will be the output when the value is set to a smaller one(increment is negative).
+*/
+int fieldstat_counter_set(struct fieldstat *instance, int cube_id, int metric_id, const struct field *cell_dimensions, size_t n_dimensions, long long value);
+
+/*
+ * @brief add a key to the hll metric of the cell identified by cell_dimensions. HLL approximates the number of distinct elements in a set of `key`s.
+ * @param key: key of the hll metric. Cannot be NULL.
+ * @param key_len: strlen(key).
+ * @return FS_OK if success. FS_ERR_NULL_HANDLER, FS_ERR_INVALID_CUBE_ID, FS_ERR_INVALID_METRIC_ID if fail.
+ * Since topk only supports counters, FS_ERR_INVALID_PARAM is returned when the cube is topk.
+*/
+int fieldstat_hll_add(struct fieldstat *instance, int cube_id, int metric_id, const struct field *cell_dimensions, size_t n_dimensions, const char *key, size_t key_len);
+int fieldstat_hll_add_field(struct fieldstat *instance, int cube_id, int metric_id, const struct field *cell_dimensions, size_t n_dimensions, const struct field *item, size_t item_len);
+
+
+/*
+ * @brief Add a value to the histogram metric of the cell identified by cell_dimensions. The histogram records the distribution of the values.
+ A value bigger than highest_trackable_value will be set to highest_trackable_value. For a value less than lowest_trackable_value, recording is attempted and, if it succeeds, the value remains in the record as -inf(most of the time) or 0(if value == 0)
+ * @param value: value of the histogram metric.
+ * @return FS_OK if success. FS_ERR_NULL_HANDLER, FS_ERR_INVALID_CUBE_ID, FS_ERR_INVALID_METRIC_ID if fail.
+ * FS_ERR_INVALID_PARAM when value is less than 0, or the cube is topk.
+*/
+int fieldstat_hist_record(struct fieldstat *instance, int cube_id, int metric_id, const struct field *cell_dimensions, size_t n_dimensions, long long value);
+
+/*
+ * @brief Delete all the cells, along with the content of every metric. The cubes and metrics are not deleted. Increase cell_version by 1.
+ Note that the cell records won't be deleted at once; they just appear to be deleted. A cell record is actually deleted when it has not been used between the last reset and the next reset.
+*/
+void fieldstat_reset(struct fieldstat *instance);
+
+/*
+ @brief Merge the instance. The registered cubes and metrics are merged even if there are no cells added.
+ @return 0 if success. return FS_ERR_INVALID_PARAM when the registered cubes or metrics between dest and src of the same keys has different types or configurations(like different primary metric).
+*/
+int fieldstat_merge(struct fieldstat *instance, const struct fieldstat *src);
+
+/* -------------------------------------------------------------------------- */
+/* query */
+/* -------------------------------------------------------------------------- */
+
+struct field_list
+{
+ struct field *field; // NOTE(review): presumably an array of n_field entries - confirm
+ size_t n_field; // NOTE(review): presumably the number of entries in `field` - confirm
+};
+
+/*
+ * @brief Get all the registered cubes.
+ * @param cube_ids: the cube ids. The caller should free it. Use it like: int *cube_ids; fieldstat_get_cubes(instance, &cube_ids, &n_cube); for (int i = 0; i < n_cube; i++) { printf("%d\n", cube_ids[i]); } free(cube_ids);
+ * @param n_cube: Length of cube_ids.
+*/
+void fieldstat_get_cubes(const struct fieldstat *instance, int **cube_ids, int *n_cube);
+
+/*
+ * @brief Get all the registered metrics by fieldstat_register_counter, fieldstat_register_hll, fieldstat_register_hist.
+*/
+void fieldstat_cube_get_metrics(const struct fieldstat *instance, int cube_id, int **metric_id_out, size_t *n_metric);
+
+void fieldstat_get_metric_in_cell(const struct fieldstat *instance, int cube_id, const struct field_list *cell_dimensions, int **metric_id_out, size_t *n_metric_out);
+
+// query the name of the metric, return NULL if metric_id is invalid.
+const char *fieldstat_get_metric_name(const struct fieldstat *instance, int cube_id, int metric_id);
+
+// query the type of the metric. return (enum metric_type)-1 if metric_id is invalid.
+enum metric_type fieldstat_get_metric_type(const struct fieldstat *instance, int cube_id, int metric_id);
+
+/*
+ get the cell_dimensions added to cube when calling fieldstat_counter_incrby, fieldstat_counter_set, fieldstat_hll_add, fieldstat_hist_record.
+*/
+void fieldstat_cube_get_cells(const struct fieldstat *instance, int cube_id, struct field_list **cell_dimensions, size_t *n_cell);
+
+/*
+ get the cube_dimensions given to fieldstat_create_cube. The user frees them by calling fieldstat_tag_list_arr_free(struct field_list *, 1)
+ return NULL when ID is invalid.
+*/
+struct field_list *fieldstat_cube_get_tags(const struct fieldstat *instance, int cube_id);
+
+/*
+ return a cube id corresponding to `cube_dimensions`. FS_ERR_INVALID_KEY is returned if the cube is not found.
+*/
+int fieldstat_find_cube(const struct fieldstat *instance, const struct field *cube_dimensions, size_t n_dimensions);
+
+/*
+ get the number of cells in a cube. Return FS_ERR_INVALID_CUBE_ID if cube_id is invalid.
+*/
+int fieldstat_get_used_sampling(const struct fieldstat *instance, int cube_id);
+
+/*
+ * @brief Get the value of a metric of a cell.
+ * @param cube_id: cube id, previously returned by fieldstat_get_cubes.
+ * @param metric_id: metric id, previously returned by fieldstat_cube_get_metrics.
+ * @param cell_dimensions: previously returned by fieldstat_cube_get_cells.
+ * @param value: the value of the metric. If the cell is not found, *value is set to 0.
+ * @return FS_OK if success. FS_ERR_NULL_HANDLER, FS_ERR_INVALID_CUBE_ID, FS_ERR_INVALID_METRIC_ID if fail.
+*/
+int fieldstat_counter_get(const struct fieldstat *instance, int cube_id, const struct field_list *cell_dimensions, int metric_id, long long *value);
+
+/*
+ @brief Get an approximate count of the number of distinct elements in the cell. Other information refer to fieldstat_counter_get.
+ @return >= 0 if success. FS_ERR_INVALID_PARAM if precision is invalid.
+*/
+int fieldstat_hll_get(const struct fieldstat *instance, int cube_id, const struct field_list *cell_dimensions, int metric_id, double *value);
+long long fieldstat_hist_value_at_percentile(const struct fieldstat *instance, int cube_id, const struct field_list *cell_dimensions, int metric_id, double percentile);
+long long fieldstat_hist_count_le_value(const struct fieldstat *instance, int cube_id, const struct field_list *cell_dimensions, int metric_id, long long value);
+
+// get the base 64 encoded string of the serialized blob of a cell
+void fieldstat_get_serialized_blob(const struct fieldstat *instance, int cube_id, int metric_id, const struct field_list *cell_dimensions, char **blob, size_t *blob_size);
+
+void fieldstat_tag_list_arr_free(struct field_list *tag_list, size_t n_cell);
+
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/src/cube.c b/src/cube.c
index 47c9065..a01b775 100644
--- a/src/cube.c
+++ b/src/cube.c
@@ -302,7 +302,7 @@ int cube_manager_find(const struct cube_manager *pthis, const struct field *cube
{
char key_stack[4096];
char *key = key_stack;
- int key_len = field_array_to_key_safe(cube_dimensions, n_dimension, key, sizeof(key));
+ int key_len = field_array_to_key_safe(cube_dimensions, n_dimension, key, sizeof(key_stack));
bool free_key = false;
if (key_len < 0) { // very unlikely to happen
char *key_heap;
@@ -575,6 +575,7 @@ struct cube *cube_new(const struct field *dimensions, size_t n_dimensions, enum
case SAMPLING_MODE_SPREADSKETCH:
cube->spread_sketch = spread_sketch_new(max_n_cell);
spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
+ break;
default:
assert(0);
break;
@@ -633,10 +634,10 @@ void cube_set_primary_metric(struct cube *cube, int metric_id) {
cube->primary_metric_id = metric_id;
}
-struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t n_dimension, long long increment, int metric_id) {
- char key_stack[4096];
+struct cell *get_cell_in_comprehensive_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension) {
+ char key_stack[4096] = {0};
char *key = key_stack;
- int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key));
+ int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack));
bool free_key = false;
if (key_len < 0) { // very unlikely to happen
char *key_heap;
@@ -652,51 +653,100 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t
args.n_dimensions = n_dimension;
struct cell *cell_data = NULL;
- switch (cube->sampling_mode) {
- case SAMPLING_MODE_TOPK: {
- if (cube->primary_metric_id != metric_id) {
- cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
- if (cell_data == NULL) {
- int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, 0, (void *)&args);
- if (tmp_ret == 1) {
- cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
- }
- }
- } else {
- // heavy_keeper_add should be called anyway, to let the topk record update.
- int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, increment, (void *)&args);
- if (tmp_ret == 1) {
- cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
- }
+ assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE);
+
+ cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
+ if (cell_data == NULL) {
+ int tmp_ret = hash_table_add(cube->comprehensive, key, key_len, (void *)&args);
+ if (tmp_ret == 1) {
+ cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
}
- break;}
- case SAMPLING_MODE_COMPREHENSIVE: {
- cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
+ }
+
+ if (free_key) {
+ free(key);
+ }
+ return cell_data;
+}
+
+struct cell *get_cell_in_topk_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension, long long increment, int metric_id) { // Find or create the cell for `dimensions` in a TOPK cube; returns NULL when no cell could be obtained.
+ char key_stack[4096] = {0};
+ char *key = key_stack;
+ int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack)); // try to build the key in the stack buffer first
+ bool free_key = false;
+ if (key_len < 0) { // very unlikely to happen; fall back to a heap-allocated key
+ char *key_heap;
+ size_t key_len_tmp;
+ field_array_to_key_endeavor(dimensions, n_dimension, &key_heap, &key_len_tmp);
+ key = key_heap;
+ key_len = key_len_tmp;
+ free_key = true; // the heap key is released before returning
+ }
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimension;
+
+ struct cell *cell_data = NULL;
+ assert(cube->sampling_mode == SAMPLING_MODE_TOPK);
+ if (cube->primary_metric_id != metric_id) { // FIXME: TODO: I wanted to replace this get-then-add logic with a direct add followed by a check of its return value, but ran into problems with the special return codes in the fuzz test.
+ cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
if (cell_data == NULL) {
- int tmp_ret = hash_table_add(cube->comprehensive, key, key_len, (void *)&args);
+ int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, 0, (void *)&args);
if (tmp_ret == 1) {
- cell_data = hash_table_get0_exdata(cube->comprehensive, key, key_len);
+ cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
}
}
- break;}
- case SAMPLING_MODE_SPREADSKETCH: {
- if (cube->primary_metric_id != metric_id) {
- cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
- // todo: spread sketch cannot support the dummy scenario. First, I cannot let every metric go through the spread sketch flow,
- // because normally using a hash with level=0 as the value is meaningless: it can never update anything and only serves to record the key at the very beginning.
- // And if, like topk, we give count=0 entries a place, there is a problem: spread sketch itself does not limit the number of records, so when do we stop recording? Such a design is also too troublesome.
- // In an earlier discussion with my manager two options came up. Option 1: keep a buffer; if get0_exdata finds nothing, write into a cell in the buffer, and once the primary metric arrives, move the cell in.
- // Option 2: simply skip the first round of additions. This causes a very small error. However, this operation actually happens once at session start/end rather than per packet, so the error is not that small either. He Lanfeng must be the first to operate on the primary metric.
+ } else {
+ // heavy_keeper_add should be called anyway, to let the topk record update.
+ int tmp_ret = heavy_keeper_add(cube->topk, key, key_len, increment, (void *)&args);
+ if (tmp_ret == 1) {
+ cell_data = heavy_keeper_get0_exdata(cube->topk, key, key_len);
}
+ }
+
+ if (free_key) {
+ free(key);
+ }
+ return cell_data;
+}
+
+struct cell *get_cell_in_spread_sketch_cube(struct cube *cube, const struct field *dimensions, size_t n_dimension, uint64_t item_hash, int metric_id) {
+ char key_stack[4096] = {0};
+ char *key = key_stack;
+ int key_len = field_array_to_key_safe(dimensions, n_dimension, key, sizeof(key_stack));
+ bool free_key = false;
+ if (key_len < 0) { // very unlikely to happen
+ char *key_heap;
+ size_t key_len_tmp;
+ field_array_to_key_endeavor(dimensions, n_dimension, &key_heap, &key_len_tmp);
+ key = key_heap;
+ key_len = key_len_tmp;
+ free_key = true;
+ }
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimension;
+
+ struct cell *cell_data = NULL;
+ assert(cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH);
+
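+ // todo: spread sketch currently supports dummy updates by letting them go through the sketch as well. "Full rows" could be used to reduce this computation, but adding entries with a low level still goes through the same flow, unlike heavy keeper, which simply looks up the hash table.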
+ if (cube->primary_metric_id != metric_id) {
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
if (cell_data == NULL) {
- int tmp_ret = spread_sketch_add(cube->spread_sketch, tag_in_string, tag_len, 0, (void *)&args);
+ int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args);
if (tmp_ret == 1) {
- cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
}
}
- break;}
- }
-
+ } else {
+ int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, item_hash, (void *)&args);
+ if (tmp_ret == 1) {
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
+ }
+ }
if (free_key) {
free(key);
@@ -704,14 +754,30 @@ struct cell *get_cell(struct cube *cube, const struct field *dimensions, size_t
return cell_data;
}
+
int cube_histogram_record(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value) {
assert(manifest->type == METRIC_TYPE_HISTOGRAM);
assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id));
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
+
struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
int ret = metric_histogram_record(metric, value);
@@ -723,19 +789,37 @@ int cube_histogram_record(struct cube *cube, const struct metric_manifest *manif
int cube_hll_add(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const char *key, size_t key_len) { // Add `key` to the HLL metric of the cell identified by `dimensions`; returns FS_ERR_TOO_MANY_CELLS when no cell could be obtained, FS_OK otherwise.
assert(manifest->type == METRIC_TYPE_HLL);
- assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id));
+ assert(cube->sampling_mode != SAMPLING_MODE_TOPK || cube->primary_metric_id != manifest->id); // in a TOPK cube this metric must not be the primary metric
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ uint64_t hash = 0; // placeholder; only computed when this metric is the primary metric of a spread sketch cube, and only passed to the spread sketch cell lookup
+ if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id == manifest->id) {
+ hash = XXH3_64bits(key, key_len);
+ }
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
- struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
+ struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
metric_hll_add(metric, key, key_len);
return FS_OK;
}
-uint64_t tags2hash(const struct field *field, size_t n_dimensions) {
+uint64_t field_array_to_hash(const struct field *field, size_t n_dimensions) {
XXH3_state_t state = {0};
XXH3_64bits_reset(&state);
@@ -751,27 +835,65 @@ uint64_t tags2hash(const struct field *field, size_t n_dimensions) {
return XXH3_64bits_digest(&state);
}
-int cube_hll_add_tag(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key)
+int cube_hll_add_field(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key)
{ // Add the item described by the field array `tags_key` to the HLL metric of the cell identified by `dimensions`; returns FS_ERR_TOO_MANY_CELLS when no cell could be obtained, FS_OK otherwise.
assert(manifest->type == METRIC_TYPE_HLL);
assert(cube->sampling_mode != SAMPLING_MODE_TOPK || (cube->primary_metric_id != manifest->id));
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ uint64_t hash = 0; // placeholder; computed here only when this metric is the primary metric of a spread sketch cube, otherwise computed later, once a cell has been obtained
+ if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id == manifest->id) {
+ hash = field_array_to_hash(tags_key, n_tag_key);
+ }
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
struct metric *metric = add_or_find_metric_in_cell(manifest, cell_data);
-
- uint64_t hash = tags2hash(tags_key, n_tag_key);
+
+ if (hash == 0) { // 0 is used as the "not calculated yet" marker; a genuine hash value of 0 would simply be calculated again here
+ hash = field_array_to_hash(tags_key, n_tag_key);
+ }
metric_hll_add_hash(metric, hash);
return FS_OK;
}
int cube_counter_incrby(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long increment) {
assert(manifest->type == METRIC_TYPE_COUNTER);
- assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id || increment >= 0));
+ assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE ||
+ (cube->sampling_mode == SAMPLING_MODE_TOPK && (cube->primary_metric_id != manifest->id || increment >= 0)) ||
+ (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && cube->primary_metric_id != manifest->id)
+ );
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, increment, manifest->id);
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, increment, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
+
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
@@ -786,7 +908,21 @@ int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest,
assert(manifest->type == METRIC_TYPE_COUNTER);
assert(cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE || (cube->primary_metric_id != manifest->id));
- struct cell *cell_data = get_cell(cube, dimensions, n_dimensions, 0, manifest->id);
+ struct cell *cell_data = NULL;
+ switch (cube->sampling_mode) {
+ case SAMPLING_MODE_COMPREHENSIVE: {
+ cell_data = get_cell_in_comprehensive_cube(cube, dimensions, n_dimensions);
+ break;}
+ case SAMPLING_MODE_TOPK: {
+ cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ case SAMPLING_MODE_SPREADSKETCH: {
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, manifest->id);
+ break;}
+ default:
+ assert(0);
+ break;
+ }
if (cell_data == NULL) {
return FS_ERR_TOO_MANY_CELLS;
}
@@ -809,6 +945,9 @@ struct cube *cube_copy(const struct cube *cube)
case SAMPLING_MODE_COMPREHENSIVE:
cube_dup->comprehensive = hash_table_copy(cube->comprehensive);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ cube_dup->spread_sketch = spread_sketch_copy(cube->spread_sketch);
+ break;
default:
assert(0);
break;
@@ -829,6 +968,9 @@ void cube_merge(struct cube *dest, const struct cube *src)
case SAMPLING_MODE_COMPREHENSIVE:
hash_table_merge(dest->comprehensive, src->comprehensive);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ spread_sketch_merge(dest->spread_sketch, src->spread_sketch);
+ break;
default:
assert(0);
break;
@@ -842,6 +984,21 @@ struct cube *cube_fork(const struct cube *cube) {
return ret;
}
+struct tmp_sorted_data_spread_sketch_cell { // temporary sort record used by cube_get_cells: pairs a cell with the value it is ranked by
+ double hll_value; // ranking key; filled from metric_hll_get() on the cell's primary metric
+ struct cell *data; // the cell being ranked
+};
+static int compare_tmp_sorted_data_spread_sketch_cell(const void *a, const void *b) { // qsort comparator over struct tmp_sorted_data_spread_sketch_cell: sorts by hll_value in descending order
+ const struct tmp_sorted_data_spread_sketch_cell *aa = (const struct tmp_sorted_data_spread_sketch_cell *)a;
+ const struct tmp_sorted_data_spread_sketch_cell *bb = (const struct tmp_sorted_data_spread_sketch_cell *)b;
+ if (aa->hll_value < bb->hll_value) {
+ return 1; // a has the smaller value, so it is placed after b
+ } else if (aa->hll_value > bb->hll_value) {
+ return -1; // a has the larger value, so it is placed before b
+ } else {
+ return 0;
+ }
+}
void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions, size_t *n_cell)
{
size_t n_cell_tmp = 0;
@@ -852,6 +1009,9 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
case SAMPLING_MODE_TOPK:
n_cell_tmp = heavy_keeper_get_count(cube->topk);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ n_cell_tmp = spread_sketch_get_count(cube->spread_sketch);
+ break;
default:
assert(0);
}
@@ -870,14 +1030,34 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
case SAMPLING_MODE_TOPK:
heavy_keeper_list(cube->topk, (void **)cell_datas, n_cell_tmp);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ spread_sketch_list(cube->spread_sketch, (void **)cell_datas, n_cell_tmp);
+ break;
default:
assert(0);
}
+ // spread sketch often stores more than max_n_cell. So sort out the top max_n_cell cells.
+ if (cube->sampling_mode == SAMPLING_MODE_SPREADSKETCH && n_cell_tmp > cube->max_n_cell) {
+ struct tmp_sorted_data_spread_sketch_cell *tmp_sorted_data = (struct tmp_sorted_data_spread_sketch_cell *)malloc(sizeof(struct tmp_sorted_data_spread_sketch_cell) * n_cell_tmp);
+ for (int i = 0; i < n_cell_tmp; i++) {
+ tmp_sorted_data[i].data = cell_datas[i];
+ tmp_sorted_data[i].hll_value = metric_hll_get(cell_datas[i]->metrics[cube->primary_metric_id]);
+ }
+ qsort(tmp_sorted_data, n_cell_tmp, sizeof(struct tmp_sorted_data_spread_sketch_cell), compare_tmp_sorted_data_spread_sketch_cell);
+
+ free(cell_datas);
+ cell_datas = (struct cell **)malloc(sizeof(struct cell *) * cube->max_n_cell);
+ for (int i = 0; i < cube->max_n_cell; i++) {
+ cell_datas[i] = tmp_sorted_data[i].data;
+ }
+ n_cell_tmp = cube->max_n_cell;
+ free(tmp_sorted_data);
+ }
+
struct field_list *tag_list_ret = (struct field_list *)malloc(sizeof(struct field_list) * n_cell_tmp);
*cell_dimensions = tag_list_ret;
*n_cell = n_cell_tmp;
-
for (int i = 0; i < n_cell_tmp; i++) {
struct cell *cell_data = cell_datas[i];
struct field_list *tag_list_tmp = &tag_list_ret[i];
@@ -907,6 +1087,9 @@ const struct cell *get_cell_by_tag_list(const struct cube *cube, const struct fi
case SAMPLING_MODE_COMPREHENSIVE:
ret = hash_table_get0_exdata(cube->comprehensive, tag_in_string, tag_len);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ ret = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
+ break;
default:
assert(0);
return NULL;
@@ -1018,7 +1201,7 @@ int cube_get_cell_count(const struct cube *cube) {
}
}
-void cube_get_cells_used_by_metric(const struct cube *cube, const struct field_list *fields, int **metric_id_out, size_t *n_metric_out) {
+void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list *fields, int **metric_id_out, size_t *n_metric_out) {
const struct cell *cell_data = get_cell_by_tag_list(cube, fields);
if (cell_data == NULL) {
*metric_id_out = NULL;
@@ -1051,4 +1234,8 @@ struct field_list *cube_get_identifier(const struct cube *cube) {
return tag_list;
-} \ No newline at end of file
+}
+
+enum sampling_mode cube_get_sampling_mode(const struct cube *cube) { // accessor: lets code outside cube.c read the cube's sampling mode
+ return cube->sampling_mode; // cube is dereferenced without a NULL check
+}
diff --git a/src/cube.h b/src/cube.h
index 2b08530..6802b2d 100644
--- a/src/cube.h
+++ b/src/cube.h
@@ -22,7 +22,7 @@ struct cube *cube_fork(const struct cube *cube); // only copy the cube configura
int cube_histogram_record(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value);
int cube_hll_add(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const char *key, size_t key_len);
-int cube_hll_add_tag(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key);
+int cube_hll_add_field(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, const struct field *tags_key, size_t n_tag_key);
int cube_counter_incrby(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long increment);
int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest, const struct field *dimensions, size_t n_dimensions, long long value);
@@ -33,8 +33,9 @@ int cube_histogram_count_le_value(const struct cube *cube, int metric_id, const
int cube_get_serialization(const struct cube *cube, int metric_id, const struct field_list *dimensions, char **blob, size_t *blob_size);
int cube_get_cell_count(const struct cube *cube);
+enum sampling_mode cube_get_sampling_mode(const struct cube *cube);
void cube_get_cells(const struct cube *cube, struct field_list **tag_list, size_t *n_cell);
-void cube_get_cells_used_by_metric(const struct cube *cube, const struct field_list *dimensions, int **metric_id_out, size_t *n_metric_out);
+void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list *dimensions, int **metric_id_out, size_t *n_metric_out);
void cube_set_primary_metric(struct cube *cube, int metric_id);
struct field_list *cube_get_identifier(const struct cube *cube);
diff --git a/src/fieldstat.c b/src/fieldstat.c
index abf3e91..b847bd5 100644
--- a/src/fieldstat.c
+++ b/src/fieldstat.c
@@ -204,7 +204,9 @@ int fieldstat_cube_set_primary_metric(struct fieldstat *instance, int cube_id, i
if (manifest == NULL) {
return FS_ERR_INVALID_METRIC_ID;
}
- if (manifest->type != METRIC_TYPE_COUNTER) {
+ if (cube_get_sampling_mode(cube) == SAMPLING_MODE_COMPREHENSIVE ||
+ (cube_get_sampling_mode(cube) == SAMPLING_MODE_TOPK && manifest->type != METRIC_TYPE_COUNTER) ||
+ (cube_get_sampling_mode(cube) == SAMPLING_MODE_SPREADSKETCH && manifest->type != METRIC_TYPE_HLL)) {
return FS_ERR_INVALID_PARAM;
}
@@ -351,7 +353,7 @@ int fieldstat_hll_add_field(struct fieldstat *instance, int cube_id, int metric_
return FS_ERR_INVALID_METRIC_ID;
}
- return cube_hll_add_tag(cube, manifest, cell_dimensions, n_dimensions, item, item_len);
+ return cube_hll_add_field(cube, manifest, cell_dimensions, n_dimensions, item, item_len);
}
// cppcheck-suppress [constParameterPointer, unmatchedSuppression]
@@ -589,5 +591,5 @@ int fieldstat_find_cube(const struct fieldstat *instance, const struct field *cu
void fieldstat_get_metric_in_cell(const struct fieldstat *instance, int cube_id, const struct field_list *cell_dimensions, int **metric_id_out, size_t *n_metric_out)
{
const struct cube *cube = cube_manager_get_cube_by_id(instance->cube_manager, cube_id);
- return cube_get_cells_used_by_metric(cube, cell_dimensions, metric_id_out, n_metric_out);
+ // If the lookup yields no cube, report an empty result instead of dereferencing NULL further down.
+ if (cube == NULL) {
+ *metric_id_out = NULL;
+ *n_metric_out = 0;
+ return;
+ }
+ // Plain call: ISO C does not allow `return <expr>;` in a function returning void.
+ cube_get_metrics_in_cell(cube, cell_dimensions, metric_id_out, n_metric_out);
} \ No newline at end of file
diff --git a/src/tags/spread_sketch.c b/src/tags/spread_sketch.c
index c654e89..598ef9b 100644
--- a/src/tags/spread_sketch.c
+++ b/src/tags/spread_sketch.c
@@ -28,10 +28,11 @@
会让cell 的操作变得麻烦,无法借用老的cell manager 流程,get exdata 会得到一个exdata 的数组(可能多个),而非单独的一个,要对多个cell综合起来当一个cell 看。。修改量非常小,但是确实会影响代码本身的整洁度。
*/
-struct smart_ptr { // todo:entry
+
+struct entry {
int ref_count;
void *exdata;
-
+ bool dying;
char *key;
size_t key_len;
UT_hash_handle hh;
@@ -45,14 +46,14 @@ struct spread_sketch_scheme {
exdata_copy_cb copy_fn;
};
-struct smart_ptr_table { // TODO: entry table
- struct smart_ptr *entry;
+struct entry_table { // key -> entry index of a spread sketch (renamed from smart_ptr_table)
+ struct entry *entry; // uthash head pointer; NULL while the table is empty
struct spread_sketch_scheme scheme;
};
struct bucket {
- struct smart_ptr *content;
+ struct entry *content; // entry currently occupying this bucket; NULL when the bucket is unused
uint32_t level;
};
@@ -63,7 +64,7 @@ struct spread_sketch {
struct spread_sketch_scheme scheme;
struct bucket *buckets;
- struct smart_ptr_table *table;
+ struct entry_table *table;
uint32_t *min_level_per_row; // TODO: 先看看性能吧, 之后再写。用来记录每行最小的level,从而跳过行数。对于64位的level,维持一个计数,额外使用64 r的空间,当一个最小位数的level 计数到0时,更新最小level。
// TODO: 对比heavy keeper,不仅仅是跳过的问题,heavykeeper 无论什么情况,在输入0的时候都不会走sketch 更新。
@@ -91,19 +92,22 @@ static inline bool key_equal(const char *key1, size_t key1_len, const char *key2
return memcmp(key1, key2, key1_len) == 0;
}
static inline char *key_dup(const char *key, size_t key_len) {
- char *ret = malloc(key_len);
+ char *ret = malloc(key_len+1); // one extra byte for the terminator written below; the malloc result is not checked
memcpy(ret, key, key_len);
+ ret[key_len] = '\0'; // NUL-terminate the copy — presumably so the stored key can be used as a C string (e.g. in %s debug prints); confirm
return ret;
}
-struct smart_ptr *smart_ptr_table_get(struct smart_ptr_table *table, const char *key, size_t key_len, void *arg) {
- struct smart_ptr *ret;
+struct entry *smart_ptr_table_get(struct entry_table *table, const char *key, size_t key_len, void *arg) {
+ struct entry *ret;
HASH_FIND(hh, table->entry, key, key_len, ret);
if (ret != NULL) {
+ ret->dying = false;
ret->ref_count++;
} else {
- ret = malloc(sizeof(struct smart_ptr));
+ ret = malloc(sizeof(struct entry));
+ ret->dying = false;
ret->ref_count = 1;
ret->key = key_dup(key, key_len);
ret->key_len = key_len;
@@ -118,8 +122,8 @@ struct smart_ptr *smart_ptr_table_get(struct smart_ptr_table *table, const char
return ret;
}
-int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size_t key_len) {
- struct smart_ptr *ret;
+int smart_ptr_table_release(struct entry_table *table, const char *key, size_t key_len) {
+ struct entry *ret;
HASH_FIND(hh, table->entry, key, key_len, ret);
if (ret == NULL) {
return -1;
@@ -127,6 +131,7 @@ int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size
ret->ref_count--;
if (ret->ref_count == 0) {
+ // printf("release %s\n", key);
HASH_DEL(table->entry, ret);
table->scheme.free_fn(ret->exdata);
free(ret->key);
@@ -136,8 +141,8 @@ int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size
return 0;
}
-void smart_ptr_table_free(struct smart_ptr_table *table) {
- struct smart_ptr *current, *tmp;
+void smart_ptr_table_free(struct entry_table *table) {
+ struct entry *current, *tmp;
HASH_ITER(hh, table->entry, current, tmp) {
HASH_DEL(table->entry, current);
table->scheme.free_fn(current->exdata);
@@ -147,8 +152,8 @@ void smart_ptr_table_free(struct smart_ptr_table *table) {
free(table);
}
-struct smart_ptr_table *smart_ptr_table_new() {
- struct smart_ptr_table *table = malloc(sizeof(struct smart_ptr_table));
+struct entry_table *smart_ptr_table_new() {
+ struct entry_table *table = malloc(sizeof(struct entry_table));
table->entry = NULL;
table->scheme.new_fn = default_new_fn;
@@ -194,9 +199,10 @@ struct spread_sketch *spread_sketch_new(int expected_query_num) {
}
// return 0 if not added, return 1 if added
-int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t hash_identifier, void *arg) {// todo: entry, item
+int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg) {
// uint64_t hash_identifier = XXH3_64bits_withSeed(identifier, identifier_length, 171);
- uint32_t level = (uint32_t)__builtin_clzll(hash_identifier) + 1;
+ uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1;
+ // printf("spread_sketch_add key %s, level %u\n", key, level);
// https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
// A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x)
@@ -212,17 +218,22 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng
struct bucket *bucket = &ss->buckets[bucket_idx];
if (bucket->content != NULL && key_equal(bucket->content->key, bucket->content->key_len, key, key_length)) {
+ bucket->content->dying = false;
+
if (bucket->level < level) {
bucket->level = level;
}
in_sketch = true;
} else {
- uint32_t true_level = bucket->content == NULL ? 0: bucket->level;
-
- if (true_level < level) {
- const struct smart_ptr *content_old = bucket->content;
- smart_ptr_table_release(ss->table, content_old->key, content_old->key_len);
- struct smart_ptr *content_new = smart_ptr_table_get(ss->table, key, key_length, arg);
+ uint32_t old_level = bucket->content == NULL ? 0: bucket->level;
+
+ if (level > old_level) {
+ // printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, old_level, level, i, hash_x % ss->width);
+ const struct entry *content_old = bucket->content;
+ if (content_old != NULL) {
+ smart_ptr_table_release(ss->table, content_old->key, content_old->key_len);
+ }
+ struct entry *content_new = smart_ptr_table_get(ss->table, key, key_length, arg);
bucket->content = content_new;
bucket->level = level;
@@ -248,7 +259,7 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
const struct bucket *bucket_src = &src->buckets[i];
struct bucket *bucket_dst = &dst->buckets[i];
- if (bucket_src->content == NULL) {
+ if (bucket_src->content == NULL || bucket_src->content->dying) {
continue;
}
if (bucket_dst->content == NULL) {
@@ -256,6 +267,7 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
bucket_dst->level = bucket_src->level;
continue;
}
+ bucket_dst->content->dying = false;
if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) {
if (bucket_src->level > bucket_dst->level) {
@@ -272,10 +284,10 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
const struct spread_sketch_scheme *scheme = &dst->table->scheme;
- struct smart_ptr *content_dest, *content_src, *tmp;
+ struct entry *content_dest, *content_src, *tmp;
HASH_ITER(hh, dst->table->entry, content_dest, tmp) {
HASH_FIND(hh, src->table->entry, content_dest->key, content_dest->key_len, content_src);
- if (content_src == NULL) {
+ if (content_src == NULL || content_src->dying) {
continue;
}
@@ -288,9 +300,9 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
}
void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) {
- struct smart_ptr *content;
+ struct entry *content;
HASH_FIND(hh, ss->table->entry, key, key_len, content);
- if (content == NULL) {
+ if (content == NULL || content->dying) {
return NULL;
}
return content->exdata;
@@ -302,9 +314,10 @@ void spread_sketch_reset(struct spread_sketch *ss) {
bucket->level = 0;
}
- struct smart_ptr *content, *tmp;
+ struct entry *content, *tmp;
HASH_ITER(hh, ss->table->entry, content, tmp) {
ss->scheme.reset_fn(content->exdata);
+ content->dying = true;
}
}
@@ -319,15 +332,24 @@ void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new
}
int spread_sketch_get_count(const struct spread_sketch *ss) {
- return HASH_CNT(hh, ss->table->entry);
+ int cnt = 0; // number of live (non-dying) entries seen so far
+ struct entry *content, *tmp;
+ HASH_ITER(hh, ss->table->entry, content, tmp) { // full walk of the table replaces the former HASH_CNT lookup
+ if (!content->dying) { // entries flagged dying (set by spread_sketch_reset) are excluded from the count
+ cnt++;
+ }
+ }
+
+ return cnt;
}
-// 这个函数还是会忠实的把内部所有内容都拿出来,但是预期是最多expected_query_num 个,所以在外层要做一个排序。
-// 这个排序在这里面没法做,因为没有具体的hll计数。
size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas) {
size_t count = 0;
- struct smart_ptr *content, *tmp;
+ struct entry *content, *tmp;
HASH_ITER(hh, ss->table->entry, content, tmp) {
+ if (content->dying) {
+ continue;
+ }
if (count >= n_exdatas) {
break;
}
@@ -343,9 +365,10 @@ struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) {
dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket));
dst->table = smart_ptr_table_new();
+ spread_sketch_set_exdata_schema(dst, src->scheme.new_fn, src->scheme.free_fn, src->scheme.merge_fn, src->scheme.reset_fn, src->scheme.copy_fn);
for (int i = 0; i < dst->depth * dst->width; i++) {
- if (src->buckets[i].content == NULL) {
+ if (src->buckets[i].content == NULL || src->buckets[i].content->dying) {
continue;
}
dst->buckets[i].content = smart_ptr_table_get(dst->table, src->buckets[i].content->key, src->buckets[i].content->key_len, NULL);
diff --git a/src/tags/spread_sketch.h b/src/tags/spread_sketch.h
index f50cae4..9717238 100644
--- a/src/tags/spread_sketch.h
+++ b/src/tags/spread_sketch.h
@@ -8,6 +8,7 @@ extern "C"{
#include "exdata.h"
+#define DUMMY_ITEM_HASH (1ULL<<63) // top bit set: zero leading zeros, so spread_sketch_add derives the lowest level (clz + 1 == 1)
struct spread_sketch;
@@ -18,7 +19,7 @@ void spread_sketch_free(struct spread_sketch *ss);
void spread_sketch_reset(struct spread_sketch *ss);
-int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t hash_identifier, void *arg);
+int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg);
void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn);
void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len);
diff --git a/test/test_empty_tags.cpp b/test/test_empty_tags.cpp
index 709ef8d..54e243a 100644
--- a/test/test_empty_tags.cpp
+++ b/test/test_empty_tags.cpp
@@ -94,6 +94,42 @@ TEST(test_empty_tag, merge_topk)
fieldstat_free(instance_src);
}
+TEST(test_empty_tag, merge_spreadsketch) // merge a spread-sketch cube whose cube-level and cell-level dimensions are both empty
+{
+ struct fieldstat *instance_src = fieldstat_new();
+ int cube_id = fieldstat_create_cube(instance_src, NULL, 0, SAMPLING_MODE_SPREADSKETCH, 1);
+ int metric_id = fieldstat_register_hll(instance_src, "metric", 4);
+ fieldstat_hll_add(instance_src, cube_id, metric_id, NULL, 0, "1", 1);
+ struct fieldstat *instance_dst = fieldstat_new();
+
+ fieldstat_merge(instance_dst, instance_src);
+ fieldstat_merge(instance_dst, instance_src); // second merge of the same source: the cell count and estimate checked below must still be 1
+
+ int *ret_cube_id_arr = NULL;
+ int n_cube = 0;
+ fieldstat_get_cubes(instance_dst, &ret_cube_id_arr, &n_cube);
+ int ret_cell_id = ret_cube_id_arr[0]; // despite the name, this is a cube id returned by fieldstat_get_cubes
+ struct field_list *shared_tag = fieldstat_cube_get_tags(instance_dst, ret_cell_id);
+ EXPECT_EQ(shared_tag->n_field, 0);
+ EXPECT_TRUE(shared_tag->field == NULL);
+ fieldstat_tag_list_arr_free(shared_tag, 1);
+ free(ret_cube_id_arr);
+
+ struct field_list *tag_list = NULL;
+ size_t n_cell = 0;
+ fieldstat_cube_get_cells(instance_dst, cube_id, &tag_list, &n_cell);
+ EXPECT_EQ(n_cell, 1);
+ EXPECT_EQ(tag_list[0].n_field, 0);
+ EXPECT_TRUE(tag_list[0].field == NULL);
+ double value;
+ fieldstat_hll_get(instance_dst, cube_id, &tag_list[0], 0, &value); // metric id given as literal 0 — presumably equal to metric_id; confirm
+ EXPECT_NEAR(value, 1, 0.4);
+ fieldstat_tag_list_arr_free(tag_list, n_cell);
+
+ fieldstat_free(instance_dst);
+ fieldstat_free(instance_src);
+}
+
TEST(test_empty_tag, export)
{
struct fieldstat *instance = test_empty_my_init();
diff --git a/test/test_fuzz_test.cpp b/test/test_fuzz_test.cpp
index 38f68bc..8d896b5 100644
--- a/test/test_fuzz_test.cpp
+++ b/test/test_fuzz_test.cpp
@@ -431,8 +431,8 @@ TEST(Fuzz_test, simple_one_for_perf)
int main(int argc, char *argv[])
{
testing::InitGoogleTest(&argc, argv);
- // testing::GTEST_FLAG(filter) = "Fuzz_test.many_instance_random_flow_unregister_calibrate_reset_fork_merge_topk";
- testing::GTEST_FLAG(filter) = "Fuzz_test.simple_one_for_perf";
+ // testing::GTEST_FLAG(filter) = "Fuzz_test.add_and_reset_with_randomly_generated_flows_and_randomly_chosen_metric";
+ testing::GTEST_FLAG(filter) = "-Fuzz_test.simple_one_for_perf"; // leading '-' is a negative pattern: run everything except the perf-only case; assigned after InitGoogleTest, so it replaces any --gtest_filter from the command line
return RUN_ALL_TESTS();
} \ No newline at end of file
diff --git a/test/test_merge.cpp b/test/test_merge.cpp
index e51b90d..7a07baf 100644
--- a/test/test_merge.cpp
+++ b/test/test_merge.cpp
@@ -2,6 +2,7 @@
#include <gtest/gtest.h>
#include <set>
#include <unordered_map>
+#include <unordered_set>
#include "fieldstat.h"
#include "utils.hpp"
@@ -38,6 +39,13 @@ long long merge_test_fieldstat_counter_get(const struct fieldstat *instance, int
return ret;
}
+double merge_test_fieldstat_hll_get(const struct fieldstat *instance, int cube_id, int metric_id, const struct field_list *tag_list = &TEST_TAG_LIST_STRING) // test helper: fetch the HLL estimate of one cell as a plain return value
+{
+ double ret = 0; // stays 0 if fieldstat_hll_get does not write the output — assumes it leaves it untouched on failure; confirm
+ fieldstat_hll_get(instance, cube_id, tag_list, metric_id, &ret); // the status code is deliberately ignored here
+ return ret;
+}
+
TEST(unit_test_merge, test_metric_name_mapping_by_adding_metric_to_dest)
{
struct fieldstat *instance = fieldstat_new();
@@ -362,7 +370,7 @@ TEST(unit_test_merge, new_too_many_cells_on_one_metric_given_source_cube_reset_a
fieldstat_tag_list_arr_free(tag_list, n_cell);
}
-struct fieldstat *test_push_flows(vector<Fieldstat_tag_list_wrapper *> &flows_in_test, int K, long long count = 1)
+struct fieldstat *topk_test_push_flows(vector<Fieldstat_tag_list_wrapper *> &flows_in_test, int K, long long count = 1)
{
struct fieldstat *instance = fieldstat_new();
int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_TOPK, K);
@@ -377,9 +385,9 @@ TEST(unit_test_merge, merge_accuracy_test_with_K_large_enough_topk)
{
int K = 100;
vector<Fieldstat_tag_list_wrapper *> flows_in_src = test_gen_topk_flows(K, K);
- struct fieldstat *instance_src = test_push_flows(flows_in_src, K);
+ struct fieldstat *instance_src = topk_test_push_flows(flows_in_src, K);
vector<Fieldstat_tag_list_wrapper *> flows_in_dest = test_gen_topk_flows(K, K);
- struct fieldstat *instance_dest = test_push_flows(flows_in_dest, K);
+ struct fieldstat *instance_dest = topk_test_push_flows(flows_in_dest, K);
fieldstat_merge(instance_dest, instance_src);
struct field_list *tag_list = NULL;
@@ -406,17 +414,17 @@ TEST(unit_test_merge, merge_accuracy_test_with_K_large_enough_topk)
}
}
-TEST(unit_test_merge, merge_accuracy_test_gen_dest_full_all_inserted_given_src_flows_larger)
+TEST(unit_test_merge, merge_accuracy_test_gen_dest_full_all_inserted_given_src_flows_larger_topk)
{
int K = 1000;
vector<Fieldstat_tag_list_wrapper *> flows_in_src = test_gen_topk_flows(10000, K);
- struct fieldstat *instance_src = test_push_flows(flows_in_src, K, 1000); // 1000 times larger than dest 1
+ struct fieldstat *instance_src = topk_test_push_flows(flows_in_src, K, 1000); // 1000 times larger than dest 1
vector<Fieldstat_tag_list_wrapper *> flows_in_dest;
for (int i = 0; i < K; i++) {
Fieldstat_tag_list_wrapper *tmp = new Fieldstat_tag_list_wrapper("flows in dest", to_string(i).c_str());
flows_in_dest.push_back(tmp);
}
- struct fieldstat *instance_dest = test_push_flows(flows_in_dest, K, 1);
+ struct fieldstat *instance_dest = topk_test_push_flows(flows_in_dest, K, 1);
fieldstat_merge(instance_dest, instance_src);
@@ -444,13 +452,13 @@ TEST(unit_test_merge, merge_accuracy_test_gen_dest_full_all_inserted_given_src_f
}
}
-TEST(unit_test_merge, merge_accuracy_test_gen_dest_full_some_inserted_and_some_merged_and_some_fail_to_add)
+TEST(unit_test_merge, merge_accuracy_test_gen_dest_full_some_inserted_and_some_merged_and_some_fail_to_add_topk)
{
int K = 100;
vector<Fieldstat_tag_list_wrapper *> flows_in_src = test_gen_topk_flows(30000, K + 50); // let elephant flows in src and dest different
- struct fieldstat *instance_src = test_push_flows(flows_in_src, K);
+ struct fieldstat *instance_src = topk_test_push_flows(flows_in_src, K);
vector<Fieldstat_tag_list_wrapper *> flows_in_dest = test_gen_topk_flows(30000, K + 50);
- struct fieldstat *instance_dest = test_push_flows(flows_in_dest, K);
+ struct fieldstat *instance_dest = topk_test_push_flows(flows_in_dest, K);
fieldstat_merge(instance_dest, instance_src);
struct field_list *tag_list = NULL;
@@ -523,11 +531,227 @@ TEST(unit_test_merge, primary_metric_id_different)
fieldstat_free(instance_dst);
}
+TEST(unit_test_merge, new_cube_and_metric_to_empty_spreadsketch) { // merging into a blank instance must carry over the cube and the metric registration
+ struct fieldstat *instance = fieldstat_new();
+ fieldstat_create_cube(instance, &TEST_TAG_INT, 1, SAMPLING_MODE_SPREADSKETCH, 10);
+ fieldstat_register_hll(instance, "metric", 6);
+
+ struct fieldstat *instance_dest = fieldstat_new(); // destination starts with no cubes and no metrics
+ fieldstat_merge(instance_dest, instance);
+
+ int *cube_id_dest;
+ int n_cube;
+ fieldstat_get_cubes(instance_dest, &cube_id_dest, &n_cube);
+ EXPECT_TRUE(n_cube == 1);
+ EXPECT_STREQ(fieldstat_get_metric_name(instance_dest, 0), "metric"); // metric id 0 is assumed to be the one registered above
+
+ free(cube_id_dest);
+ fieldstat_free(instance);
+ fieldstat_free(instance_dest);
+}
+
+TEST(unit_test_merge, new_cell_on_existing_cube_and_metric_spreadsketch) { // a cell created after the first merge must not disturb the already-merged cube/metric layout
+ struct fieldstat *instance = fieldstat_new();
+ int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_SPREADSKETCH, 10);
+ int metric_id = fieldstat_register_hll(instance, "metric", 6);
+ struct fieldstat *instance_dest = fieldstat_new();
+ fieldstat_merge(instance_dest, instance); // first merge: cube and metric exist, but no cell yet
+
+ fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_STRING, 1, "1", 1);
+ fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_STRING, 1, "2", 1);
+ fieldstat_merge(instance_dest, instance); // second merge brings the new cell over
+
+ int *cube_id_dest;
+ int n_cube;
+ fieldstat_get_cubes(instance_dest, &cube_id_dest, &n_cube);
+ EXPECT_TRUE(n_cube == 1);
+ free(cube_id_dest);
+ EXPECT_STREQ(fieldstat_get_metric_name(instance_dest, 0), "metric");
+ double measure = merge_test_fieldstat_hll_get(instance, cube_id, metric_id); // keep the estimate as double: storing it in long long truncated it before the tolerance check. NOTE(review): this reads the source instance, not instance_dest — confirm that is intended
+ EXPECT_NEAR(measure, 2, 0.3);
+
+ struct field_list *tag_list = NULL;
+ size_t n_cell = 0;
+ fieldstat_cube_get_cells(instance, cube_id, &tag_list, &n_cell);
+ EXPECT_EQ(n_cell, 1);
+ EXPECT_EQ(tag_list->n_field, 1);
+ EXPECT_STREQ(tag_list->field[0].key, TEST_TAG_STRING.key);
+
+ fieldstat_free(instance);
+ fieldstat_free(instance_dest);
+ fieldstat_tag_list_arr_free(tag_list, n_cell);
+}
+
+TEST(unit_test_merge, merge_existing_cell_on_existing_cube_and_metric_spreadsketch) { // repeated merges into one cell must union the HLL items rather than add the estimates
+ struct fieldstat *instance = fieldstat_new();
+ int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_SPREADSKETCH, 10);
+ int metric_id = fieldstat_register_hll(instance, "metric", 6);
+ fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_STRING, 1, "1", 1);
+ struct fieldstat *instance_dest = fieldstat_new();
+
+ fieldstat_merge(instance_dest, instance);
+ fieldstat_merge(instance_dest, instance); // same content merged twice: item "1" must still be counted once
+ fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_STRING, 1, "2", 1);
+ fieldstat_merge(instance_dest, instance); // third merge adds item "2" to the same cell
+
+ struct field_list *tag_list = NULL;
+ size_t n_cell = 0;
+ fieldstat_cube_get_cells(instance_dest, cube_id, &tag_list, &n_cell);
+ EXPECT_EQ(n_cell, 1);
+ double value = merge_test_fieldstat_hll_get(instance_dest, cube_id, metric_id, &tag_list[0]);
+ EXPECT_NEAR(value, 2, 0.3); // two distinct items ("1" and "2") overall
+
+ fieldstat_free(instance);
+ fieldstat_free(instance_dest);
+ fieldstat_tag_list_arr_free(tag_list, n_cell);
+}
+
+TEST(unit_test_merge, new_too_many_cells_on_one_metric_given_source_cube_reset_and_get_different_cube_spreadsketch) { // three cells compete for a cube limited to two; the listing must keep the two with the larger spread
+ struct fieldstat *instance = fieldstat_new();
+ int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_SPREADSKETCH, 2);
+ int metric_id = fieldstat_register_hll(instance, "metric", 6);
+ fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_STRING, 1, "1", 1);
+ struct fieldstat *instance_dest = fieldstat_new();
+ fieldstat_merge(instance_dest, instance); // dest now holds the TEST_TAG_STRING cell with one item
+
+ fieldstat_reset(instance); // source starts over with different cells
+ fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_INT, 1, "21", 2);
+ fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_INT, 1, "22", 2);
+ fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_DOUBLE, 1, "31", 2);
+ fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_DOUBLE, 1, "32", 2);
+ fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_DOUBLE, 1, "33", 2);
+ fieldstat_merge(instance_dest, instance);
+
+ struct field_list *tag_list = NULL;
+ size_t n_cell = 0;
+ fieldstat_cube_get_cells(instance_dest, 0, &tag_list, &n_cell); // cube id and metric id are given as literal 0 from here on
+ EXPECT_EQ(n_cell, 2);
+ EXPECT_NEAR(merge_test_fieldstat_hll_get(instance_dest, 0, 0, &tag_list[0]), 3, 0.3); // cells are expected in descending order of estimate
+ EXPECT_NEAR(merge_test_fieldstat_hll_get(instance_dest, 0, 0, &tag_list[1]), 2, 0.3);
+ EXPECT_STREQ(tag_list[0].field[0].key, TEST_TAG_DOUBLE.key); // three items
+ EXPECT_STREQ(tag_list[1].field[0].key, TEST_TAG_INT.key); // two items; the one-item TEST_TAG_STRING cell is not listed
+
+ fieldstat_free(instance);
+ fieldstat_free(instance_dest);
+ fieldstat_tag_list_arr_free(tag_list, n_cell);
+}
+
+TEST(unit_test_merge, gen_dest_full_all_src_inserted_given_src_flows_larger_spreadsketch) { // source cells get far more traffic than dest cells, so after the merge the source cells should win the listing
+ int K = 100;
+ SpreadSketchZipfGenerator flow_generator(1.0, K); // exactly the number of cells, so there will be almost all(in case of hash collision happen) cells added successfully
+ struct fieldstat *instance_src = fieldstat_new();
+ int cube_id = fieldstat_create_cube(instance_src, &TEST_SHARED_TAG, 1, SAMPLING_MODE_SPREADSKETCH, K);
+ int metric_id = fieldstat_register_hll(instance_src, "metric", 6);
+ struct fieldstat *instance_dest = fieldstat_fork(instance_src); // dest is forked from src before any data is added
+ const char dest_key[] = "key of dest";
+ const char src_key[] = "key of src";
+
+ std::unordered_map<std::string, std::unordered_set<std::string>> flow_cnt; // ground truth: dimension -> set of distinct items seen
+ for (int i = 0; i < 500000; i++) { // add more, so the fanout of any flow to src instance is more than dest
+ Flow flow = flow_generator.next();
+ Fieldstat_tag_list_wrapper dimension = Fieldstat_tag_list_wrapper(src_key, flow.src_ip.c_str());
+ Fieldstat_tag_list_wrapper item = Fieldstat_tag_list_wrapper("dummy", flow.dst_ip.c_str());
+ fieldstat_hll_add_field(instance_src, cube_id, metric_id, dimension.get_tag(), dimension.get_tag_count(), item.get_tag(), item.get_tag_count());
+
+ flow_cnt[dimension.to_string()].insert(item.to_string());
+ }
+
+ for (int i = 0; i < 1000; i++) { // much smaller load for the destination instance
+ Flow flow = flow_generator.next();
+ Fieldstat_tag_list_wrapper dimension = Fieldstat_tag_list_wrapper(dest_key, flow.src_ip.c_str());
+ Fieldstat_tag_list_wrapper item = Fieldstat_tag_list_wrapper("dummy", flow.dst_ip.c_str());
+ fieldstat_hll_add_field(instance_dest, cube_id, metric_id, dimension.get_tag(), dimension.get_tag_count(), item.get_tag(), item.get_tag_count());
+
+ flow_cnt[dimension.to_string()].insert(item.to_string());
+ }
+
+ fieldstat_merge(instance_dest, instance_src);
+
+ struct field_list *tag_list = NULL;
+ struct field_list *tag_list_src = NULL;
+ size_t n_cell = 0;
+ size_t n_cell_src = 0;
+ std::vector<struct Fieldstat_tag_list_wrapper *> test_result;
+ fieldstat_cube_get_cells(instance_dest, 0, &tag_list, &n_cell);
+ fieldstat_cube_get_cells(instance_src, 0, &tag_list_src, &n_cell_src); // the source cell count is the reference for the expected recall below
+ for (size_t i = 0; i < n_cell; i++) {
+ test_result.push_back(new Fieldstat_tag_list_wrapper(&tag_list[i])); // heap-allocated wrappers, deleted at the end of the test
+ }
+ std::unordered_map<std::string, int> expected_unique_cnt;
+ for (auto &kv : flow_cnt) {
+ expected_unique_cnt[kv.first] = kv.second.size();
+ }
+
+ double recall = test_cal_topk_accuracy(test_result, expected_unique_cnt);
+ EXPECT_NEAR(recall, n_cell_src * 1.0 / n_cell, 0.0001); // the false positive is only generated because some cells in src are left because of hash collision
+
+ fieldstat_free(instance_src);
+ fieldstat_free(instance_dest);
+ fieldstat_tag_list_arr_free(tag_list, n_cell);
+ fieldstat_tag_list_arr_free(tag_list_src, n_cell_src);
+ for (size_t i = 0; i < test_result.size(); i++) {
+ delete test_result[i];
+ }
+}
+
+TEST(unit_test_merge, merge_accuracy_test_gen_dest_full_some_inserted_and_some_merged_and_some_fail_to_add_spreadsketch) { // src and dest each hold private cells plus shared "common" cells; the merged listing must still recall most of the true heavy cells
+ int K = 100;
+ SpreadSketchZipfGenerator flow_generator(1.0, K); // exactly the number of cells, so there will be almost all(in case of hash collision happen) cells added successfully
+ struct fieldstat *instance_src = fieldstat_new();
+ int cube_id = fieldstat_create_cube(instance_src, &TEST_SHARED_TAG, 1, SAMPLING_MODE_SPREADSKETCH, K);
+ int metric_id = fieldstat_register_hll(instance_src, "metric", 6);
+ struct fieldstat *instance_dest = fieldstat_fork(instance_src);
+
+ std::unordered_map<std::string, std::unordered_set<std::string>> flow_cnt; // ground truth: dimension -> set of distinct items seen
+ for (int i = 0; i < 100000; i++) {
+ Flow flow = flow_generator.next();
+ const char *use_key = rand()%2? "src":"common";
+ Fieldstat_tag_list_wrapper dimension = Fieldstat_tag_list_wrapper(use_key, flow.src_ip.c_str());
+ Fieldstat_tag_list_wrapper item = Fieldstat_tag_list_wrapper("dummy", flow.dst_ip.c_str());
+ fieldstat_hll_add_field(instance_src, cube_id, metric_id, dimension.get_tag(), dimension.get_tag_count(), item.get_tag(), item.get_tag_count());
+
+ flow_cnt[dimension.to_string()].insert(item.to_string());
+ }
+ for (int i = 0; i < 100000; i++) {
+ Flow flow = flow_generator.next();
+ const char *use_key = rand()%2? "dest":"common";
+ Fieldstat_tag_list_wrapper dimension = Fieldstat_tag_list_wrapper(use_key, flow.src_ip.c_str());
+ Fieldstat_tag_list_wrapper item = Fieldstat_tag_list_wrapper("dummy", flow.dst_ip.c_str());
+ fieldstat_hll_add_field(instance_dest, cube_id, metric_id, dimension.get_tag(), dimension.get_tag_count(), item.get_tag(), item.get_tag_count()); // "dest"/"common" flows belong in the destination; feeding them to instance_src left dest empty and the merge trivial
+
+ flow_cnt[dimension.to_string()].insert(item.to_string());
+ }
+
+ fieldstat_merge(instance_dest, instance_src);
+
+ struct field_list *tag_list = NULL;
+ size_t n_cell = 0;
+ std::vector<struct Fieldstat_tag_list_wrapper *> test_result;
+ fieldstat_cube_get_cells(instance_dest, 0, &tag_list, &n_cell);
+ for (size_t i = 0; i < n_cell; i++) {
+ test_result.push_back(new Fieldstat_tag_list_wrapper(&tag_list[i])); // heap-allocated wrappers, deleted at the end of the test
+ }
+
+ std::unordered_map<std::string, int> expected_unique_cnt;
+ for (auto &kv : flow_cnt) {
+ expected_unique_cnt[kv.first] = kv.second.size();
+ }
+ double recall = test_cal_topk_accuracy(test_result, expected_unique_cnt);
+ EXPECT_GT(recall, 0.7); // NOTE(review): threshold was chosen while dest was empty; re-check it against a real two-sided merge
+ printf("merge_accuracy_test_gen_dest_full_some_inserted_and_some_merged_and_some_fail_to_add_spreadsketch recall is %lf\n", recall);
+
+ fieldstat_free(instance_src);
+ fieldstat_free(instance_dest);
+ fieldstat_tag_list_arr_free(tag_list, n_cell);
+ for (size_t i = 0; i < test_result.size(); i++) {
+ delete test_result[i];
+ }
+}
int main(int argc, char *argv[])
{
testing::InitGoogleTest(&argc, argv);
- // testing::GTEST_FLAG(filter) = "unit_test_merge.merge_existing_cell_on_existing_cube_and_metric_topk";
+ testing::GTEST_FLAG(filter) = "*spreadsketch";
return RUN_ALL_TESTS();
} \ No newline at end of file
diff --git a/test/test_metric_hll.cpp b/test/test_metric_hll.cpp
index 8e96266..7c38de2 100644
--- a/test/test_metric_hll.cpp
+++ b/test/test_metric_hll.cpp
@@ -1,4 +1,10 @@
#include <gtest/gtest.h>
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <unordered_set>
+#include <math.h>
+
#include "fieldstat.h"
#include "utils.hpp"
@@ -216,9 +222,71 @@ TEST(metric_test_hll, add_with_wrong_metric_id_expecting_fail)
fieldstat_free(instance);
}
+// Streams Zipf-distributed flows into a spread sketch cube, then checks
+//  - the recall computed by test_cal_topk_accuracy over the returned cells, and
+//  - the mean relative error of the HLL estimates against the exact distinct
+//    counts tracked locally in flow_cnt.
+TEST(metric_test_hll, spread_sketch_add_and_test_accuracy)
+{
+    struct fieldstat *instance = fieldstat_new();
+    const int K = 10;
+    fieldstat_create_cube(instance, &TEST_TAG_INT_collided, 1, SAMPLING_MODE_SPREADSKETCH, K);
+    fieldstat_register_hll(instance, "testss", 6);
+
+    const int n_flows = 100000;
+    // Ground truth: dimension string -> set of distinct counted items seen for it.
+    std::unordered_map<std::string, std::unordered_set<std::string>> flow_cnt;
+    SpreadSketchZipfGenerator generator(1.0, K * 10); // give much bigger distribution, so that we can test the accuracy
+    for (int i = 0; i < n_flows; i++)
+    {
+        Flow f = generator.next();
+        Fieldstat_tag_list_wrapper dimension("src ip", f.src_ip.c_str());
+        Fieldstat_tag_list_wrapper counted("dst ip", f.dst_ip.c_str());
+
+        fieldstat_hll_add_field(instance, 0, 0, dimension.get_tag(), dimension.get_tag_count(), counted.get_tag(), counted.get_tag_count());
+
+        flow_cnt[dimension.to_string()].insert(counted.to_string());
+    }
+
+    // recall
+    std::unordered_map<std::string, int> expected_unique_cnt;
+    std::vector<struct Fieldstat_tag_list_wrapper *> test_result;
+    for (auto &kv : flow_cnt) {
+        expected_unique_cnt[kv.first] = kv.second.size();
+    }
+
+    struct field_list *tag_list = NULL;
+    size_t n_cell = 0;
+    fieldstat_cube_get_cells(instance, 0, &tag_list, &n_cell);
+    EXPECT_EQ(n_cell, static_cast<size_t>(K)); // same-signedness comparison, clean under -Wall
+    for (size_t i = 0; i < n_cell; i++) {
+        test_result.push_back(new Fieldstat_tag_list_wrapper(&tag_list[i]));
+    }
+    double recall = test_cal_topk_accuracy(test_result, expected_unique_cnt);
+    printf("spread_sketch_add_and_test_accuracy recall: %f\n", recall);
+    EXPECT_GE(recall, 0.8);
+
+    // MRE
+    double mre = 0;
+    for (size_t i = 0; i < n_cell; i++) {
+        Fieldstat_tag_list_wrapper cell(&tag_list[i]);
+        // Look the cell up without inserting: an unknown cell would otherwise get a
+        // true value of 0 and turn the relative error into a division by zero.
+        auto truth = expected_unique_cnt.find(cell.to_string());
+        if (truth == expected_unique_cnt.end()) {
+            ADD_FAILURE() << "cell " << cell.to_string() << " was reported but never inserted";
+            continue;
+        }
+        double value_true = truth->second;
+        double value_est = 0;
+        EXPECT_EQ(fieldstat_hll_get(instance, 0, &tag_list[i], 0, &value_est), FS_OK);
+        // printf("the estimated value for %s is %f, the true value is %f\n", cell.to_string().c_str(), value_est, value_true);
+
+        mre += fabs(value_true - value_est) / value_true;
+    }
+    if (n_cell > 0) {
+        mre = mre / n_cell; // skip the division when nothing was returned (avoids NaN)
+    }
+    printf("spread_sketch_add_and_test_accuracy Mean ratio e: %f\n", mre);
+    EXPECT_LE(mre, 0.2);
+
+    fieldstat_tag_list_arr_free(tag_list, n_cell);
+    fieldstat_free(instance);
+    for (auto &ptr : test_result) {
+        delete ptr;
+    }
+}
int main(int argc, char *argv[])
{
testing::InitGoogleTest(&argc, argv);
+ // Debug aid, uncomment to run a single test: testing::GTEST_FLAG(filter) = "metric_test_hll.spread_sketch_add_and_test_accuracy";
return RUN_ALL_TESTS();
}
diff --git a/test/test_register_and_reset.cpp b/test/test_register_and_reset.cpp
index 2e295e8..d94e935 100644
--- a/test/test_register_and_reset.cpp
+++ b/test/test_register_and_reset.cpp
@@ -23,6 +23,12 @@ TEST(test_register, delete_comprehensive_cube_with_cells_and_metrics)
fieldstat_counter_incrby(instance, cube_id, metric_id, &TEST_TAG_INT, 1, 1);
fieldstat_destroy_cube(instance, cube_id);
+
+ struct field_list *tag_list = fieldstat_cube_get_tags(instance, cube_id);
+ EXPECT_EQ(tag_list, nullptr);
+ int cube_id_ret = fieldstat_find_cube(instance, &TEST_SHARED_TAG, 1);
+ EXPECT_EQ(cube_id_ret, FS_ERR_INVALID_KEY);
+
fieldstat_free(instance);
}
@@ -34,9 +40,34 @@ TEST(test_register, delete_topk_cube_with_cells_and_metrics)
fieldstat_counter_incrby(instance, cube_id, metric_id, &TEST_TAG_INT, 1, 1);
fieldstat_destroy_cube(instance, cube_id);
+ struct field_list *tag_list = fieldstat_cube_get_tags(instance, cube_id);
+ EXPECT_EQ(tag_list, nullptr);
+ int cube_id_ret = fieldstat_find_cube(instance, &TEST_SHARED_TAG, 1);
+ EXPECT_EQ(cube_id_ret, FS_ERR_INVALID_KEY);
+
fieldstat_free(instance);
}
+// Populates a spread sketch cube (one counter plus a primary HLL metric), destroys it,
+// and expects that its tags can no longer be fetched by id and that a lookup by the
+// shared tag reports FS_ERR_INVALID_KEY.
+TEST(test_register, delete_spreadsketch_cube_with_cells_and_metrics)
+{
+    struct fieldstat *fs = fieldstat_new();
+    const int cube = fieldstat_create_cube(fs, &TEST_SHARED_TAG, 1, SAMPLING_MODE_SPREADSKETCH, 10);
+    const int counter_id = fieldstat_register_counter(fs, "counter");
+    const int hll_id = fieldstat_register_hll(fs, "hll_primary", 5);
+    fieldstat_cube_set_primary_metric(fs, cube, hll_id);
+
+    // One cell touched through each metric before the cube goes away.
+    fieldstat_counter_incrby(fs, cube, counter_id, &TEST_TAG_INT, 1, 1);
+    fieldstat_hll_add_field(fs, cube, hll_id, &TEST_TAG_INT, 1, &TEST_TAG_DOUBLE, 1);
+
+    fieldstat_destroy_cube(fs, cube);
+
+    struct field_list *tags_after_destroy = fieldstat_cube_get_tags(fs, cube);
+    EXPECT_EQ(tags_after_destroy, nullptr);
+    const int found_cube = fieldstat_find_cube(fs, &TEST_SHARED_TAG, 1);
+    EXPECT_EQ(found_cube, FS_ERR_INVALID_KEY);
+
+    fieldstat_free(fs);
+}
+
+
int test_get_max_metric_id(const struct fieldstat *instance)
{
int *metric_id_out;
@@ -46,7 +77,7 @@ int test_get_max_metric_id(const struct fieldstat *instance)
return n_metric - 1;
}
-TEST(test_register, reset_and_try_to_query_cell)
+TEST(test_register, reset_and_try_to_query_cell_comprehensive)
{
struct fieldstat *instance = fieldstat_new();
int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_COMPREHENSIVE, 10);
@@ -54,14 +85,56 @@ TEST(test_register, reset_and_try_to_query_cell)
fieldstat_counter_incrby(instance, cube_id, metric_id, &TEST_TAG_INT, 1, 1);
fieldstat_reset(instance);
- EXPECT_EQ(test_get_max_metric_id(instance), 0);
long long value;
EXPECT_EQ(fieldstat_counter_get(instance, cube_id, &TEST_TAG_LIST_INT, metric_id, &value), FS_ERR_INVALID_TAG);
+ size_t n_cell;
+ struct field_list *tag_list;
+ fieldstat_cube_get_cells(instance, cube_id, &tag_list, &n_cell);
+ EXPECT_EQ(n_cell, 0);
+
fieldstat_free(instance);
}
-TEST(test_register, reset_and_new_cell)
+// After fieldstat_reset, a TOPK cube is expected to report the previously added
+// cell as an invalid tag and to list no cells at all.
+TEST(test_register, reset_and_try_to_query_cell_topk)
+{
+    struct fieldstat *instance = fieldstat_new();
+    int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_TOPK, 10);
+    int metric_id = fieldstat_register_counter(instance, "counter");
+    fieldstat_counter_incrby(instance, cube_id, metric_id, &TEST_TAG_INT, 1, 1);
+
+    fieldstat_reset(instance);
+    long long value;
+    EXPECT_EQ(fieldstat_counter_get(instance, cube_id, &TEST_TAG_LIST_INT, metric_id, &value), FS_ERR_INVALID_TAG);
+
+    // Initialise the out-params so the check below never reads indeterminate
+    // values, matching how the other tests call fieldstat_cube_get_cells.
+    size_t n_cell = 0;
+    struct field_list *tag_list = NULL;
+    fieldstat_cube_get_cells(instance, cube_id, &tag_list, &n_cell);
+    EXPECT_EQ(n_cell, 0u);
+    if (tag_list != NULL) {
+        // Only reached if cells were unexpectedly returned; release them anyway.
+        fieldstat_tag_list_arr_free(tag_list, n_cell);
+    }
+
+    fieldstat_free(instance);
+}
+
+// After fieldstat_reset, a spread sketch cube is expected to report the previously
+// added cell as an invalid tag and to list no cells at all.
+TEST(test_register, reset_and_try_to_query_cell_spreadsketch)
+{
+    struct fieldstat *instance = fieldstat_new();
+    int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_SPREADSKETCH, 10);
+    int metric_id = fieldstat_register_hll(instance, "hll", 5);
+    fieldstat_hll_add(instance, cube_id, metric_id, &TEST_TAG_INT, 1, "12abc", 5);
+
+    fieldstat_reset(instance);
+    double value;
+    EXPECT_EQ(fieldstat_hll_get(instance, cube_id, &TEST_TAG_LIST_INT, metric_id, &value), FS_ERR_INVALID_TAG);
+
+    // Initialise the out-params so the check below never reads indeterminate
+    // values, matching how the other tests call fieldstat_cube_get_cells.
+    size_t n_cell = 0;
+    struct field_list *tag_list = NULL;
+    fieldstat_cube_get_cells(instance, cube_id, &tag_list, &n_cell);
+    EXPECT_EQ(n_cell, 0u);
+    if (tag_list != NULL) {
+        // Only reached if cells were unexpectedly returned; release them anyway.
+        fieldstat_tag_list_arr_free(tag_list, n_cell);
+    }
+
+    fieldstat_free(instance);
+}
+
+TEST(test_register, reset_and_new_cell_comprehensive)
{
struct fieldstat *instance = fieldstat_new();
int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_COMPREHENSIVE, 2);
@@ -78,6 +151,134 @@ TEST(test_register, reset_and_new_cell)
fieldstat_free(instance);
}
+// With a TOPK cube limited to one cell, a second, smaller cell is rejected with
+// FS_ERR_TOO_MANY_CELLS; after fieldstat_reset the same insertion must succeed.
+TEST(test_register, reset_and_new_cell_topk)
+{
+    struct fieldstat *fs = fieldstat_new();
+    const int cube = fieldstat_create_cube(fs, &TEST_SHARED_TAG, 1, SAMPLING_MODE_TOPK, 1);
+    const int counter = fieldstat_register_counter(fs, "counter");
+
+    // Occupy the single slot with a large count (100: bigger value).
+    fieldstat_counter_incrby(fs, cube, counter, &TEST_TAG_INT, 1, 100);
+    const int rejected = fieldstat_counter_incrby(fs, cube, counter, &TEST_TAG_DOUBLE, 1, 1);
+    EXPECT_EQ(rejected, FS_ERR_TOO_MANY_CELLS);
+
+    fieldstat_reset(fs);
+    const int accepted = fieldstat_counter_incrby(fs, cube, counter, &TEST_TAG_DOUBLE, 1, 1);
+    EXPECT_EQ(accepted, FS_OK);
+
+    fieldstat_free(fs);
+}
+
+// Fills a one-cell spread sketch cube until a new cell is rejected with
+// FS_ERR_TOO_MANY_CELLS, then checks that the same insertion succeeds after reset.
+TEST(test_register, reset_and_new_cell_spreadsketch)
+{
+    struct fieldstat *fs = fieldstat_new();
+    const int cube = fieldstat_create_cube(fs, &TEST_SHARED_TAG, 1, SAMPLING_MODE_SPREADSKETCH, 1);
+    const int hll = fieldstat_register_hll(fs, "hll", 5);
+
+    // spread sketch will store more data than expected cell number 1. So loop for many cells first to trigger the error
+    struct field probe = TEST_TAG_INT;
+    int key = 0;
+    while (key < 10000) {
+        probe.value_longlong = key;
+        fieldstat_hll_add(fs, cube, hll, &probe, 1, "12abc", 5);
+        ++key;
+    }
+    const int rejected = fieldstat_hll_add(fs, cube, hll, &TEST_TAG_DOUBLE, 1, "12abc", 5);
+    EXPECT_EQ(rejected, FS_ERR_TOO_MANY_CELLS);
+
+    fieldstat_reset(fs);
+    const int accepted = fieldstat_hll_add(fs, cube, hll, &TEST_TAG_DOUBLE, 1, "12abc", 5);
+    EXPECT_EQ(accepted, FS_OK);
+
+    fieldstat_free(fs);
+}
+
+// Fills a comprehensive cube with cell_num cells, resets, fills it again with the same
+// cells, and expects the second fill to use less CPU time (clock()) than the first.
+// NOTE(review): a pure timing comparison; may be flaky on a loaded machine.
+TEST(test_register, ensure_recovery_more_faster_comprehensive) {
+    struct fieldstat *instance = fieldstat_new();
+    int cell_num = 1000;
+    int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_COMPREHENSIVE, cell_num);
+    int metric_id = fieldstat_register_counter(instance, "counter");
+    struct field test_tag_long = TEST_TAG_INT;
+
+    // Inserts one cell per key in [0, cell_num) and returns the clock() ticks spent.
+    auto fill_and_time = [&]() -> clock_t {
+        const clock_t begin = clock();
+        for (int key = 0; key < cell_num; ++key) {
+            test_tag_long.value_longlong = key;
+            fieldstat_counter_incrby(instance, cube_id, metric_id, &test_tag_long, 1, 1);
+        }
+        return clock() - begin;
+    };
+
+    const clock_t duration_initialize = fill_and_time();
+    fieldstat_reset(instance);
+    const clock_t duration_reset = fill_and_time();
+
+    EXPECT_LT(duration_reset, duration_initialize);
+
+    fieldstat_free(instance);
+}
+
+// Same timing check as the comprehensive variant, on a TOPK cube: the fill that
+// follows fieldstat_reset is expected to take fewer clock() ticks than the first one.
+// NOTE(review): a pure timing comparison; may be flaky on a loaded machine.
+TEST(test_register, ensure_recovery_more_faster_topk) {
+    struct fieldstat *instance = fieldstat_new();
+    int cell_num = 1000;
+    int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_TOPK, cell_num);
+    int metric_id = fieldstat_register_counter(instance, "counter");
+    struct field test_tag_long = TEST_TAG_INT;
+
+    // elapsed[0]: first fill on the fresh instance; elapsed[1]: fill after the reset.
+    clock_t elapsed[2] = {0, 0};
+    for (int pass = 0; pass < 2; ++pass) {
+        if (pass == 1) {
+            fieldstat_reset(instance);
+        }
+
+        const clock_t begin = clock();
+        for (int key = 0; key < cell_num; ++key) {
+            test_tag_long.value_longlong = key;
+            fieldstat_counter_incrby(instance, cube_id, metric_id, &test_tag_long, 1, 1);
+        }
+        elapsed[pass] = clock() - begin;
+    }
+
+    EXPECT_LT(elapsed[1], elapsed[0]);
+
+    fieldstat_free(instance);
+}
+
+// Fills a spread sketch cube with cell_num cells, resets, fills it again with the same
+// cells, and expects the second fill to use less CPU time (clock()) than the first.
+// NOTE(review): a pure timing comparison; may be flaky on a loaded machine.
+TEST(test_register, ensure_recovery_more_faster_spreadsketch) {
+    struct fieldstat *instance = fieldstat_new();
+    int cell_num = 1000;
+    int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_SPREADSKETCH, cell_num);
+    // The loops below call fieldstat_hll_add, so the metric has to be an HLL
+    // (registered like in the other spread sketch tests), not a counter.
+    int metric_id = fieldstat_register_hll(instance, "hll", 5);
+    struct field test_tag_long = TEST_TAG_INT;
+
+    clock_t start = clock();
+    for (int i = 0; i < cell_num; i++) {
+        test_tag_long.value_longlong = i;
+        fieldstat_hll_add(instance, cube_id, metric_id, &test_tag_long, 1, "1", 1);
+    }
+    clock_t end = clock();
+    clock_t duration_initialize = end - start;
+
+    fieldstat_reset(instance);
+
+    start = clock();
+    for (int i = 0; i < cell_num; i++) {
+        test_tag_long.value_longlong = i;
+        fieldstat_hll_add(instance, cube_id, metric_id, &test_tag_long, 1, "1", 1);
+    }
+    end = clock();
+    clock_t duration_reset = end - start;
+
+    // clock_t is an implementation-defined arithmetic type; cast to match %ld.
+    printf("initialize: %ld, reset: %ld\n", static_cast<long>(duration_initialize), static_cast<long>(duration_reset));
+    EXPECT_LT(duration_reset, duration_initialize);
+
+    fieldstat_free(instance);
+}
+
TEST(test_register, register_many_cubes)
{
struct fieldstat *instance = fieldstat_new();
@@ -109,7 +310,7 @@ TEST(test_register, add_many_tagged_cells)
int cube_id = fieldstat_create_cube(instance, &TEST_SHARED_TAG, 1, SAMPLING_MODE_COMPREHENSIVE, 10);
size_t n_field = 1000;
struct field test_tag_long[n_field];
- for (int i = 0; i < n_field; i++) {
+ for (size_t i = 0; i < n_field; i++) {
test_tag_long[i] = TEST_TAG_INT; // will trigger realloc
}
diff --git a/test/utils.cpp b/test/utils.cpp
index 20475c3..eb14f6f 100644
--- a/test/utils.cpp
+++ b/test/utils.cpp
@@ -343,40 +343,10 @@ int zipf(double alpha, int n)
return(zipf_value);
}
-
-// class SpreadSketchZipfGenerator {
-// private:
-// const int MAX_DATA = 1000000;
-// std::pair<std::string, std::string> *loadeds;
-// unsigned cursor;
-
-// public:
-// SpreadSketchZipfGenerator(double alpha, int n) {
-
-// }
-
-// struct Flow next() {
-// int r_cursor = cursor % MAX_DATA;
-// struct Flow flow;
-// flow.src_ip = loadeds[r_cursor].first;
-// flow.dst_ip = loadeds[r_cursor].second;
-
-// cursor++;
-
-// return flow;
-// }
-
-// ~SpreadSketchZipfGenerator() {
-// delete[] loadeds;
-// }
-
-// double _alpha;
-// int _n;
-// };
-
SpreadSketchZipfGenerator::SpreadSketchZipfGenerator(double alpha, int n) {
_alpha = alpha;
_n = n;
+ cursor = 0;
// generate data and write them to file
std::string filename = "zipf_" + std::to_string(alpha) + "_" + std::to_string(n) + ".txt";
@@ -411,28 +381,28 @@ SpreadSketchZipfGenerator::SpreadSketchZipfGenerator(double alpha, int n) {
return;
}
- loadeds = new std::pair<std::string, std::string>[MAX_DATA];
+ loadeds = new std::vector<std::pair<std::string, std::string>>;
std::string line;
- int i = 0;
- while (std::getline(file, line) && i < MAX_DATA) {
+ while (std::getline(file, line)) {
std::istringstream iss(line);
std::string src_ip, dst_ip;
iss >> src_ip >> dst_ip;
- loadeds[i] = std::make_pair(src_ip, dst_ip);
- i++;
+ loadeds->push_back(std::make_pair(src_ip, dst_ip));
}
file.close();
+
}
SpreadSketchZipfGenerator::~SpreadSketchZipfGenerator() {
- delete[] loadeds;
+ delete loadeds; // loadeds is a single std::vector allocated with plain new, so plain delete. NOTE(review): the constructor has a `return;` before that allocation - confirm loadeds is initialised on that path.
}
struct Flow SpreadSketchZipfGenerator::next() {
- int r_cursor = cursor % MAX_DATA;
+ int r_cursor = cursor % loadeds->size();
struct Flow flow;
- flow.src_ip = loadeds[r_cursor].first;
- flow.dst_ip = loadeds[r_cursor].second;
+
+ flow.src_ip = loadeds->at(r_cursor).first;
+ flow.dst_ip = loadeds->at(r_cursor).second;
cursor++;
diff --git a/test/utils.hpp b/test/utils.hpp
index 84f5e09..ce73db0 100644
--- a/test/utils.hpp
+++ b/test/utils.hpp
@@ -55,7 +55,7 @@ struct Flow {
class SpreadSketchZipfGenerator {
private:
const int MAX_DATA = 1000000;
- std::pair<std::string, std::string> *loadeds;
+ std::vector<std::pair<std::string, std::string>> *loadeds;
unsigned cursor;
public: