diff options
| author | chenzizhan <[email protected]> | 2024-07-04 15:05:36 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2024-07-04 15:05:36 +0800 |
| commit | da2b236902f842903bd7643e824454eff286a15d (patch) | |
| tree | f04dea546d4f2ef48c97e8dc2c5f22fa9446079d /src/cube.c | |
| parent | f2b84f92d479dc37c835932286c00c1cbbb5c523 (diff) | |
move cube manager to cube.c; cube manager is now a bidict
Diffstat (limited to 'src/cube.c')
| -rw-r--r-- | src/cube.c | 259 |
1 files changed, 245 insertions, 14 deletions
@@ -9,14 +9,31 @@ #include "metric.h" #include "heavy_keeper.h" #include "tag_map.h" +#include "uthash.h" #define DEFAULT_N_METRIC 32 +#define DEFAULT_N_CUBE 64 struct exdata_new_args { const struct fieldstat_tag *tags; size_t n_tags; }; +struct cube_manager_item { + char *key; + size_t key_len; + int id; + UT_hash_handle hh; +}; + +struct cube_manager { + struct cube_manager_item *head; + + struct cube **cube; + size_t cube_cnt; + size_t cube_size; +}; + struct cell { struct metric **metrics; size_t max_n_metric; @@ -72,6 +89,220 @@ static void fieldstat_free_tag_array(struct fieldstat_tag *tags, size_t n_tags) free(tags); } +void add_cube_to_position(struct cube_manager *instance, struct cube *cube, int id) +{ + if (id >= instance->cube_size) { + struct cube **old_cube_arr = instance->cube; + instance->cube = calloc(instance->cube_size * 2, sizeof(struct cube *)); + memcpy(instance->cube, old_cube_arr, sizeof(struct cube *) * instance->cube_size); + free(old_cube_arr); + + instance->cube_size *= 2; + } + instance->cube[id] = cube; + + if (id >= instance->cube_cnt) { + instance->cube_cnt = id + 1; + } +} + +void cube_manager_free(struct cube_manager *pthis) { + struct cube_manager_item *node = NULL; + struct cube_manager_item *tmp = NULL; + struct cube_manager_item *head = pthis->head; + HASH_ITER(hh, head, node, tmp) { + HASH_DEL(head, node); + free(node->key); + free(node); + } + + for (int i = 0; i < pthis->cube_cnt; i++) { + if (pthis->cube[i] != NULL) { + cube_free(pthis->cube[i]); + } + } + + free(pthis->cube); + free(pthis); +} + +struct cube_manager *cube_manager_new() { + struct cube_manager *pthis = (struct cube_manager *)malloc(sizeof(struct cube_manager)); + pthis->head = NULL; + + pthis->cube = (struct cube **)calloc(DEFAULT_N_CUBE, sizeof(struct cube *)); + pthis->cube_cnt = 0; + pthis->cube_size = DEFAULT_N_CUBE; + return pthis; +} + +int cube_manager_add(struct cube_manager *pthis, struct cube *cube) +{ + char *key; + size_t key_len; + build_dynamic_cell_key(cube->cube_identifier, cube->n_shared_tags, &key, &key_len); + + struct cube_manager_item *node = NULL; + HASH_FIND(hh, pthis->head, key, key_len, node); + if (node != NULL) { + free(key); + return -1; + } + + int id = 0; + for ( ;id < pthis->cube_cnt; id++) { + if (pthis->cube[id] == NULL) { + break; + } + } + + node = (struct cube_manager_item *)malloc(sizeof(struct cube_manager_item)); + node->key = key; + node->key_len = key_len; + node->id = id; + HASH_ADD_KEYPTR(hh, pthis->head, node->key, key_len, node); + + add_cube_to_position(pthis, cube, id); + + return id; +} + +void cube_manager_delete_by_key(struct cube_manager *pthis, const char *key, size_t key_len) +{ + struct cube_manager_item *node = NULL; + HASH_FIND(hh, pthis->head, key, key_len, node); + if (node == NULL) { + return; + } + + int id = node->id; + cube_free(pthis->cube[id]); + pthis->cube[id] = NULL; + if (id == pthis->cube_cnt - 1) { + pthis->cube_cnt--; + } + + HASH_DEL(pthis->head, node); + free(node->key); + free(node); +} + +void cube_manager_delete(struct cube_manager *pthis, struct cube *cube) +{ + char *key; + size_t key_len; + build_dynamic_cell_key(cube->cube_identifier, cube->n_shared_tags, &key, &key_len); + + cube_manager_delete_by_key(pthis, key, key_len); + free(key); +} + +int cube_manager_find(const struct cube_manager *pthis, const struct fieldstat_tag *identifier, size_t n_tag) +{ + char *key; + size_t key_len; + build_dynamic_cell_key(identifier, n_tag, &key, &key_len); + + struct cube_manager_item *node = NULL; + HASH_FIND(hh, pthis->head, key, key_len, node); + free(key); + if (node == NULL) { + return -1; + } else { + return node->id; + } +} + +struct cube *cube_manager_get_cube_by_id(const struct cube_manager *manager, int cube_id) { + if (cube_id < 0 || cube_id >= manager->cube_size) { + return NULL; + } + return manager->cube[cube_id]; +} + +void cube_manager_list(const struct cube_manager *pthis, int **cube_ids, int *n_cube) { + int all_available_cube_count = 0; + + int *tmp_ids = (int *)malloc(sizeof(int) * pthis->cube_cnt); + for (int i = 0; i < pthis->cube_cnt; i++) { + if (pthis->cube[i] != NULL) { + tmp_ids[all_available_cube_count++] = i; + } + } + if (all_available_cube_count == 0) { + free(tmp_ids); + *cube_ids = NULL; + *n_cube = 0; + return; + } + + *cube_ids = tmp_ids; + *n_cube = all_available_cube_count; +} + +void cube_manager_calibrate(struct cube_manager *pthis, const struct cube_manager *master) +{ + struct cube_manager_item *node = NULL; + struct cube_manager_item *tmp = NULL; + + // exist in self but not in master + HASH_ITER(hh, pthis->head, node, tmp) { + struct cube_manager_item *node_in_master = NULL; + HASH_FIND(hh, master->head, node->key, node->key_len, node_in_master); + + if (node_in_master == NULL) { + cube_manager_delete_by_key(pthis, node->key, node->key_len); + } + } + + // exist in master but not in self + HASH_ITER(hh, master->head, node, tmp) { + struct cube_manager_item *node_in_self = NULL; + HASH_FIND(hh, pthis->head, node->key, node->key_len, node_in_self); + + if (node_in_self == NULL) { + int cube_id = node->id; + cube_manager_add(pthis, cube_fork(master->cube[cube_id])); + } + } +} + +struct cube_manager *cube_manager_fork(const struct cube_manager *src) +{ + struct cube_manager *pthis = cube_manager_new(); + struct cube_manager_item *node = NULL; + struct cube_manager_item *tmp = NULL; + HASH_ITER(hh, src->head, node, tmp) { + cube_manager_add(pthis, cube_fork(src->cube[node->id])); + } + return pthis; +} + +void cube_manager_merge(struct cube_manager *dest, const struct cube_manager *src) +{ + struct cube_manager_item *node = NULL; + struct cube_manager_item *tmp = NULL; + HASH_ITER(hh, src->head, node, tmp) { + struct cube_manager_item *node_in_dest = NULL; + HASH_FIND(hh, dest->head, node->key, node->key_len, node_in_dest); + + if (node_in_dest == NULL) { + cube_manager_add(dest, cube_copy(src->cube[node->id])); + } else { + cube_merge(dest->cube[node_in_dest->id], src->cube[node->id]); + } + } +} + +void cube_manager_reset(struct cube_manager *pthis) +{ + for (int i = 0; i < pthis->cube_cnt; i++) { + if (pthis->cube[i] == NULL) { + continue; + } + cube_reset(pthis->cube[i]); + } +} struct metric *find_metric_in_cell(const struct cell *cell, int metric_id) { @@ -195,7 +426,7 @@ void *exdata_copy_i(void *exdata) { return cell_copy((struct cell *)exdata); } -struct cube *fieldstat_cube_info_init(const struct fieldstat_tag *shared_tags, size_t n_tag, enum sampling_mode mode, size_t max_n_cell) +struct cube *cube_info_new(const struct fieldstat_tag *shared_tags, size_t n_tag, enum sampling_mode mode, size_t max_n_cell) { struct cube *cube = calloc(1, sizeof(struct cube)); cube->sampling_mode = mode; @@ -216,7 +447,7 @@ struct cube *fieldstat_cube_info_init(const struct fieldstat_tag *shared_tags, s struct cube *cube_new(const struct fieldstat_tag *shared_tags, size_t n_tag, enum sampling_mode mode, size_t max_n_cell) { - struct cube *cube = fieldstat_cube_info_init(shared_tags, n_tag, mode, max_n_cell); + struct cube *cube = cube_info_new(shared_tags, n_tag, mode, max_n_cell); switch (mode) { @@ -267,7 +498,7 @@ void cube_set_primary_metric(struct cube *cube, int metric_id) { cube->primary_metric_id = metric_id; } -struct cell *find_or_add_exdata_comprehensive(struct tag_map *comprehensive, const char *key, size_t key_len, struct exdata_new_args *args) +struct cell *find_or_add_cell_comprehensive(struct tag_map *comprehensive, const char *key, size_t key_len, struct exdata_new_args *args) { struct cell *cell_data = tag_map_get0_exdata(comprehensive, key, key_len); if (cell_data == NULL) { @@ -280,7 +511,7 @@ struct cell *find_or_add_exdata_comprehensive(struct tag_map *comprehensive, con return cell_data; } -struct cell *find_or_add_exdata_none_primary_topk(struct heavy_keeper *topk, const char *key, size_t key_len, struct exdata_new_args *args) +struct cell *find_or_add_cell_none_primary_topk(struct heavy_keeper *topk, const char *key, size_t key_len, struct exdata_new_args *args) { struct cell *cell_data = heavy_keeper_get0_exdata(topk, key, key_len); if (cell_data == NULL) { @@ -292,7 +523,7 @@ struct cell *find_or_add_exdata_none_primary_topk(struct heavy_keeper *topk, con } return cell_data; } - +// TODO: 整个Switch case 改成 cube_get_cell int cube_histogram_record(struct cube *cube, const struct metric_manifest *manifest, const struct fieldstat_tag *tags, size_t n_tag, long long value) { char *tag_in_string; @@ -306,10 +537,10 @@ int cube_histogram_record(struct cube *cube, const struct metric_manifest *manif struct cell *cell_data = NULL; switch (cube->sampling_mode) { case SAMPLING_MODE_TOPK: - cell_data = find_or_add_exdata_none_primary_topk(cube->topk, tag_in_string, tag_len, &args); + cell_data = find_or_add_cell_none_primary_topk(cube->topk, tag_in_string, tag_len, &args); break; case SAMPLING_MODE_COMPREHENSIVE: - cell_data = find_or_add_exdata_comprehensive(cube->comprehensive, tag_in_string, tag_len, &args); + cell_data = find_or_add_cell_comprehensive(cube->comprehensive, tag_in_string, tag_len, &args); break; } free(tag_in_string); @@ -338,10 +569,10 @@ int cube_hll_add(struct cube *cube, const struct metric_manifest *manifest, cons struct cell *cell_data = NULL; switch (cube->sampling_mode) { case SAMPLING_MODE_TOPK: - cell_data = find_or_add_exdata_none_primary_topk(cube->topk, tag_in_string, tag_len, &args); + cell_data = find_or_add_cell_none_primary_topk(cube->topk, tag_in_string, tag_len, &args); break; case SAMPLING_MODE_COMPREHENSIVE: - cell_data = find_or_add_exdata_comprehensive(cube->comprehensive, tag_in_string, tag_len, &args); + cell_data = find_or_add_cell_comprehensive(cube->comprehensive, tag_in_string, tag_len, &args); break; } @@ -373,7 +604,7 @@ int cube_counter_incrby(struct cube *cube, const struct metric_manifest *manifes cell_data = heavy_keeper_get0_exdata(cube->topk, tag_in_string, tag_len); if (cell_data == NULL) { - int tmp_ret = heavy_keeper_add(cube->topk, tag_in_string, tag_len, 0, &args); + int tmp_ret = heavy_keeper_add(cube->topk, tag_in_string, tag_len, 0, &args); // TODO: 忘了提取函数 if (tmp_ret != 1) { free(tag_in_string); return FS_ERR_TOO_MANY_CELLS; @@ -399,7 +630,7 @@ int cube_counter_incrby(struct cube *cube, const struct metric_manifest *manifes } break; case SAMPLING_MODE_COMPREHENSIVE: - cell_data = find_or_add_exdata_comprehensive(cube->comprehensive, tag_in_string, tag_len, &args); + cell_data = find_or_add_cell_comprehensive(cube->comprehensive, tag_in_string, tag_len, &args); if (cell_data == NULL) { free(tag_in_string); return FS_ERR_TOO_MANY_CELLS; @@ -432,7 +663,7 @@ int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest, { case SAMPLING_MODE_TOPK: { // TODO: 这个地方想办法拿到值以后进counter incrby 流程,重构 if (cube->primary_metric_id != metric_id) { - cell_data = find_or_add_exdata_none_primary_topk(cube->topk, tag_in_string, tag_len, &args); + cell_data = find_or_add_cell_none_primary_topk(cube->topk, tag_in_string, tag_len, &args); } else { long long current_count = 0; cell_data = heavy_keeper_get0_exdata(cube->topk, tag_in_string, tag_len); @@ -459,7 +690,7 @@ int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest, } break;} case SAMPLING_MODE_COMPREHENSIVE: { - cell_data = find_or_add_exdata_comprehensive(cube->comprehensive, tag_in_string, tag_len, &args); + cell_data = find_or_add_cell_comprehensive(cube->comprehensive, tag_in_string, tag_len, &args); break;} default: assert(0); @@ -474,7 +705,7 @@ int cube_counter_set(struct cube *cube, const struct metric_manifest *manifest, struct cube *cube_copy(const struct cube *cube) { - struct cube *cube_dup = fieldstat_cube_info_init(cube->cube_identifier, cube->n_shared_tags, cube->sampling_mode, cube->max_n_cell); + struct cube *cube_dup = cube_info_new(cube->cube_identifier, cube->n_shared_tags, cube->sampling_mode, cube->max_n_cell); cube_dup->primary_metric_id = cube->primary_metric_id; switch (cube->sampling_mode) |
