| author | chenzizhan <[email protected]> | 2024-07-02 10:42:04 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2024-07-02 10:42:04 +0800 |
| commit | 46935eec3e986fd0f392b9c6356a52e2a865feef (patch) | |
| tree | cabdc068f829b969ebda6218cd49b6731b8e3559 | |
| parent | 088c83fb9febe0aa7e7dbfa03d5d797f859db21a (diff) | |
dynamic composed string as cell key
| -rw-r--r-- | ctest/CMakeLists.txt | 2 |
| -rw-r--r-- | include/fieldstat/fieldstat.h | 5 |
| -rw-r--r-- | src/fieldstat.c | 116 |
| -rw-r--r-- | src/metrics/metric.c | 10 |
| -rw-r--r-- | src/metrics/metric.h | 2 |
| -rw-r--r-- | src/tags/heavy_keeper.c | 135 |
| -rw-r--r-- | src/tags/heavy_keeper.h | 20 |
| -rw-r--r-- | src/tags/my_ut_hash.h | 2 |
| -rw-r--r-- | src/tags/tag_map.c | 41 |
| -rw-r--r-- | src/tags/tag_map.h | 4 |
| -rw-r--r-- | test/CMakeLists.txt | 1 |
| -rw-r--r-- | test/test_register_and_reset.cpp | 86 |
| -rw-r--r-- | test/unit_test_cells.cpp | 442 |
13 files changed, 279 insertions, 587 deletions
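The core of this change: cell keys are no longer `struct tag_hash_key` objects. Each public API call now builds a dynamically composed byte string from the tag list via `build_dynamic_cell_key()`, passes `(key, key_len)` down to `heavy_keeper` / `tag_map` (which hash it directly with uthash's `HASH_FIND` / `HASH_ADD_KEYPTR`), and frees the string before returning. The diff only declares `build_dynamic_cell_key()` in `my_ut_hash.h`; its definition is not shown here, so the following is a hypothetical sketch of what such a builder could look like, assuming string-valued tags only and a separator byte that never occurs in keys or values:

```c
/*
 * Hypothetical sketch only: build_dynamic_cell_key() is declared in
 * my_ut_hash.h but its body is not part of this diff. Assumes every tag
 * is TAG_CSTRING and that KEY_SEP never occurs in tag keys or values.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "fieldstat.h"   /* struct fieldstat_tag, TAG_CSTRING */

#define KEY_SEP '\x1f'   /* illustrative separator, not from the repo */

void build_dynamic_cell_key(const struct fieldstat_tag tags[], size_t n_tags,
                            char **out_key, size_t *out_key_size)
{
    /* first pass: compute the total buffer size */
    size_t len = 0;
    for (size_t i = 0; i < n_tags; i++) {
        len += strlen(tags[i].key) + 1 + strlen(tags[i].value_str) + 1;
    }

    /* second pass: concatenate "key<SEP>value<SEP>" for every tag */
    char *buf = malloc(len + 1);
    size_t off = 0;
    for (size_t i = 0; i < n_tags; i++) {
        off += (size_t)sprintf(buf + off, "%s%c%s%c",
                               tags[i].key, KEY_SEP,
                               tags[i].value_str, KEY_SEP);
    }

    *out_key = buf;       /* caller owns the buffer, as the diff shows with free(tag_in_string) */
    *out_key_size = off;  /* explicit length, so the key may contain arbitrary bytes */
}
```

Whatever the real encoding turns out to be, the contract visible in the diff is that the caller frees the returned buffer after use and that the length is always passed explicitly alongside the pointer.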
diff --git a/ctest/CMakeLists.txt b/ctest/CMakeLists.txt index 99ba5d7..fd1077d 100644 --- a/ctest/CMakeLists.txt +++ b/ctest/CMakeLists.txt @@ -14,7 +14,6 @@ add_test(NAME COPY_GTEST_METRIC_COUNTER_BINARY COMMAND sh -c "cp ${CMAKE_BINARY_ add_test(NAME COPY_GTEST_METRIC_HISTOGRAM_BINARY COMMAND sh -c "cp ${CMAKE_BINARY_DIR}/test/test_metric_histogram ${CMAKE_BINARY_DIR}/testing/") add_test(NAME COPY_GTEST_METRIC_HLL_BINARY COMMAND sh -c "cp ${CMAKE_BINARY_DIR}/test/test_metric_hll ${CMAKE_BINARY_DIR}/testing/") add_test(NAME COPY_GTEST_REGISTER_AND_RESET_BINARY COMMAND sh -c "cp ${CMAKE_BINARY_DIR}/test/test_register_and_reset ${CMAKE_BINARY_DIR}/testing/") -add_test(NAME COPY_GTEST_UNIT_TEST_CELL_MANAGER_BINARY COMMAND sh -c "cp ${CMAKE_BINARY_DIR}/test/unit_test_cells ${CMAKE_BINARY_DIR}/testing/") add_test(NAME COPY_GTEST_WRITE_JSON_FILE_BINARY COMMAND sh -c "cp ${CMAKE_BINARY_DIR}/test/test_write_json_file ${CMAKE_BINARY_DIR}/testing/") add_test(NAME CHMOD_UNITTEST COMMAND sh -c "chmod 0755 ${CMAKE_SOURCE_DIR}/test/test_fieldstat_exporter.py; cp ${CMAKE_SOURCE_DIR}/test/test_fieldstat_exporter.py ${CMAKE_BINARY_DIR}/testing/test_fieldstat_exporter") @@ -28,6 +27,5 @@ add_test(NAME GTEST_METRIC_COUNTER COMMAND test_metric_counter WORKING_DIRECTORY add_test(NAME GTEST_METRIC_HISTOGRAM COMMAND test_metric_histogram WORKING_DIRECTORY ${GTEST_RUN_DIR}) add_test(NAME GTEST_METRIC_HLL COMMAND test_metric_hll WORKING_DIRECTORY ${GTEST_RUN_DIR}) add_test(NAME GTEST_REGISTER_AND_RESET COMMAND test_register_and_reset WORKING_DIRECTORY ${GTEST_RUN_DIR}) -add_test(NAME GTEST_UNIT_TEST_CELL_MANAGER COMMAND unit_test_cells WORKING_DIRECTORY ${GTEST_RUN_DIR}) add_test(NAME GTEST_WRITE_JSON_FILE COMMAND test_write_json_file WORKING_DIRECTORY ${GTEST_RUN_DIR}) add_test(NAME GTEST_PYTHON_EXPORTER COMMAND test_fieldstat_exporter WORKING_DIRECTORY ${GTEST_RUN_DIR}) diff --git a/include/fieldstat/fieldstat.h b/include/fieldstat/fieldstat.h index a4b58e2..0fd42b1 100644 --- a/include/fieldstat/fieldstat.h +++ b/include/fieldstat/fieldstat.h @@ -141,6 +141,9 @@ int fieldstat_counter_set(struct fieldstat *instance, int cube_id, int metric_id */ int fieldstat_hll_add(struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, const char *key, size_t key_len); +// TODO: 增加: +// int fieldstat_hll_add(struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, const struct fieldstat_tag *tags, size_t n_tag); + /* * @brief Add a value to the histogram metric of cell_id. Histogram will record the distribution of the values. The value bigger than highest_trackable_value will be set to highest_trackable_value. The value less than lowest_trackable_value will be tried to record, and, if succeed, remains in the record as -inf(most of the time) or 0(if value == 0) @@ -148,7 +151,7 @@ int fieldstat_hll_add(struct fieldstat *instance, int cube_id, int metric_id, co * @return FS_OK if success. FS_ERR_NULL_HANDLER, FS_ERR_INVALID_CUBE_ID, FS_ERR_INVALID_METRIC_ID if fail. * FS_ERR_INVALID_PARAM when value is less than 0, or the cube is topk. */ -int fieldstat_hist_record(struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, long long value); +int fieldstat_hist_record(struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, long long value); // TODO: 看看能不能改成histogram(重命名)。 /* * @brief Delete all the cells, also the content of every metrics. 
The cube and metrics are not deleted. Increase cell_version by 1. diff --git a/src/fieldstat.c b/src/fieldstat.c index d01ea6d..eac8e4c 100644 --- a/src/fieldstat.c +++ b/src/fieldstat.c @@ -12,34 +12,35 @@ #include "cell_manager.h" #include "heavy_keeper.h" #include "tag_map.h" +#include "my_ut_hash.h" #define DEFAULT_N_METRIC 64 #define DEFAULT_N_CUBE 128 -struct exdata { +struct exdata { // TODO: 重命名成cell struct metric **metrics; - size_t valid_metric_arr_len; + size_t valid_metric_arr_len; // TODO: 单纯就是每次超过就realloc,使用UTARRAY size_t max_n_metric; - struct fieldstat_tag_list tags; - - const struct metric_name_id_map *metric_reference; // used in merge, point to fieldstat->metric_name_id_map + struct fieldstat_tag_list tags; // cell identifier }; union cell_manager { struct heavy_keeper *topk; struct tag_map *comprehensive; -}; -struct fs_cube { +}; // todo: 移到cube里了, 名字叫cell tabel + +struct fs_cube { // TODO rename 到 cube enum sampling_mode sampling_mode; union cell_manager cells; size_t max_n_cell; // the key of cube is the combination of shared tags - struct fieldstat_tag *shared_tags; + struct fieldstat_tag *shared_tags; // TODO: CUBE IDENTIFIER size_t n_shared_tags; - struct tag_hash_key *key_tag; + struct tag_hash_key *key_tag; // TODO: 删了它 + // const char *key_tag; int primary_metric_id; }; @@ -69,7 +70,7 @@ struct fieldstat { struct metric **metric_masters; size_t n_metric_master; - size_t max_n_metric_master; + size_t max_n_metric_master; // TODO: 学习用ut array struct metric_name_id_map *metric_name_id_map; struct cube_manager *shared_tag_cube_manager; @@ -184,8 +185,6 @@ struct exdata *exdata_new(const struct exdata_new_args *args) { pthis->tags.n_tag = args->n_tags; pthis->tags.tag = malloc(sizeof(struct fieldstat_tag) * args->n_tags); tag_array_copy(pthis->tags.tag, args->tags, args->n_tags); - - pthis->metric_reference = NULL; return pthis; } @@ -221,8 +220,6 @@ struct exdata *exdata_copy(const struct exdata *src) { pthis->tags.tag = malloc(sizeof(struct fieldstat_tag) * src->tags.n_tag); tag_array_copy(pthis->tags.tag, src->tags.tag, src->tags.n_tag); - pthis->metric_reference = NULL; - return pthis; } @@ -691,15 +688,15 @@ int check_before_add(const struct fieldstat *instance, int cube_id, int metric_i return FS_OK; } -struct exdata *find_or_add_exdata_comprehensive(struct tag_map *comprehensive, const struct tag_hash_key *tag_key, struct exdata_new_args *args) +struct exdata *find_or_add_exdata_comprehensive(struct tag_map *comprehensive, const char *key, size_t key_len, struct exdata_new_args *args) { - struct exdata *cell_data = tag_map_get0_exdata(comprehensive, tag_key); + struct exdata *cell_data = tag_map_get0_exdata(comprehensive, key, key_len); if (cell_data == NULL) { - int tmp_ret = tag_map_add(comprehensive, tag_key, args); + int tmp_ret = tag_map_add(comprehensive, key, key_len, args); if (tmp_ret != 1) { return NULL; } - cell_data = tag_map_get0_exdata(comprehensive, tag_key); + cell_data = tag_map_get0_exdata(comprehensive, key, key_len); } return cell_data; } @@ -712,45 +709,52 @@ int fieldstat_counter_incrby(struct fieldstat *instance, int cube_id, int metric } const struct fs_cube *cube = instance->cube[cube_id]; - struct tag_hash_key tag_key; - tag_hash_key_init_with_fieldstat_tag(&tag_key, tags, n_tag, false); + + char *tag_in_string; + size_t tag_len; + build_dynamic_cell_key(tags, n_tag, &tag_in_string, &tag_len); struct exdata_new_args args; args.tags = tags; args.n_tags = n_tag; - struct exdata *cell_data; + struct exdata *cell_data = NULL; switch 
(cube->sampling_mode) { case SAMPLING_MODE_TOPK: if (cube->primary_metric_id != metric_id) { - cell_data = heavy_keeper_get0_exdata(cube->cells.topk, &tag_key); + cell_data = heavy_keeper_get0_exdata(cube->cells.topk, tag_in_string, tag_len); if (cell_data == NULL) { - int tmp_ret = heavy_keeper_add(cube->cells.topk, &tag_key, 0, &args); + int tmp_ret = heavy_keeper_add(cube->cells.topk, tag_in_string, tag_len, 0, &args); if (tmp_ret != 1) { + free(tag_in_string); return FS_ERR_TOO_MANY_CELLS; } - cell_data = heavy_keeper_get0_exdata(cube->cells.topk, &tag_key); + cell_data = heavy_keeper_get0_exdata(cube->cells.topk, tag_in_string, tag_len); } } else { if (increment < 0) { + free(tag_in_string); return FS_ERR_INVALID_PARAM; } else if (increment == 0) { + free(tag_in_string); return FS_OK; } // heavy_keeper_add should be called anyway, to let the topk record update. - int tmp_ret = heavy_keeper_add(cube->cells.topk, &tag_key, increment, &args); + int tmp_ret = heavy_keeper_add(cube->cells.topk, tag_in_string, tag_len, increment, &args); if (tmp_ret != 1) { + free(tag_in_string); return FS_ERR_TOO_MANY_CELLS; } - cell_data = heavy_keeper_get0_exdata(cube->cells.topk, &tag_key); + cell_data = heavy_keeper_get0_exdata(cube->cells.topk, tag_in_string, tag_len); } break; case SAMPLING_MODE_COMPREHENSIVE: - cell_data = find_or_add_exdata_comprehensive(cube->cells.comprehensive, &tag_key, &args); + cell_data = find_or_add_exdata_comprehensive(cube->cells.comprehensive, tag_in_string, tag_len, &args); if (cell_data == NULL) { + free(tag_in_string); return FS_ERR_TOO_MANY_CELLS; } break; @@ -762,10 +766,10 @@ int fieldstat_counter_incrby(struct fieldstat *instance, int cube_id, int metric struct metric *metric = construct_or_find_metric_to_exdata(instance, cell_data, metric_id); metric_counter_incrby(metric, increment); + free(tag_in_string); return FS_OK; } - int fieldstat_counter_set(struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, long long value) { int ret = check_before_add(instance, cube_id, metric_id, METRIC_TYPE_COUNTER); @@ -773,8 +777,9 @@ int fieldstat_counter_set(struct fieldstat *instance, int cube_id, int metric_id return ret; } const struct fs_cube *cube = instance->cube[cube_id]; - struct tag_hash_key tag_key; - tag_hash_key_init_with_fieldstat_tag(&tag_key, tags, n_tag, false); + char *tag_in_string; + size_t tag_len; + build_dynamic_cell_key(tags, n_tag, &tag_in_string, &tag_len); struct exdata_new_args args; args.tags = tags; @@ -785,17 +790,18 @@ int fieldstat_counter_set(struct fieldstat *instance, int cube_id, int metric_id { case SAMPLING_MODE_TOPK: { if (cube->primary_metric_id != metric_id) { - cell_data = heavy_keeper_get0_exdata(cube->cells.topk, &tag_key); + cell_data = heavy_keeper_get0_exdata(cube->cells.topk, tag_in_string, tag_len); if (cell_data == NULL) { - int tmp_ret = heavy_keeper_add(cube->cells.topk, &tag_key, 0, &args); + int tmp_ret = heavy_keeper_add(cube->cells.topk, tag_in_string, tag_len, 0, &args); if (tmp_ret != 1) { + free(tag_in_string); return FS_ERR_TOO_MANY_CELLS; } - cell_data = heavy_keeper_get0_exdata(cube->cells.topk, &tag_key); + cell_data = heavy_keeper_get0_exdata(cube->cells.topk, tag_in_string, tag_len); } } else { long long current_count = 0; - cell_data = heavy_keeper_get0_exdata(cube->cells.topk, &tag_key); + cell_data = heavy_keeper_get0_exdata(cube->cells.topk, tag_in_string, tag_len); if (cell_data != NULL) { const struct metric *tmp_metric = find_metric_in_exdata(cell_data, 
metric_id); if (tmp_metric != NULL) { @@ -804,20 +810,22 @@ int fieldstat_counter_set(struct fieldstat *instance, int cube_id, int metric_id } long long increment = value - current_count; if (increment < 0) { + free(tag_in_string); return FS_ERR_INVALID_PARAM; } else if (increment == 0) { + free(tag_in_string); return FS_OK; } - int tmp_ret = heavy_keeper_add(cube->cells.topk, &tag_key, increment, &args); + int tmp_ret = heavy_keeper_add(cube->cells.topk, tag_in_string, tag_len, increment, &args); if (tmp_ret != 1) { return FS_ERR_TOO_MANY_CELLS; } - cell_data = heavy_keeper_get0_exdata(cube->cells.topk, &tag_key); + cell_data = heavy_keeper_get0_exdata(cube->cells.topk, tag_in_string, tag_len); } break;} case SAMPLING_MODE_COMPREHENSIVE: { - cell_data = find_or_add_exdata_comprehensive(cube->cells.comprehensive, &tag_key, &args); + cell_data = find_or_add_exdata_comprehensive(cube->cells.comprehensive, tag_in_string, tag_len, &args); break;} default: assert(0); @@ -825,6 +833,7 @@ int fieldstat_counter_set(struct fieldstat *instance, int cube_id, int metric_id } assert(cell_data != NULL); // to mute the warning struct metric *metric = construct_or_find_metric_to_exdata(instance, cell_data, metric_id); + free(tag_in_string); metric_counter_set(metric, value); return FS_OK; } @@ -839,17 +848,20 @@ int fieldstat_hll_add(struct fieldstat *instance, int cube_id, int metric_id, co return FS_ERR_INVALID_PARAM; } - struct tag_hash_key tag_key; - tag_hash_key_init_with_fieldstat_tag(&tag_key, tags, n_tag, false); + char *tag_in_string; + size_t tag_len; + build_dynamic_cell_key(tags, n_tag, &tag_in_string, &tag_len); struct exdata_new_args args; args.tags = tags; args.n_tags = n_tag; - struct exdata *cell_data = find_or_add_exdata_comprehensive(instance->cube[cube_id]->cells.comprehensive, &tag_key, &args); + struct exdata *cell_data = find_or_add_exdata_comprehensive(instance->cube[cube_id]->cells.comprehensive, tag_in_string, tag_len, &args); struct metric *metric = construct_or_find_metric_to_exdata(instance, cell_data, metric_id); metric_hll_add(metric, key, key_len); + free(tag_in_string); + return 0; } @@ -863,14 +875,17 @@ int fieldstat_hist_record(struct fieldstat *instance, int cube_id, int metric_id return FS_ERR_INVALID_PARAM; } - struct tag_hash_key tag_key; - tag_hash_key_init_with_fieldstat_tag(&tag_key, tags, n_tag, false); + char *tag_in_string; + size_t tag_len; + build_dynamic_cell_key(tags, n_tag, &tag_in_string, &tag_len); struct exdata_new_args args; args.tags = tags; args.n_tags = n_tag; - struct exdata *cell_data = find_or_add_exdata_comprehensive(instance->cube[cube_id]->cells.comprehensive, &tag_key, &args); + struct exdata *cell_data = find_or_add_exdata_comprehensive(instance->cube[cube_id]->cells.comprehensive, tag_in_string, tag_len, &args); + + free(tag_in_string); // // metric_histogram_record may fail, unlike the other add functions. 
struct metric *metric = find_metric_in_exdata(cell_data, metric_id); @@ -1204,19 +1219,26 @@ struct fieldstat_tag_list *fieldstat_get_shared_tags(const struct fieldstat *ins const struct exdata *get_exdata_by_tag_list(const struct fs_cube *cube, const struct fieldstat_tag_list *tags) { - struct tag_hash_key tag_key; - tag_hash_key_init_with_fieldstat_tag(&tag_key, tags->tag, tags->n_tag, false); + const struct exdata *ret = NULL; + char *tag_in_string; + size_t tag_len; + build_dynamic_cell_key(tags->tag, tags->n_tag, &tag_in_string, &tag_len); switch (cube->sampling_mode) { case SAMPLING_MODE_TOPK: - return heavy_keeper_get0_exdata(cube->cells.topk, &tag_key); + ret = heavy_keeper_get0_exdata(cube->cells.topk, tag_in_string, tag_len); + break; case SAMPLING_MODE_COMPREHENSIVE: - return tag_map_get0_exdata(cube->cells.comprehensive, &tag_key); + ret = tag_map_get0_exdata(cube->cells.comprehensive, tag_in_string, tag_len); + break; default: assert(0); return NULL; } + free(tag_in_string); + + return ret; } const struct metric *get_metric_by_tag_list(const struct fieldstat *instance, int cube_id, const struct fieldstat_tag_list *tags, int metric_id,int *ret) diff --git a/src/metrics/metric.c b/src/metrics/metric.c index e85f548..b58e229 100644 --- a/src/metrics/metric.c +++ b/src/metrics/metric.c @@ -34,7 +34,7 @@ struct metric_parameter { }; }; -struct metric_info { +struct metric_info { // TODO: 别用info 这个词,用manifest。另外新建一个这样的结构体。 int id; char *name; struct metric_parameter *paras; @@ -50,7 +50,7 @@ typedef struct metric_measure_data * (*metric_func_deserialize)(const char *, si typedef void (*metric_func_reset)(struct metric_measure_data *); struct metric_scheme { metric_func_new new; - metric_func_del del; + metric_func_del del; // TODO: del -> free metric_func_merge merge; metric_func_copy copy; metric_func_serialize serialize; @@ -58,7 +58,7 @@ struct metric_scheme { metric_func_reset reset; }; -struct metric_measure_data { +struct metric_measure_data { // TODO: 改成data 就行 union { struct metric_counter_or_gauge *counter; struct ST_hyperloglog *hll; @@ -69,7 +69,7 @@ struct metric_measure_data { struct metric { const struct metric_scheme *scheme; struct metric_info *info; - struct metric_measure_data *data_array; + struct metric_measure_data *data_array; // todo: 重命名,data }; /* -------------------------------------------------------------------------- */ @@ -312,7 +312,7 @@ struct metric *metric_new(const char *name, enum metric_type type, struct metric { struct metric *pthis = (struct metric *)calloc(1, sizeof(struct metric)); pthis->info = metric_info_new(name, type, para); - pthis->scheme = &g_metric_scheme_table[type]; + pthis->scheme = &g_metric_scheme_table[type]; // todo: 改成Switch type pthis->data_array = NULL; return pthis; diff --git a/src/metrics/metric.h b/src/metrics/metric.h index 0d84f3d..3e04bd2 100644 --- a/src/metrics/metric.h +++ b/src/metrics/metric.h @@ -6,6 +6,8 @@ struct metric; +// TODO: 删掉metric,单纯提供一个metric_manifest 类,直接把定义写进来,由fieldstat 和 cube 共同依赖。 + const char *metric_get_name(const struct metric *metrics); // don't free the return value enum metric_type metric_get_type(const struct metric *metrics); void metric_free(struct metric *pthis); diff --git a/src/tags/heavy_keeper.c b/src/tags/heavy_keeper.c index b932ce1..1fd09b8 100644 --- a/src/tags/heavy_keeper.c +++ b/src/tags/heavy_keeper.c @@ -12,16 +12,18 @@ #include "minheap/heap.h" #include "mpack/mpack.h" +#include "uthash.h" #include "xxhash/xxhash.h" -#include "my_ut_hash_inner.h" #include "exdata.h" 
#define FP_HASH_KEY 0 #define INVALID_COUNT UINT64_MAX +#define MAX(a, b) (((a) > (b)) ? (a) : (b)) struct entry_data { // the value constitute of a sorted set entry - struct tag_hash_key *key; + char *key; + size_t key_len; void *exdata; }; @@ -40,7 +42,8 @@ struct sorted_set { }; struct hash_node { - struct tag_hash_key *key; // value is equal to entry_data::key(point to the same memory) + const char *key; // value is equal to entry_data::key(point to the same memory) + size_t key_len; heap_entry *val; UT_hash_handle hh; }; @@ -97,11 +100,18 @@ static void *default_copy_fn(void *exdata) { return exdata; } -struct entry_data *entry_data_construct(const struct tag_hash_key *tag, void *exdata) +static char *my_key_dup(const char *key, size_t key_len) { + char *ret = (char *)malloc(key_len); + memcpy(ret, key, key_len); + return ret; +} + +struct entry_data *entry_data_construct(const char *key, size_t key_len, void *exdata) { struct entry_data *entry_data = (struct entry_data *)malloc(sizeof(struct entry_data)); - entry_data->key = tag_hash_key_copy(tag); + entry_data->key = my_key_dup(key, key_len); + entry_data->key_len = key_len; entry_data->exdata = exdata; return entry_data; } @@ -111,7 +121,7 @@ void entry_data_destroy(struct entry_data *entry_data, exdata_free_cb free_fn) if (entry_data == NULL) { return; } - tag_hash_key_free(entry_data->key); + free(entry_data->key); free_fn(entry_data->exdata); free(entry_data); } @@ -222,11 +232,11 @@ void sorted_set_free(struct sorted_set *ss) free(ss); } -heap_entry *sorted_set_find_entry(const struct sorted_set *ss, const struct tag_hash_key *tag) +heap_entry *sorted_set_find_entry(const struct sorted_set *ss, const char *key, size_t key_len) { struct hash_node *hash_tbl = ss->hash_tbl; struct hash_node *s = NULL; - HASH_FIND_TAG(hash_tbl, tag, s); + HASH_FIND(hh, hash_tbl, key, key_len, s); if (s == NULL) { return NULL; @@ -251,7 +261,7 @@ int sorted_set_pop(struct sorted_set *ss) struct entry_data *tmp_data = sorted_set_entry_get_data(entry); struct hash_node *s = NULL; - HASH_FIND_TAG(hash_tbl, tmp_data->key, s); + HASH_FIND(hh, hash_tbl, tmp_data->key, tmp_data->key_len, s); assert(s!=NULL); HASH_DEL(hash_tbl, s); @@ -293,12 +303,12 @@ unsigned long long sorted_set_get_min_count(const struct sorted_set *ss) return count - 1; // sorted set will let the count start from 1, 0 for dying entry. } -void sorted_set_insert_to_available_heap(struct sorted_set *ss, const struct tag_hash_key *tag, unsigned long long cnt, void *exdata) +void sorted_set_insert_to_available_heap(struct sorted_set *ss, const char *key, size_t key_len, unsigned long long cnt, void *exdata) { cnt = safe_add(cnt, 1); // sorted set will let the count start from 1, 0 for dying entry. 
unsigned long long *tmp_cnt = (unsigned long long*)malloc(sizeof(unsigned long long)); *tmp_cnt = cnt; - struct entry_data *tmp_data = entry_data_construct(tag, exdata); + struct entry_data *tmp_data = entry_data_construct(key, key_len, exdata); heap_entry *node = (heap_entry *)malloc(sizeof(heap_entry)); node->key = tmp_cnt; node->value = tmp_data; @@ -308,17 +318,18 @@ void sorted_set_insert_to_available_heap(struct sorted_set *ss, const struct tag struct hash_node *hash_tbl = ss->hash_tbl; struct hash_node *s = (struct hash_node *)malloc(sizeof(struct hash_node)); s->key = sorted_set_entry_get_data(node)->key; + s->key_len = key_len; s->val = node; - HASH_ADD_TAG(hash_tbl, key, s); + HASH_ADD_KEYPTR(hh, hash_tbl, s->key, key_len, s); ss->hash_tbl = hash_tbl; ss->n_living_entry++; } -int sorted_set_insert(struct sorted_set *ss, const struct tag_hash_key *tag, unsigned long long cnt, void *args) +int sorted_set_insert(struct sorted_set *ss, const char *key, size_t key_len, unsigned long long cnt, void *args) { // if there is a dying record, reborn it to use. - heap_entry *entry = sorted_set_find_entry(ss, tag); + heap_entry *entry = sorted_set_find_entry(ss, key, key_len); if (entry != NULL) { if (!sorted_set_entry_dying(entry)) { assert(0); @@ -338,7 +349,7 @@ int sorted_set_insert(struct sorted_set *ss, const struct tag_hash_key *tag, uns int ret = sorted_set_pop(ss); assert(ret != -1); } - sorted_set_insert_to_available_heap(ss, tag, cnt, ss->new_fn(args)); + sorted_set_insert_to_available_heap(ss, key, key_len, cnt, ss->new_fn(args)); return 1; } @@ -347,12 +358,12 @@ int sorted_set_cardinality(const struct sorted_set *ss) return ss->n_living_entry; } -unsigned long long sorted_set_get_count(const struct sorted_set *ss, const struct tag_hash_key *tag) +unsigned long long sorted_set_get_count(const struct sorted_set *ss, const char *key, size_t key_len) { if (sorted_set_cardinality(ss) == 0) { return INVALID_COUNT; } - const heap_entry *entry = sorted_set_find_entry(ss, tag); + const heap_entry *entry = sorted_set_find_entry(ss, key, key_len); if (entry == NULL) { return INVALID_COUNT; } @@ -363,9 +374,9 @@ unsigned long long sorted_set_get_count(const struct sorted_set *ss, const struc return sorted_set_entry_get_count(entry); } -int sorted_set_incrby(struct sorted_set *ss, const struct tag_hash_key *tag, unsigned long long count) +int sorted_set_incrby(struct sorted_set *ss, const char *key, size_t key_len, unsigned long long count) { - heap_entry *entry = sorted_set_find_entry(ss, tag); + heap_entry *entry = sorted_set_find_entry(ss, key, key_len); if (entry == NULL) { return -1; } @@ -399,7 +410,7 @@ int cmp_tmp_heap_node(const void *a, const void *b) } } -void sorted_set_dump(const struct sorted_set *ss, struct tag_hash_key **tags_out, void **exdata_out) +void sorted_set_dump(const struct sorted_set *ss, char **key_out, size_t *key_len_out, void **exdata_out) { struct minheap *h = ss->heap; tmp_heap_node *tmp_nodes = (tmp_heap_node *)malloc(sizeof(tmp_heap_node) * h->cur_size); @@ -417,7 +428,8 @@ void sorted_set_dump(const struct sorted_set *ss, struct tag_hash_key **tags_out assert(n_living_entry == ss->n_living_entry); qsort(tmp_nodes, n_living_entry, sizeof(tmp_heap_node), cmp_tmp_heap_node); for (int i = 0; i < n_living_entry; i++) { - tags_out[i] = tmp_nodes[i].val->key; + key_out[i] = tmp_nodes[i].val->key; + key_len_out[i] = tmp_nodes[i].val->key_len; exdata_out[i] = tmp_nodes[i].val->exdata; } free(tmp_nodes); @@ -434,7 +446,7 @@ struct sorted_set *sorted_set_copy(const 
struct sorted_set *ss) continue; } const struct entry_data *data = sorted_set_entry_get_data(entry); - sorted_set_insert_to_available_heap(ret, data->key, sorted_set_entry_get_count(entry), ss->copy_fn(data->exdata)); + sorted_set_insert_to_available_heap(ret, data->key, data->key_len, sorted_set_entry_get_count(entry), ss->copy_fn(data->exdata)); } return ret; @@ -556,15 +568,20 @@ bool if_need_to_decay(struct heavy_keeper *hk, const struct Bucket *bucket, unsi return false; } -struct Bucket *map_flow_id_hash_to_bucket(struct heavy_keeper *hk, int array_index, const struct tag_hash_key *tag) { +unsigned int cal_hash_val_with_seed(const char *key, size_t key_len, unsigned int seed) { + return XXH3_64bits_withSeed(key, key_len, seed); +} + +struct Bucket *find_bucket_by_key(struct heavy_keeper *hk, int array_index, const char *key, size_t key_len) { int w = hk->params.max_bucket_num; - int Hsh = tag_hash_key_cal_hash_val(tag, array_index + 1) % w; // +1: the let any row do not use FP as hashing key directly. + unsigned int Hsh = cal_hash_val_with_seed(key, key_len, array_index + 1) % w; // +1: the let any row do not use FP as hashing key directly. // Otherwise, when different key has the same FP(in 2^-64 probability), they will also in the same bucket in the row #0 and hash collision will happen more severely. return &(hk->sketch[array_index * w + Hsh]); } + unsigned my_max(unsigned a, unsigned b) { if (a > b) { return a; @@ -575,10 +592,10 @@ unsigned my_max(unsigned a, unsigned b) { /* 1 for newly add something. 0 for not add. -1 for unexpected cases. */ -int heavy_keeper_add(struct heavy_keeper *heavy_keeper, const struct tag_hash_key *tag, int count, void *arg) { +int heavy_keeper_add(struct heavy_keeper *heavy_keeper, const char *key, size_t key_len, int count, void *arg) { if (count == 0) { if (sorted_set_cardinality(heavy_keeper->top_K_heap) < heavy_keeper->K) { - sorted_set_insert(heavy_keeper->top_K_heap, tag, count, arg); + sorted_set_insert(heavy_keeper->top_K_heap, key, key_len, count, arg); return 1; } return 0; @@ -589,13 +606,13 @@ int heavy_keeper_add(struct heavy_keeper *heavy_keeper, const struct tag_hash_ke if (nMin == INVALID_COUNT) { nMin = 0; } - unsigned long long int old_cnt = sorted_set_get_count(summary, tag); + unsigned long long int old_cnt = sorted_set_get_count(summary, key, key_len); bool not_in_sorted_set = (old_cnt == INVALID_COUNT); unsigned long long maxv = 0; - unsigned int FP = tag_hash_key_cal_hash_val(tag, FP_HASH_KEY); + unsigned int FP = cal_hash_val_with_seed(key, key_len, FP_HASH_KEY); for (int j = 0; j < heavy_keeper->params.array_num; j++) { - struct Bucket *bucket = map_flow_id_hash_to_bucket(heavy_keeper, j, tag); + struct Bucket *bucket = find_bucket_by_key(heavy_keeper, j, key, key_len); if (bucket->finger_print == FP) { // If a flow is not in the min-heap, then the estimated flow size should be no larger than nmin. 
@@ -608,7 +625,7 @@ int heavy_keeper_add(struct heavy_keeper *heavy_keeper, const struct tag_hash_ke continue; } bucket->count = safe_add(bucket->count, count); - maxv = my_max(maxv, bucket->count); + maxv = MAX(maxv, bucket->count); } else { if (!if_need_to_decay(heavy_keeper, bucket, count)) { continue; @@ -618,7 +635,7 @@ int heavy_keeper_add(struct heavy_keeper *heavy_keeper, const struct tag_hash_ke bucket->finger_print = FP; bucket->count = count; - maxv = my_max(maxv, count); + maxv = MAX(maxv, count); } else { bucket->count -= count; } @@ -627,14 +644,13 @@ int heavy_keeper_add(struct heavy_keeper *heavy_keeper, const struct tag_hash_ke if (not_in_sorted_set) { if ((maxv - nMin <= count && maxv != nMin) || sorted_set_cardinality(summary) != heavy_keeper->K) { - int insert_ret = sorted_set_insert(summary, tag, maxv, arg); - assert(insert_ret != -1 && insert_ret != 0); + sorted_set_insert(summary, key, key_len, maxv, arg); return 1; } return 0; } else { if (maxv > old_cnt) { - sorted_set_incrby(summary, tag, maxv - old_cnt); + sorted_set_incrby(summary, key, key_len, maxv - old_cnt); } return 1; // no popped, but the tag definitely exists in the sorted set } @@ -651,8 +667,8 @@ int heavy_keeper_set_exdata_schema(struct heavy_keeper *hk, exdata_new_cb new_fn return 0; } -void *heavy_keeper_get0_exdata(const struct heavy_keeper *hk, const struct tag_hash_key *key) { - const heap_entry *entry = sorted_set_find_entry(hk->top_K_heap, key); +void *heavy_keeper_get0_exdata(const struct heavy_keeper *hk, const char *key, size_t key_len) { + const heap_entry *entry = sorted_set_find_entry(hk->top_K_heap, key, key_len); if (entry == NULL || sorted_set_entry_dying(entry)) { return NULL; } @@ -673,10 +689,12 @@ void heavy_keeper_list(const struct heavy_keeper *hk, void ***exdatas, size_t *n void **exdatas_ret = (void **)malloc(sizeof(void *) * (*n_exdatas)); *exdatas = exdatas_ret; - struct tag_hash_key **tags = (struct tag_hash_key **)malloc(sizeof(struct tag_hash_key *) * hk->K); - sorted_set_dump(hk->top_K_heap, tags, exdatas_ret); + char **keys_dummy = (char **)malloc(sizeof(char *) * (*n_exdatas)); + size_t *key_lens_dummy = (size_t *)malloc(sizeof(size_t) * (*n_exdatas)); + sorted_set_dump(hk->top_K_heap, keys_dummy, key_lens_dummy, exdatas_ret); - free(tags); + free(keys_dummy); + free(key_lens_dummy); } static void heavy_keeper_merge_sketch(struct heavy_keeper *dest, const struct heavy_keeper *src) { @@ -700,17 +718,16 @@ static void heavy_keeper_merge_sketch(struct heavy_keeper *dest, const struct he } } - -unsigned long long find_maxv_in_sketch(struct heavy_keeper *hk, const struct tag_hash_key *tag) { +unsigned long long find_maxv_in_sketch(struct heavy_keeper *hk, const char *key, size_t key_len) { struct Bucket *bucket; - unsigned fp = tag_hash_key_cal_hash_val(tag, FP_HASH_KEY); + unsigned fp = cal_hash_val_with_seed(key, key_len, FP_HASH_KEY); unsigned long long maxv = 0; for (int array_id = 0; array_id < hk->params.array_num; array_id++) { - bucket = map_flow_id_hash_to_bucket(hk, array_id, tag); // hk->sketch is the merge of two. So just check one + bucket = find_bucket_by_key(hk, array_id, key, key_len); // hk->sketch is the merge of two. 
So just check one if (bucket->finger_print == fp) { - maxv = my_max(maxv, bucket->count); + maxv = MAX(maxv, bucket->count); } } @@ -721,7 +738,7 @@ int cmp_int(const void *a, const void *b) { return *(int *)a - *(int *)b; } -void heavy_keeper_merge(struct heavy_keeper *dest, struct heavy_keeper *src) { +void heavy_keeper_merge(struct heavy_keeper *dest, const struct heavy_keeper *src) { assert(dest->K == src->K); heavy_keeper_merge_sketch(dest, src); @@ -734,25 +751,24 @@ void heavy_keeper_merge(struct heavy_keeper *dest, struct heavy_keeper *src) { int size_src = sorted_set_cardinality(ss_src); int max_size = size_dest > size_src ? size_dest : size_src; - struct tag_hash_key **tag_arr = (struct tag_hash_key **)calloc(max_size, sizeof(struct tag_hash_key *)); void **exdatas_dst = (void **)calloc(size_dest, sizeof(void *)); - sorted_set_dump(ss_dest, tag_arr, exdatas_dst); + char **key_arr = (char **)malloc(sizeof(char *) * max_size); + size_t *key_lens = (size_t *)malloc(sizeof(size_t) * max_size); + sorted_set_dump(ss_dest, key_arr, key_lens, exdatas_dst); /* ------------------------------ merge dest ------------------------------ */ for (int i = 0; i < size_dest; i++) { - unsigned long long maxv = find_maxv_in_sketch(dest, tag_arr[i]); - sorted_set_insert_to_available_heap(new_rec, tag_arr[i], maxv, dest->copy_fn(exdatas_dst[i])); + unsigned long long maxv = find_maxv_in_sketch(dest, key_arr[i], key_lens[i]); + sorted_set_insert_to_available_heap(new_rec, key_arr[i], key_lens[i], maxv, dest->copy_fn(exdatas_dst[i])); } /* ------------------------------ merge src ------------------------------ */ void **exdatas_src = (void **)calloc(size_src, sizeof(void *)); - sorted_set_dump(ss_src, tag_arr, exdatas_src); + sorted_set_dump(ss_src, key_arr, key_lens, exdatas_src); for (int i = 0; i < size_src; i++) { - const struct tag_hash_key *tag_src = tag_arr[i]; - - const heap_entry *entry = sorted_set_find_entry(new_rec, tag_src); + const heap_entry *entry = sorted_set_find_entry(new_rec, key_arr[i], key_lens[i]); if (entry != NULL) { // the tag is in both dest and src, so has been processed in the previous loop. 
The reason why no need to sum up result is that merged sketch already keeps its summed up count // merge exdata void *exdata_new = sorted_set_entry_get_data(entry)->exdata; @@ -762,20 +778,21 @@ void heavy_keeper_merge(struct heavy_keeper *dest, struct heavy_keeper *src) { continue; } - unsigned long long maxv = find_maxv_in_sketch(dest, tag_src);// the dest heavy keeper has been merged, so the maxv is the maxv in the merged sketch, instead of the one in the dest + unsigned long long maxv = find_maxv_in_sketch(dest, key_arr[i], key_lens[i]);// the dest heavy keeper has been merged, so the maxv is the maxv in the merged sketch, instead of the one in the dest if (sorted_set_check_is_full(new_rec)) { unsigned long long tmp_mincnt = sorted_set_get_min_count(new_rec); if (maxv > tmp_mincnt) { sorted_set_pop(new_rec); // TODO: 如果dest 和 new 共用指针,这里pop 出来以后,dest 的内存好像会变得有点问题。先别纠结这么复杂的事,new 和 dest 使用copy后的data 先试试。 - sorted_set_insert_to_available_heap(new_rec, tag_src, maxv, dest->copy_fn(exdatas_src[i])); + sorted_set_insert_to_available_heap(new_rec, key_arr[i], key_lens[i], maxv, dest->copy_fn(exdatas_src[i])); } } else { - sorted_set_insert_to_available_heap(new_rec, tag_src, maxv, dest->copy_fn(exdatas_src[i])); + sorted_set_insert_to_available_heap(new_rec, key_arr[i], key_lens[i], maxv, dest->copy_fn(exdatas_src[i])); } } - free(tag_arr); + free(key_arr); + free(key_lens); free(exdatas_dst); free(exdatas_src); // dest->top_K_heap->free_fn = default_free_fn; // do not free exdata in the original top_K_heap, because they are either moved to new_rec or freed when popped out when add source to dest. @@ -803,11 +820,11 @@ struct heavy_keeper *heavy_keeper_copy(const struct heavy_keeper *src) { return ret; } -void heavy_keeper_one_point_query(const struct heavy_keeper *hk, const struct tag_hash_key *tag, int *count_out, void **exdata_out) { +void heavy_keeper_one_point_query(const struct heavy_keeper *hk, const char *key, size_t key_len, int *count_out, void **exdata_out) { *count_out = 0; *exdata_out = NULL; - const heap_entry *entry = sorted_set_find_entry(hk->top_K_heap, tag); + const heap_entry *entry = sorted_set_find_entry(hk->top_K_heap, key, key_len); if (entry == NULL) { return; } diff --git a/src/tags/heavy_keeper.h b/src/tags/heavy_keeper.h index 42acc75..4644e48 100644 --- a/src/tags/heavy_keeper.h +++ b/src/tags/heavy_keeper.h @@ -7,7 +7,6 @@ extern "C"{ #endif #include "exdata.h" -#include "my_ut_hash.h" struct heavy_keeper; @@ -25,29 +24,24 @@ void heavy_keeper_free(struct heavy_keeper *hk); */ void heavy_keeper_reset(struct heavy_keeper *hk); -// int heavy_keeper_add(struct heavy_keeper *hk, const char *key, size_t key_len, int count, void *arg); - -int heavy_keeper_add(struct heavy_keeper *hk, const struct tag_hash_key *tag, int count, void *arg); - +int heavy_keeper_add(struct heavy_keeper *hk, const char *key, size_t key_len, int count, void *arg); int heavy_keeper_set_exdata_schema(struct heavy_keeper *hk, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); -// void *heavy_keeper_get0_exdata(struct heavy_keeper *hk, const char *key, size_t key_len); -void *heavy_keeper_get0_exdata(const struct heavy_keeper *hk, const struct tag_hash_key *tag); +void *heavy_keeper_get0_exdata(const struct heavy_keeper *hk, const char *key, size_t key_len); // get the number of cells in the heavy keeper int heavy_keeper_get_count(const struct heavy_keeper *hk); -void heavy_keeper_list(const struct heavy_keeper *hk, void 
***exdatas, size_t *n_exdatas); //use list: void **exdatas, heavy_keeper_list(&exdatas); void *exdata = exdatas[i]; free(exdatas); +// size_t heavy_keeper_list(const struct heavy_keeper *hk, void **exdatas, size_t n_exdatas); //use list: void **exdatas, heavy_keeper_list(&exdatas); void *exdata = exdatas[i]; // TODO: 用户申请exdata 数组 +void heavy_keeper_list(const struct heavy_keeper *hk, void ***exdatas, size_t *n_exdatas); + +void heavy_keeper_merge(struct heavy_keeper *dest, const struct heavy_keeper *src); // TODO: dst 和 dest 统一一下 -void heavy_keeper_merge(struct heavy_keeper *dest, struct heavy_keeper *src); -/* - Equal to heavy_keeper_merge, with empty dest, but much faster. -*/ struct heavy_keeper *heavy_keeper_copy(const struct heavy_keeper *src); // for test -void heavy_keeper_one_point_query(const struct heavy_keeper *hk, const struct tag_hash_key *tag, int *count_out, void **exdata_out); +void heavy_keeper_one_point_query(const struct heavy_keeper *hk, const char *key, size_t key_len, int *count_out, void **exdata_out); #ifdef __cplusplus diff --git a/src/tags/my_ut_hash.h b/src/tags/my_ut_hash.h index 1066569..4830804 100644 --- a/src/tags/my_ut_hash.h +++ b/src/tags/my_ut_hash.h @@ -27,6 +27,8 @@ void fieldtag_list_free(struct fieldstat_tag *tags, size_t n_tags); char *tag_hash_key_get_compound_key(const struct tag_hash_key *tag); int tag_hash_key_cmp(const struct tag_hash_key *a, const struct tag_hash_key *b); +void build_dynamic_cell_key(const struct fieldstat_tag tags[], size_t n_tags, char **out_key, size_t *out_key_size); + #ifdef __cplusplus } #endif
\ No newline at end of file diff --git a/src/tags/tag_map.c b/src/tags/tag_map.c index 5c312fb..55a70a4 100644 --- a/src/tags/tag_map.c +++ b/src/tags/tag_map.c @@ -5,13 +5,15 @@ #include <stdlib.h> #include <string.h> +#include "uthash.h" + #include "fieldstat.h" -#include "my_ut_hash_inner.h" #include "my_ut_hash.h" #include "exdata.h" struct tag_exdata_item { - struct tag_hash_key *tag; + char *key; + size_t key_len; void *exdata; bool dying; UT_hash_handle hh; @@ -66,7 +68,7 @@ void tag_map_free(struct tag_map *pthis) { struct tag_exdata_item *item, *tmp; HASH_ITER(hh, pthis->tag_id_map, item, tmp) { HASH_DEL(pthis->tag_id_map, item); - tag_hash_key_free(item->tag); + free(item->key); pthis->free_fn(item->exdata); free(item); } @@ -82,7 +84,7 @@ void tag_map_reset(struct tag_map *pthis) { continue; } HASH_DEL(pthis->tag_id_map, node); - tag_hash_key_free(node->tag); + free(node->key); pthis->free_fn(node->exdata); free(node); } @@ -90,9 +92,15 @@ void tag_map_reset(struct tag_map *pthis) { pthis->current_cell_num = 0; } -int tag_map_add(struct tag_map *pthis, const struct tag_hash_key *tag, void *arg) { +static char *my_keydup(const char *key, size_t key_len) { + char *ret = calloc(1, key_len + 1); + memcpy(ret, key, key_len); + return ret; +} + +int tag_map_add(struct tag_map *pthis, const char *key, size_t key_len, void *arg) { struct tag_exdata_item *item; - HASH_FIND_TAG(pthis->tag_id_map, tag, item); + HASH_FIND(hh, pthis->tag_id_map, key, key_len, item); if (item != NULL && !item->dying) { return 1; } @@ -109,10 +117,11 @@ int tag_map_add(struct tag_map *pthis, const struct tag_hash_key *tag, void *arg } item = calloc(1, sizeof(struct tag_exdata_item)); - item->tag = tag_hash_key_copy(tag); + item->key = my_keydup(key, key_len); + item->key_len = key_len; item->exdata = pthis->new_fn(arg); item->dying = false; - HASH_ADD_TAG(pthis->tag_id_map, tag, item); + HASH_ADD_KEYPTR(hh, pthis->tag_id_map, item->key, key_len, item); pthis->current_cell_num++; return 1; @@ -126,9 +135,9 @@ void tag_map_set_exdata_schema(struct tag_map *pthis, exdata_new_cb new_fn, exda pthis->copy_fn = copy_fn; } -void *tag_map_get0_exdata(struct tag_map *pthis, const struct tag_hash_key *tag) { +void *tag_map_get0_exdata(struct tag_map *pthis, const char *key, size_t key_len) { struct tag_exdata_item *item; - HASH_FIND_TAG(pthis->tag_id_map, tag, item); + HASH_FIND(hh, pthis->tag_id_map, key, key_len, item); if (item == NULL || item->dying) { return NULL; } @@ -168,7 +177,7 @@ int tag_map_merge(struct tag_map *dest, struct tag_map *src) { continue; } - HASH_FIND_TAG(dest->tag_id_map, item_src->tag, item_dst); + HASH_FIND(hh, dest->tag_id_map, item_src->key, item_src->key_len, item_dst); if (item_dst != NULL && !item_dst->dying) { dest->merge_fn(item_dst->exdata, item_src->exdata); continue; @@ -180,10 +189,11 @@ int tag_map_merge(struct tag_map *dest, struct tag_map *src) { if (item_dst == NULL) { item_dst = calloc(1, sizeof(struct tag_exdata_item)); - item_dst->tag = tag_hash_key_copy(item_src->tag); + item_dst->key = my_keydup(item_src->key, item_src->key_len); + item_dst->key_len = item_src->key_len; item_dst->dying = false; item_dst->exdata = dest->copy_fn(item_src->exdata); - HASH_ADD_TAG(dest->tag_id_map, tag, item_dst); + HASH_ADD_KEYPTR(hh, dest->tag_id_map, item_dst->key, item_dst->key_len, item_dst); dest->current_cell_num++; } else { assert(item_dst->dying); @@ -211,10 +221,11 @@ struct tag_map *tag_map_copy(const struct tag_map *src) { continue; } struct tag_exdata_item *new_item = calloc(1, 
sizeof(struct tag_exdata_item)); - new_item->tag = tag_hash_key_copy(item->tag); + new_item->key = my_keydup(item->key, item->key_len); + new_item->key_len = item->key_len; new_item->exdata = pthis->copy_fn(item->exdata); new_item->dying = false; - HASH_ADD_TAG(pthis->tag_id_map, tag, new_item); + HASH_ADD_KEYPTR(hh, pthis->tag_id_map, new_item->key, new_item->key_len, new_item); } return pthis; } diff --git a/src/tags/tag_map.h b/src/tags/tag_map.h index 4b60f1e..2d8f44c 100644 --- a/src/tags/tag_map.h +++ b/src/tags/tag_map.h @@ -19,10 +19,10 @@ int tag_map_merge(struct tag_map *dest, struct tag_map *src); struct tag_map *tag_map_copy(const struct tag_map *src); // int tag_map_add(struct tag_map *pthis, const char *key, size_t key_len, int count, void *arg); -int tag_map_add(struct tag_map *pthis, const struct tag_hash_key *tag, void *arg); +int tag_map_add(struct tag_map *pthis, const char *key, size_t key_len, void *arg); // void *tag_map_get0_exdata(struct tag_map *pthis, const char *key, size_t key_len); -void *tag_map_get0_exdata(struct tag_map *pthis, const struct tag_hash_key *tag); +void *tag_map_get0_exdata(struct tag_map *pthis, const char *key, size_t key_len); int tag_map_get_count(const struct tag_map *pthis); void tag_map_list(const struct tag_map *pthis, void ***exdatas, size_t *n_exdatas); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 726221b..ced6420 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -57,5 +57,4 @@ add_unit_test(test_metric_histogram) add_unit_test(test_metric_hll)
add_unit_test(test_performance)
add_unit_test(test_register_and_reset)
-add_unit_test(unit_test_cells)
add_unit_test(test_write_json_file)
\ No newline at end of file diff --git a/test/test_register_and_reset.cpp b/test/test_register_and_reset.cpp index d9cd7f3..3e3f1dd 100644 --- a/test/test_register_and_reset.cpp +++ b/test/test_register_and_reset.cpp @@ -622,6 +622,92 @@ TEST(calibrate, delete_first_cube) fieldstat_free(target); } +/* + +// // reset once will not delete the cells, just let them be discared, so in such case, cube_add will just add to the same cell. +TEST(unit_test_tag_map, add_after_reset_and_ensure_performance_improvement) { + clock_t start, end; + const int TEST_ROUND = 100000; + // struct cell_manager *hk = cell_manager_new(SAMPLING_MODE_COMPREHENSIVE, TEST_ROUND); + struct tag_map *hk = tag_map_new(TEST_ROUND); + vector<struct tag_hash_key *> keys; + for (int i = 0; i < TEST_ROUND; i++) + { + struct tag_hash_key *key = test_gen_tag_key("key", i); + keys.push_back(key); + } + + for (int i = 0; i < TEST_ROUND; i++) { + tag_map_add(hk, keys[i], NULL); + } + + tag_map_reset(hk); + start = clock(); + for (int i = 0; i < TEST_ROUND; i++) + { + tag_map_add(hk, keys[i], NULL); + } + end = clock(); + clock_t time_reset_once = end - start; + + + tag_map_reset(hk); + tag_map_reset(hk); + start = clock(); + for (int i = 0; i < TEST_ROUND; i++) + { + tag_map_add(hk, keys[i], NULL); + } + end = clock(); + clock_t time_reset_twice = end - start; + + EXPECT_GE(time_reset_twice, time_reset_once); + + tag_map_free(hk); + for (int i = 0; i < TEST_ROUND; i++) { + tag_hash_key_free(keys[i]); + } +} + +TEST(unit_test_heavy_keeper, add_after_reset_and_ensure_performance_improvement) { + clock_t start, end; + const int K = 1000; + const int TEST_ROUND = K; + + struct heavy_keeper *hk = heavy_keeper_new(K); + vector<struct tag_hash_key *> keys; + for (int i = 0; i < TEST_ROUND; i++) + { + struct tag_hash_key *key = test_gen_tag_key("key", i % K); + keys.push_back(key); + } + + start = clock(); + for (int i = 0; i < TEST_ROUND; i++) { + heavy_keeper_add(hk, keys[i], 1, NULL); + } + end = clock(); + clock_t time_empty = end - start; + + heavy_keeper_reset(hk); + start = clock(); + for (int i = 0; i < TEST_ROUND; i++) + { + heavy_keeper_add(hk, keys[i], 1, NULL); + } + end = clock(); + clock_t time_reset_once = end - start; + + EXPECT_GE(time_empty, time_reset_once); + + heavy_keeper_free(hk); + for (int i = 0; i < TEST_ROUND; i++) { + tag_hash_key_free(keys[i]); + } +} + +*/ + int main(int argc, char *argv[]) { testing::InitGoogleTest(&argc, argv); diff --git a/test/unit_test_cells.cpp b/test/unit_test_cells.cpp deleted file mode 100644 index 7bf4581..0000000 --- a/test/unit_test_cells.cpp +++ /dev/null @@ -1,442 +0,0 @@ - -#include <gtest/gtest.h> -#include <map> -#include <set> -#include <string> -#include <vector> -#include <algorithm> - - -#include "fieldstat.h" -#include "heavy_keeper.h" -#include "tag_map.h" -#include "utils.hpp" -#include "my_ut_hash.h" - -using namespace std; - -struct tag_hash_key *test_gen_tag_key(const char *key, int value) -{ - struct fieldstat_tag tag = { - .key = key, - .type = TAG_CSTRING, - {.value_str = strdup(to_string(value).c_str())}, - }; - - struct tag_hash_key *tag_key = (struct tag_hash_key *)malloc(sizeof(struct tag_hash_key)); - tag_hash_key_init_with_fieldstat_tag(tag_key, &tag, 1, true); - - free((void *)tag.value_str); - - return tag_key; -} - -struct Fieldstat_tag_list_wrapper *test_key_tag_to_wrapper(const struct tag_hash_key *key) -{ - assert(key != NULL); - struct fieldstat_tag *tag; - size_t n_out; - tag_hash_key_convert_to_fieldstat_tag(key, &tag, &n_out); - struct 
fieldstat_tag_list tag_list; - tag_list.tag = tag; - tag_list.n_tag = n_out; - struct Fieldstat_tag_list_wrapper *wrapper = new Fieldstat_tag_list_wrapper(&tag_list); - - for (size_t i = 0; i < n_out; i++) - { - if (tag[i].type == TAG_CSTRING) - free((void *)tag[i].value_str); - free((void *)tag[i].key); - } - free(tag); - - return wrapper; -} - -double cal_accuracy_with_tags(const vector<struct tag_hash_key *> &expected_keys, const vector<struct tag_hash_key *> &test_result) { - unordered_map<string, int> countMap; - for (size_t i = 0; i < expected_keys.size(); i++) { - struct Fieldstat_tag_list_wrapper *wrapper = test_key_tag_to_wrapper(expected_keys[i]); - string key = wrapper->to_string(); - countMap[key]++; - delete wrapper; - } - vector<struct Fieldstat_tag_list_wrapper *> test_result_wrapper; - - for (size_t i = 0; i < test_result.size(); i++) { - struct Fieldstat_tag_list_wrapper *wrapper = test_key_tag_to_wrapper(test_result[i]); - test_result_wrapper.push_back(wrapper); - } - - double ret = test_cal_topk_accuracy(test_result_wrapper, countMap); - - for (size_t i = 0; i < test_result.size(); i++) { - delete test_result_wrapper[i]; - } - return ret; -} - -struct test_heavy_keeper_args { - struct tag_hash_key *key; -}; - -void *test_heavy_keeper_new_cb(void *arg) -{ - struct test_heavy_keeper_args *args = (struct test_heavy_keeper_args *)arg; - return args->key; -} -void test_heavy_keeper_free_cb(void *exdata) -{ -} -void test_heavy_keeper_reset_cb(void *exdata) -{ -} -void test_heavy_keeper_merge_cb(void *dest, void *src) -{ -} -void *test_heavy_keeper_copy_cb(void *src) -{ - return src; -} - - -vector<tag_hash_key *> test_query_heavy_keeper_content(const struct heavy_keeper *hk) -{ - size_t ret_len; - struct tag_hash_key **dump_ret_exdata = NULL; - heavy_keeper_list(hk, (void ***)&dump_ret_exdata, &ret_len); - vector<tag_hash_key *> test_result; - for (size_t i = 0; i < ret_len; i++) { - // char *tmp = tag_hash_key_get_compound_key(dump_ret_exdata[i]); - // printf("test_query_heavy_keeper_content key: %s\n", tmp); - // free(tmp); - test_result.push_back((struct tag_hash_key *)dump_ret_exdata[i]); - } - - free(dump_ret_exdata); - - return test_result; -} - -TEST(unit_test_heavy_keeper, topk_simple_add) -{ - struct heavy_keeper *hk = heavy_keeper_new(10); - heavy_keeper_set_exdata_schema(hk, test_heavy_keeper_new_cb, test_heavy_keeper_free_cb, test_heavy_keeper_merge_cb, test_heavy_keeper_reset_cb, test_heavy_keeper_copy_cb); - const int TEST_ROUND = 10; - - vector<struct tag_hash_key *> keys; - for (int i = 0; i < TEST_ROUND; i++) - { - struct tag_hash_key *key = test_gen_tag_key("key", i); - keys.push_back(key); - } - - for (int i = 0; i < TEST_ROUND; i++) { - struct test_heavy_keeper_args args = {keys[i]}; - heavy_keeper_add(hk, keys[i], 1, &args); - } - - vector<tag_hash_key *> test_result = test_query_heavy_keeper_content(hk); - EXPECT_EQ(test_result.size(), 10); - double accuracy = cal_accuracy_with_tags(keys, test_result); - EXPECT_NEAR(accuracy, 1.0, 0.01); - - heavy_keeper_free(hk); - for (int i = 0; i < TEST_ROUND; i++) { - tag_hash_key_free(keys[i]); - } -} - -TEST(unit_test_heavy_keeper, topk_add_pop) -{ - struct heavy_keeper *hk = heavy_keeper_new(10); - heavy_keeper_set_exdata_schema(hk, test_heavy_keeper_new_cb, test_heavy_keeper_free_cb, test_heavy_keeper_merge_cb, test_heavy_keeper_reset_cb, test_heavy_keeper_copy_cb); - const int TEST_ROUND = 11; - - vector<struct tag_hash_key *> keys; - for (int i = 0; i < TEST_ROUND; i++) - { - struct tag_hash_key *key = 
test_gen_tag_key("key", i); - keys.push_back(key); - } - - for (int i = 0; i < TEST_ROUND - 1; i++) { - struct test_heavy_keeper_args args = {keys[i]}; - heavy_keeper_add(hk, keys[i], 1, &args); - } - struct test_heavy_keeper_args args = {keys[TEST_ROUND - 1]}; - heavy_keeper_add(hk, keys[TEST_ROUND - 1], 100, &args); - - vector<tag_hash_key *> test_result = test_query_heavy_keeper_content(hk); - EXPECT_EQ(test_result.size(), 10); - double accuracy = cal_accuracy_with_tags(keys, test_result); - EXPECT_NEAR(accuracy, 1.0, 0.01); - - int count; - void *exdata; - heavy_keeper_one_point_query(hk, keys[TEST_ROUND - 1], &count, &exdata); - EXPECT_EQ(count, 100); - - heavy_keeper_free(hk); - for (int i = 0; i < TEST_ROUND; i++) { - tag_hash_key_free(keys[i]); - } -} - -TEST(unit_test_heavy_keeper, topk_add_last_one_twice) -{ - struct heavy_keeper *hk = heavy_keeper_new(10); - heavy_keeper_set_exdata_schema(hk, test_heavy_keeper_new_cb, test_heavy_keeper_free_cb, test_heavy_keeper_merge_cb, test_heavy_keeper_reset_cb, test_heavy_keeper_copy_cb); - const int TEST_ROUND = 10; - - vector<struct tag_hash_key *> keys; - for (int i = 0; i < TEST_ROUND; i++) - { - struct tag_hash_key *key = test_gen_tag_key("key", i); - keys.push_back(key); - } - - for (int i = 0; i < TEST_ROUND; i++) { - struct test_heavy_keeper_args args = {keys[i]}; - heavy_keeper_add(hk, keys[i], 1, &args); - } - struct test_heavy_keeper_args args = {keys[TEST_ROUND - 1]}; - heavy_keeper_add(hk, keys[TEST_ROUND - 1], 100, &args); - - vector<tag_hash_key *> test_result = test_query_heavy_keeper_content(hk); - EXPECT_EQ(test_result.size(), 10); - double accuracy = cal_accuracy_with_tags(keys, test_result); - EXPECT_NEAR(accuracy, 1.0, 0.01); - - int count; - void *exdata; - heavy_keeper_one_point_query(hk, keys[TEST_ROUND - 1], &count, &exdata); - EXPECT_EQ(count, 100 + 1); - - heavy_keeper_free(hk); - for (int i = 0; i < TEST_ROUND; i++) { - tag_hash_key_free(keys[i]); - } -} - -TEST(unit_test_heavy_keeper, topk_add_and_query_accuracy) -{ - struct heavy_keeper *hk = heavy_keeper_new(10); - heavy_keeper_set_exdata_schema(hk, test_heavy_keeper_new_cb, test_heavy_keeper_free_cb, test_heavy_keeper_merge_cb, test_heavy_keeper_reset_cb, test_heavy_keeper_copy_cb); - const int TEST_ROUND = 10000; - - vector<struct tag_hash_key *> keys; - for (int i = 0; i < TEST_ROUND; i++) - { - if (rand()) { - struct tag_hash_key *key = test_gen_tag_key("key", rand() % 10); - keys.push_back(key); - } else { - struct tag_hash_key *key = test_gen_tag_key("key", rand() % 1000); - keys.push_back(key); - } - } - - for (int i = 0; i < TEST_ROUND; i++) { - struct test_heavy_keeper_args args = {keys[i]}; - heavy_keeper_add(hk, keys[i], 1, &args); - } - - vector<tag_hash_key *> test_result = test_query_heavy_keeper_content(hk); - EXPECT_EQ(test_result.size(), 10); - double accuracy = cal_accuracy_with_tags(keys, test_result); - EXPECT_NEAR(accuracy, 1.0, 0.01); - - heavy_keeper_free(hk); - for (int i = 0; i < TEST_ROUND; i++) { - tag_hash_key_free(keys[i]); - } -} - -TEST(unit_test_heavy_keeper, merge_topk_given_K_large_enough) -{ - struct heavy_keeper *hk1 = heavy_keeper_new(10); - struct heavy_keeper *hk2 = heavy_keeper_new(10); - heavy_keeper_set_exdata_schema(hk1, test_heavy_keeper_new_cb, test_heavy_keeper_free_cb, test_heavy_keeper_merge_cb, test_heavy_keeper_reset_cb, test_heavy_keeper_copy_cb); - heavy_keeper_set_exdata_schema(hk2, test_heavy_keeper_new_cb, test_heavy_keeper_free_cb, test_heavy_keeper_merge_cb, test_heavy_keeper_reset_cb, 
test_heavy_keeper_copy_cb); - - vector<struct tag_hash_key *> keys; - keys.push_back(test_gen_tag_key("key_share", 1)); - keys.push_back(test_gen_tag_key("key_1", 1)); - keys.push_back(test_gen_tag_key("key_1", 2)); - keys.push_back(test_gen_tag_key("key_share", 1)); - keys.push_back(test_gen_tag_key("key_2", 1)); - - for (size_t i = 0; i < 3; i++) { - struct test_heavy_keeper_args args = {keys[i]}; - heavy_keeper_add(hk1, keys[i], 1, &args); - } - for (size_t i = 3; i < 5; i++) { - struct test_heavy_keeper_args args = {keys[i]}; - heavy_keeper_add(hk2, keys[i], 1, &args); - } - - heavy_keeper_merge(hk1, hk2); - - auto test_result = test_query_heavy_keeper_content(hk1); - double accuracy = cal_accuracy_with_tags(keys, test_result); - EXPECT_NEAR(accuracy, 1.0, 0.01); - - int count; - void *exdata; - heavy_keeper_one_point_query(hk1, keys[0], &count, &exdata); - EXPECT_EQ(count, 2); // key_share merged once - - heavy_keeper_free(hk1); - heavy_keeper_free(hk2); - - for (size_t i = 0; i < keys.size(); i++) { - tag_hash_key_free(keys[i]); - } -} - -TEST(unit_test_heavy_keeper, merge_topk_to_full_one) -{ - struct heavy_keeper *hk1 = heavy_keeper_new(10); - struct heavy_keeper *hk2 = heavy_keeper_new(10); - heavy_keeper_set_exdata_schema(hk1, test_heavy_keeper_new_cb, test_heavy_keeper_free_cb, test_heavy_keeper_merge_cb, test_heavy_keeper_reset_cb, test_heavy_keeper_copy_cb); - heavy_keeper_set_exdata_schema(hk2, test_heavy_keeper_new_cb, test_heavy_keeper_free_cb, test_heavy_keeper_merge_cb, test_heavy_keeper_reset_cb, test_heavy_keeper_copy_cb); - - vector<struct tag_hash_key *> keys1; - keys1.push_back(test_gen_tag_key("key_1", 1)); - keys1.push_back(test_gen_tag_key("key_1", 2)); - keys1.push_back(test_gen_tag_key("key_shared", 1)); - - vector<struct tag_hash_key *> keys2; - for (int i = 0; i < 9; i++) { - keys2.push_back(test_gen_tag_key("key_2", i)); - } - keys2.push_back(test_gen_tag_key("key_shared", 1)); - - for (size_t i = 0; i < 3; i++) { - struct test_heavy_keeper_args args = {keys1[i]}; - heavy_keeper_add(hk1, keys1[i], 10, &args); - } - for (size_t i = 0; i < 10; i++) { - unsigned int count = i < 2 ? i : 5; // the first 2 keys have count 1 and 2(less), the rest have count 5 - struct test_heavy_keeper_args args = {keys2[i]}; - heavy_keeper_add(hk2, keys2[i], count, &args); - } - - heavy_keeper_merge(hk2, hk1); - - auto test_result = test_query_heavy_keeper_content(hk2); - // join keys2 to keys1 - keys1.insert(keys1.end(), std::make_move_iterator(keys2.begin()), std::make_move_iterator(keys2.end())); - double accuracy = cal_accuracy_with_tags(keys1, test_result); - EXPECT_NEAR(accuracy, 1.0, 0.01); - // print all count - for (size_t i = 0; i < test_result.size(); i++) { - int count; - void *exdata; - heavy_keeper_one_point_query(hk2, test_result[i], &count, &exdata); - if (strcmp(test_result[i]->tags->key, "key_shared") == 0) { - EXPECT_EQ(count, 15); - } else if (strcmp(test_result[i]->tags->key, "key_1") == 0) { - EXPECT_EQ(count, 10); - } else if (strcmp(test_result[i]->tags->key, "key_2") == 0) { - EXPECT_EQ(count, 5); - } - } - - heavy_keeper_free(hk2); - heavy_keeper_free(hk1); - for (size_t i = 0; i < keys1.size(); i++) { - tag_hash_key_free(keys1[i]); - } - // all keys are moved to hk1, so no need to free keys2 -} - -// // reset once will not delete the cells, just let them be discared, so in such case, cube_add will just add to the same cell. 
-TEST(unit_test_tag_map, add_after_reset_and_ensure_performance_improvement) { - clock_t start, end; - const int TEST_ROUND = 100000; - // struct cell_manager *hk = cell_manager_new(SAMPLING_MODE_COMPREHENSIVE, TEST_ROUND); - struct tag_map *hk = tag_map_new(TEST_ROUND); - vector<struct tag_hash_key *> keys; - for (int i = 0; i < TEST_ROUND; i++) - { - struct tag_hash_key *key = test_gen_tag_key("key", i); - keys.push_back(key); - } - - for (int i = 0; i < TEST_ROUND; i++) { - tag_map_add(hk, keys[i], NULL); - } - - tag_map_reset(hk); - start = clock(); - for (int i = 0; i < TEST_ROUND; i++) - { - tag_map_add(hk, keys[i], NULL); - } - end = clock(); - clock_t time_reset_once = end - start; - - - tag_map_reset(hk); - tag_map_reset(hk); - start = clock(); - for (int i = 0; i < TEST_ROUND; i++) - { - tag_map_add(hk, keys[i], NULL); - } - end = clock(); - clock_t time_reset_twice = end - start; - - EXPECT_GE(time_reset_twice, time_reset_once); - - tag_map_free(hk); - for (int i = 0; i < TEST_ROUND; i++) { - tag_hash_key_free(keys[i]); - } -} - -TEST(unit_test_heavy_keeper, add_after_reset_and_ensure_performance_improvement) { - clock_t start, end; - const int K = 1000; - const int TEST_ROUND = K; - - struct heavy_keeper *hk = heavy_keeper_new(K); - vector<struct tag_hash_key *> keys; - for (int i = 0; i < TEST_ROUND; i++) - { - struct tag_hash_key *key = test_gen_tag_key("key", i % K); - keys.push_back(key); - } - - start = clock(); - for (int i = 0; i < TEST_ROUND; i++) { - heavy_keeper_add(hk, keys[i], 1, NULL); - } - end = clock(); - clock_t time_empty = end - start; - - heavy_keeper_reset(hk); - start = clock(); - for (int i = 0; i < TEST_ROUND; i++) - { - heavy_keeper_add(hk, keys[i], 1, NULL); - } - end = clock(); - clock_t time_reset_once = end - start; - - EXPECT_GE(time_empty, time_reset_once); - - heavy_keeper_free(hk); - for (int i = 0; i < TEST_ROUND; i++) { - tag_hash_key_free(keys[i]); - } -} - - -int main(int argc, char *argv[]) -{ - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -}
\ No newline at end of file
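For readers unfamiliar with the uthash macros that `tag_map.c` and `heavy_keeper.c` now call directly (`HASH_FIND`, `HASH_ADD_KEYPTR`, `HASH_DEL`), the pattern in isolation looks like the sketch below; the struct and values are illustrative, not taken from the repository. The item must own the key memory for as long as it sits in the table, which is why both files duplicate the key (`my_keydup` / `my_key_dup`) before adding:

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "uthash.h"

struct item {
    char *key;        /* arbitrary bytes, not necessarily NUL-terminated */
    size_t key_len;
    int value;
    UT_hash_handle hh;
};

int main(void)
{
    struct item *table = NULL;
    const char raw[] = "host\x1f" "db01\x1f";   /* a composed cell key */

    /* add: HASH_ADD_KEYPTR stores the pointer, so the item owns a copy of the key */
    struct item *it = calloc(1, sizeof(*it));
    it->key_len = sizeof(raw) - 1;
    it->key = malloc(it->key_len);
    memcpy(it->key, raw, it->key_len);
    it->value = 42;
    HASH_ADD_KEYPTR(hh, table, it->key, it->key_len, it);

    /* find by the same byte sequence and explicit length */
    struct item *found = NULL;
    HASH_FIND(hh, table, raw, sizeof(raw) - 1, found);
    printf("found value: %d\n", found ? found->value : -1);

    HASH_DEL(table, it);
    free(it->key);
    free(it);
    return 0;
}
```

uthash hashes the raw bytes with its default hash function, so the custom `HASH_FIND_TAG` / `HASH_ADD_TAG` layer and `my_ut_hash_inner.h` are no longer needed, which is exactly the simplification this commit makes.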
