#include #include #include #include #include "cjson/cJSON.h" #include "uthash.h" #include "serializer.h" #include "fieldstat.h" #include "metrics/metric.h" #include "tags/cell_manager.h" #define DEFAULT_N_METRIC 64 #define DEFAULT_N_CUBE 128 struct fs_cube { enum sampling_mode sampling_mode; struct cell_manager *cell_manager; size_t max_n_cell; int next_cell_id; int primary_metric_id; // the key of cube is the combination of shared tags struct fieldstat_tag *shared_tags; size_t n_shared_tags; struct tag_hash_key *key_tag; struct metric **metrics; size_t valid_metric_arr_len; size_t max_n_metric; }; struct metric_name_id_map { char *name; int id; UT_hash_handle hh; }; struct fieldstat { struct fs_cube **cube; unsigned long *cube_version; // increase from 0 every time the cube is deleted unsigned long cell_version; // increase from 0 every time fieldstat_reset is called size_t valid_cube_arr_length; size_t max_n_cube; struct metric **metric_masters; size_t n_metric_master; size_t max_n_metric_master; struct metric_name_id_map *metric_name_id_map; struct cube_manager *shared_tag_cube_manager; }; int name_id_map_get_id_by_name(struct metric_name_id_map *map, const char *metric_name) { struct metric_name_id_map *entry = NULL; HASH_FIND_STR(map, metric_name, entry); if (entry == NULL) { return -1; } return entry->id; } void name_id_map_add(struct metric_name_id_map **map, const char *name, int id) { struct metric_name_id_map *entry = malloc(sizeof(struct metric_name_id_map)); entry->id = id; entry->name = strdup(name); HASH_ADD_KEYPTR(hh, *map, entry->name, strlen(entry->name), entry); } void name_id_map_free(struct metric_name_id_map *map) { struct metric_name_id_map *entry, *tmp; HASH_ITER(hh, map, entry, tmp) { HASH_DEL(map, entry); free(entry->name); free(entry); } } struct metric_name_id_map *name_id_map_copy(struct metric_name_id_map *map) { struct metric_name_id_map *map_dup = NULL; struct metric_name_id_map *entry, *tmp; HASH_ITER(hh, map, entry, tmp) { name_id_map_add(&map_dup, entry->name, entry->id); } return map_dup; } struct fieldstat *fieldstat_new() { struct fieldstat *instance = calloc(1, sizeof(struct fieldstat)); instance->max_n_cube = DEFAULT_N_CUBE; instance->cube = calloc(instance->max_n_cube, sizeof(struct fs_cube *)); instance->cube_version = calloc(instance->max_n_cube, sizeof(unsigned long)); instance->max_n_metric_master = DEFAULT_N_METRIC; instance->metric_masters = calloc(instance->max_n_metric_master, sizeof(struct metric *)); instance->metric_name_id_map = NULL; instance->shared_tag_cube_manager = cube_manager_new(); return instance; } void fieldstat_cube_free(struct fieldstat *instance, int cube_id); void fieldstat_free(struct fieldstat *instance) { if (instance == NULL) { return; } for (size_t i = 0; i < instance->valid_cube_arr_length; i++) { fieldstat_cube_free(instance, i); } free(instance->cube); free(instance->cube_version); cube_manager_free(instance->shared_tag_cube_manager); for (size_t i = 0; i < instance->n_metric_master; i++) { metric_free(instance->metric_masters[i]); } free(instance->metric_masters); name_id_map_free(instance->metric_name_id_map); free(instance); } void fieldstat_reset(struct fieldstat *instance) { if (instance == NULL) { return; } for (size_t i = 0; i < instance->valid_cube_arr_length; i++) { struct fs_cube *cube = instance->cube[i]; if (cube == NULL) { continue; } for (size_t j = 0; j < cube->valid_metric_arr_len; j++) { if (cube->metrics[j] == NULL) { continue; } metric_reset(cube->metrics[j]); } cell_manager_reset(cube->cell_manager); } instance->cell_version++; } unsigned long fieldstat_get_version(const struct fieldstat *instance) { if (instance == NULL) { return 0; } return instance->cell_version; } int fieldstat_destroy_cube(struct fieldstat *instance, int cube_id) { if (instance == NULL) { return FS_ERR_NULL_HANDLER; } if (cube_id < 0 || cube_id >= instance->valid_cube_arr_length) { return FS_ERR_INVALID_CUBE_ID; } if (instance->cube[cube_id] == NULL) { return FS_ERR_INVALID_CUBE_ID; } const struct fs_cube *cube = instance->cube[cube_id]; cube_manager_delete(instance->shared_tag_cube_manager, cube->key_tag); fieldstat_cube_free(instance, cube_id); instance->cube[cube_id] = NULL; instance->cube_version[cube_id]++; return 0; } long long fieldstat_get_cube_version(const struct fieldstat *instance, int cube_id) { if (instance == NULL) { return FS_ERR_NULL_HANDLER; } if (cube_id < 0 || cube_id >= instance->valid_cube_arr_length) { return FS_ERR_INVALID_CUBE_ID; } if (instance->cube[cube_id] == NULL && instance->cube_version[cube_id] == 0) { return FS_ERR_INVALID_CUBE_ID; } return instance->cube_version[cube_id]; } /* -------------------------------------------------------------------------- */ /* cube */ /* -------------------------------------------------------------------------- */ void fieldstat_free_tag_array(struct fieldstat_tag *tags, size_t n_tags) { for (size_t i = 0; i < n_tags; i++) { struct fieldstat_tag *tag = &tags[i]; free((char *)tag->key); if (tag->type == TAG_CSTRING) { free((char *)tag->value_str); } } free(tags); } void add_cube_to_position(struct fieldstat *instance, struct fs_cube *cube, int cube_id) { if (cube_id >= instance->max_n_cube) { instance->max_n_cube *= 2; struct fs_cube **old_cube_arr = instance->cube; instance->cube = calloc(instance->max_n_cube, sizeof(struct fs_cube *)); memcpy(instance->cube, old_cube_arr, sizeof(struct fs_cube *) * instance->valid_cube_arr_length); free(old_cube_arr); unsigned long *old_ver_arr = instance->cube_version; instance->cube_version = calloc(instance->max_n_cube, sizeof(unsigned long)); memcpy(instance->cube_version, old_ver_arr, sizeof(unsigned long) * instance->valid_cube_arr_length); free(old_ver_arr); } instance->cube[cube_id] = cube; if (cube_id >= instance->valid_cube_arr_length) { instance->valid_cube_arr_length = cube_id + 1; } } int fieldstat_append_cube_to_instance(struct fieldstat *instance, struct fs_cube *cube) { for (int i = 0; i < instance->valid_cube_arr_length; i++) { if (instance->cube[i] == NULL) { instance->cube[i] = cube; cube_manager_add(instance->shared_tag_cube_manager, cube->key_tag, i); return i; } } int cube_id = instance->valid_cube_arr_length; add_cube_to_position(instance, cube, cube_id); cube_manager_add(instance->shared_tag_cube_manager, cube->key_tag, cube_id); return cube_id; } struct fs_cube *fieldstat_cube_info_init(const struct fieldstat_tag *shared_tags, size_t n_tag, enum sampling_mode mode, size_t max_n_cell) { struct fs_cube *cube = calloc(1, sizeof(struct fs_cube)); cube->sampling_mode = mode; cube->max_n_cell = max_n_cell; if (n_tag == 0) { cube->shared_tags = NULL; } else { cube->shared_tags = malloc(sizeof(struct fieldstat_tag) * n_tag); for (int i = 0; i < n_tag; i++) { struct fieldstat_tag *dest_tag = &cube->shared_tags[i]; dest_tag->key = strdup(shared_tags[i].key); dest_tag->type = shared_tags[i].type; switch (dest_tag->type) { case TAG_INTEGER: dest_tag->value_longlong = shared_tags[i].value_longlong; break; case TAG_CSTRING: dest_tag->value_str = strdup(shared_tags[i].value_str); break; case TAG_DOUBLE: dest_tag->value_double = shared_tags[i].value_double; break; default: break; } } } cube->n_shared_tags = n_tag; struct tag_hash_key *shared_tag_key = malloc(sizeof(struct tag_hash_key)); tag_hash_key_init_with_fieldstat_tag(shared_tag_key, shared_tags, n_tag, true); cube->key_tag = shared_tag_key; cube->max_n_metric = 64; cube->metrics = calloc(cube->max_n_metric, sizeof(struct metric *)); return cube; } struct fs_cube *fieldstat_cube_new(const struct fieldstat_tag *shared_tags, size_t n_tag, enum sampling_mode mode, size_t max_n_cell) { struct fs_cube *cube = fieldstat_cube_info_init(shared_tags, n_tag, mode, max_n_cell); cube->cell_manager = cell_manager_new(mode, max_n_cell); return cube; } int fieldstat_create_cube(struct fieldstat *instance, const struct fieldstat_tag *shared_tags, size_t n_tag, enum sampling_mode mode, size_t max_n_cell) { if (instance == NULL) { return FS_ERR_NULL_HANDLER; } if (n_tag == 0 || shared_tags == NULL) { shared_tags = NULL; n_tag = 0; } if (mode == SAMPLING_MODE_TOPK && max_n_cell == 0) { return FS_ERR_INVALID_PARAM; } if (max_n_cell == 0) { max_n_cell = INT32_MAX; } struct tag_hash_key shared_tag_key; tag_hash_key_init_with_fieldstat_tag(&shared_tag_key, shared_tags, n_tag, false); int ret = cube_manager_find(instance->shared_tag_cube_manager, &shared_tag_key); if (ret != -1) { return FS_ERR_INVALID_KEY; } struct fs_cube *cube = fieldstat_cube_new(shared_tags, n_tag, mode, max_n_cell); int cube_id = fieldstat_append_cube_to_instance(instance, cube); return cube_id; } int fieldstat_cube_add(struct fieldstat *instance, int cube_id, const struct fieldstat_tag *tags, size_t n_tag, long long occurrence) { struct fs_cube *cube = instance->cube[cube_id]; int ret = -1; struct tag_hash_key tag_key; tag_hash_key_init_with_fieldstat_tag(&tag_key, tags, n_tag, false); if (cube->sampling_mode == SAMPLING_MODE_COMPREHENSIVE) { ret = cell_manager_add_cell(cube->cell_manager, &tag_key); } else { int popped_cell_id = -1; ret = cell_manager_add_cell_topk(cube->cell_manager, &tag_key, occurrence, &popped_cell_id); if (popped_cell_id != -1) { for (size_t i = 0; i < cube->valid_metric_arr_len; i++) { if (cube->metrics[i] == NULL) { continue; } metric_delete_cell(cube->metrics[i], popped_cell_id); } } } if (ret < 0) { return -1; } return ret; } void fieldstat_cube_free_contents(struct fieldstat *instance, int cube_id) { struct fs_cube *cube = instance->cube[cube_id]; cell_manager_free(cube->cell_manager); fieldstat_free_tag_array(cube->shared_tags, cube->n_shared_tags); tag_hash_key_free(cube->key_tag); for (size_t i = 0; i < cube->valid_metric_arr_len; i++) { if (cube->metrics[i] != NULL) { metric_free(cube->metrics[i]); } } free(cube->metrics); free(cube); instance->cube[cube_id] = NULL; if (cube_id == instance->valid_cube_arr_length - 1) { instance->valid_cube_arr_length--; } } void fieldstat_cube_free(struct fieldstat *instance, int cube_id) { if (instance->cube[cube_id] == NULL) { return; } cube_manager_delete(instance->shared_tag_cube_manager, instance->cube[cube_id]->key_tag); fieldstat_cube_free_contents(instance, cube_id); } struct fs_cube *fieldstat_cube_fork(const struct fs_cube *cube) { struct fs_cube *ret = fieldstat_cube_new(cube->shared_tags, cube->n_shared_tags, cube->sampling_mode, cube->max_n_cell); ret->primary_metric_id = cube->primary_metric_id; return ret; } int fieldstat_cube_set_primary_metric(struct fieldstat *instance, int cube_id, int metric_id) { if (instance == NULL) { return FS_ERR_NULL_HANDLER; } if (cube_id < 0 || cube_id >= instance->valid_cube_arr_length) { return FS_ERR_INVALID_CUBE_ID; } struct fs_cube *cube = instance->cube[cube_id]; if (cube == NULL) { return FS_ERR_INVALID_CUBE_ID; } if (cube->sampling_mode != SAMPLING_MODE_TOPK) { return FS_ERR_INVALID_PARAM; } if (metric_id < 0 || metric_id >= instance->n_metric_master) { return FS_ERR_INVALID_METRIC_ID; } if (instance->metric_masters[metric_id] == NULL) { return FS_ERR_INVALID_METRIC_ID; } if (metric_get_type(instance->metric_masters[metric_id]) != METRIC_TYPE_COUNTER) { return FS_ERR_INVALID_PARAM; } cube->primary_metric_id = metric_id; return FS_OK; } /* -------------------------------------------------------------------------- */ /* metric register */ /* -------------------------------------------------------------------------- */ void add_metric_to_cube(struct fs_cube *cube, struct metric *metric, int metric_id) { if (metric_id >= cube->max_n_metric) { cube->max_n_metric *= 2; cube->metrics = realloc(cube->metrics, sizeof(struct metric *) * cube->max_n_metric); memset(cube->metrics + cube->valid_metric_arr_len, 0, sizeof(struct metric *) * (cube->max_n_metric - cube->valid_metric_arr_len)); } cube->metrics[metric_id] = metric; if (metric_id >= cube->valid_metric_arr_len) { cube->valid_metric_arr_len = metric_id + 1; } } void add_metric_to_instance(struct fieldstat *instance, const struct metric *metric, int metric_id) { if (metric_id >= instance->max_n_metric_master) { instance->max_n_metric_master *= 2; instance->metric_masters = realloc(instance->metric_masters, sizeof(struct metric *) * instance->max_n_metric_master); memset(instance->metric_masters + instance->n_metric_master, 0, sizeof(struct metric *) * (instance->max_n_metric_master - instance->n_metric_master)); } instance->metric_masters[metric_id] = (struct metric *)metric; if (metric_id >= instance->n_metric_master) { instance->n_metric_master = metric_id + 1; } name_id_map_add(&instance->metric_name_id_map, metric_get_name(metric), metric_id); } struct metric *find_or_add_metric(struct fieldstat *instance, int cube_id, int metric_id) { struct fs_cube *cube = instance->cube[cube_id]; if (metric_id < cube->valid_metric_arr_len && cube->metrics[metric_id] != NULL) { return cube->metrics[metric_id]; } struct metric *metric = metric_fork(instance->metric_masters[metric_id]); add_metric_to_cube(cube, metric, metric_id); return metric; } static int append_metric_to_instance(struct fieldstat *instance, const struct metric *metric) { int metric_id = instance->n_metric_master; add_metric_to_instance(instance, metric, metric_id); return metric_id; } int check_before_register_metric(const struct fieldstat *instance, const char *metric_name) { if (instance == NULL) { return FS_ERR_NULL_HANDLER; } if (name_id_map_get_id_by_name(instance->metric_name_id_map, metric_name) != -1) { return FS_ERR_INVALID_KEY; } return FS_OK; } int fieldstat_register_counter(struct fieldstat *instance, const char *metric_name) { int ret = check_before_register_metric(instance, metric_name); if (ret != FS_OK) { return ret; } const struct metric *metric = metric_counter_new(metric_name); return append_metric_to_instance(instance, metric); } int fieldstat_register_hll(struct fieldstat *instance, const char *metric_name, unsigned char precision) { int ret = check_before_register_metric(instance, metric_name); if (ret != FS_OK) { return ret; } if (precision < 4 || precision > 18) { return FS_ERR_INVALID_PARAM; } const struct metric *metric = metric_hll_new(metric_name, precision); return append_metric_to_instance(instance, metric); } int fieldstat_register_hist(struct fieldstat *instance, const char *metric_name, long long lowest_trackable_value, long long highest_trackable_value, int significant_figures) { int ret = check_before_register_metric(instance, metric_name); if (ret != FS_OK) { return ret; } // refer to hdr_histogram.h for the rules of parameters. Just copy them here if (lowest_trackable_value < 1) { return FS_ERR_INVALID_PARAM; } if (significant_figures < 1 || significant_figures > 5) { return FS_ERR_INVALID_PARAM; } if (lowest_trackable_value * 2 > highest_trackable_value) { return FS_ERR_INVALID_PARAM; } const struct metric *metric = metric_histogram_new(metric_name, lowest_trackable_value, highest_trackable_value, significant_figures); return append_metric_to_instance(instance, metric); } /* -------------------------------------------------------------------------- */ /* metric operation */ /* -------------------------------------------------------------------------- */ static struct metric *fieldstat_find_metric(const struct fieldstat *instance, int cube_id, int metric_id, int *err_code) { if (instance == NULL) { *err_code = FS_ERR_NULL_HANDLER; return NULL; } if (cube_id < 0 || cube_id >= instance->valid_cube_arr_length) { *err_code = FS_ERR_INVALID_CUBE_ID; return NULL; } struct fs_cube *cube = instance->cube[cube_id]; if (cube == NULL) { *err_code = FS_ERR_INVALID_CUBE_ID; return NULL; } *err_code = FS_OK; return cube->metrics[metric_id]; } int check_before_add(const struct fieldstat *instance, int cube_id, int metric_id, enum metric_type type) { if (instance == NULL) { return FS_ERR_NULL_HANDLER; } if (cube_id < 0 || cube_id >= instance->valid_cube_arr_length) { return FS_ERR_INVALID_CUBE_ID; } if (instance->cube[cube_id] == NULL) { return FS_ERR_INVALID_CUBE_ID; } if (metric_id < 0 || metric_id >= instance->n_metric_master) { return FS_ERR_INVALID_METRIC_ID; } const struct metric *metric = instance->metric_masters[metric_id]; if (metric == NULL || metric_get_type(metric) != type) { return FS_ERR_INVALID_METRIC_ID; } return FS_OK; } int fieldstat_counter_incrby(struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, long long increment) { int ret = check_before_add(instance, cube_id, metric_id, METRIC_TYPE_COUNTER); if (ret != FS_OK) { return ret; } const struct fs_cube *cube = instance->cube[cube_id]; long long occurrence = 0; if (cube->sampling_mode == SAMPLING_MODE_TOPK && cube->primary_metric_id == metric_id) { if (increment < 0) { return FS_ERR_INVALID_PARAM; } occurrence = increment; } int cell_id = fieldstat_cube_add(instance, cube_id, tags, n_tag, occurrence); if (cell_id < 0) { return FS_ERR_TOO_MANY_CELLS; } struct metric *metric = find_or_add_metric(instance, cube_id, metric_id); metric_counter_incrby(metric, cell_id, increment); return FS_OK; } int fieldstat_counter_incrby_batch(struct fieldstat *instance, int cube_id, int metric_ids[], const struct fieldstat_tag *tags, size_t n_tag, long long increments[], int n_metric) { int ret = check_before_add(instance, cube_id, metric_ids[0], METRIC_TYPE_COUNTER); if (ret != FS_OK) { return ret; } const struct fs_cube *cube = instance->cube[cube_id]; long long occurrence = 0; if (cube->sampling_mode == SAMPLING_MODE_TOPK) { for (int i = 0; i < n_metric; i++) { if (metric_ids[i] == cube->primary_metric_id) { if (increments[i] < 0) { return FS_ERR_INVALID_PARAM; } occurrence += increments[i]; } } } int cell_id = fieldstat_cube_add(instance, cube_id, tags, n_tag, occurrence); if (cell_id < 0) { return FS_ERR_TOO_MANY_CELLS; } for (int i = 0; i < n_metric; i++) { struct metric *metric = find_or_add_metric(instance, cube_id, metric_ids[i]); metric_counter_incrby(metric, cell_id, increments[i]); } return FS_OK; } int fieldstat_counter_set(struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, long long value) { int ret = check_before_add(instance, cube_id, metric_id, METRIC_TYPE_COUNTER); if (ret != FS_OK) { return ret; } const struct fs_cube *cube = instance->cube[cube_id]; struct metric *metric = find_or_add_metric(instance, cube_id, metric_id); int cell_id = -1; // get the occurrence long long occurrence = 0; if (cube->sampling_mode == SAMPLING_MODE_TOPK && cube->primary_metric_id == metric_id) { struct tag_hash_key tag_key; tag_hash_key_init_with_fieldstat_tag(&tag_key, tags, n_tag, false); cell_id = cell_manager_find(cube->cell_manager, &tag_key); if (cell_id >= 0) { long long old_value; int exist_flag = metric_counter_get(metric, cell_id, &old_value); if (exist_flag == 0) { // cell already exist in metric occurrence = value - old_value; } else { occurrence = value; } } else { occurrence = value; } if (occurrence < 0) { return FS_ERR_INVALID_PARAM; } } cell_id = fieldstat_cube_add(instance, cube_id, tags, n_tag, occurrence); if (cell_id < 0) { return FS_ERR_TOO_MANY_CELLS; } metric_counter_set(metric, cell_id, value); return 0; } int fieldstat_hll_add(struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, const char *key, size_t key_len) { int ret = check_before_add(instance, cube_id, metric_id, METRIC_TYPE_HLL); if (ret != FS_OK) { return ret; } if (instance->cube[cube_id]->sampling_mode == SAMPLING_MODE_TOPK) { return FS_ERR_INVALID_PARAM; } int cell_id = fieldstat_cube_add(instance, cube_id, tags, n_tag, 0); if (cell_id < 0) { return FS_ERR_TOO_MANY_CELLS; } struct metric *metric = find_or_add_metric(instance, cube_id, metric_id); metric_hll_add(metric, cell_id, key, key_len); return 0; } int fieldstat_hist_record(struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, long long value) { int ret = check_before_add(instance, cube_id, metric_id, METRIC_TYPE_HISTOGRAM); if (ret != FS_OK) { return ret; } if (instance->cube[cube_id]->sampling_mode == SAMPLING_MODE_TOPK) { return FS_ERR_INVALID_PARAM; } int cell_id = fieldstat_cube_add(instance, cube_id, tags, n_tag, 0); if (cell_id < 0) { return FS_ERR_TOO_MANY_CELLS; } struct metric *metric = find_or_add_metric(instance, cube_id, metric_id); ret = metric_histogram_record(metric, cell_id, value); if (ret < 0) { // it's ok not to recover the cell and metrics even if they are new ones, since unused ones will not be exported. return FS_ERR_INVALID_PARAM; } return 0; } /* -------------------------------------------------------------------------- */ /* merge */ /* -------------------------------------------------------------------------- */ struct fs_cube *fieldstat_cube_dup(const struct fs_cube *cube, const int *metric_id_map) { struct fs_cube *cube_dup = fieldstat_cube_info_init(cube->shared_tags, cube->n_shared_tags, cube->sampling_mode, cube->max_n_cell); const struct cell_manager *cm_src = cube->cell_manager; struct cell_manager *cm_dup = cell_manager_copy(cm_src); cube_dup->cell_manager = cm_dup; cube_dup->primary_metric_id = metric_id_map[cube->primary_metric_id]; for (int i = 0; i < cube->valid_metric_arr_len; i ++) { if (cube->metrics[i] == NULL) { continue; } struct metric *metric_dup = metric_copy(cube->metrics[i]); add_metric_to_cube(cube_dup, metric_dup, metric_id_map[i]); } return cube_dup; } void fieldstat_cube_merge_comprehensive(struct fs_cube *dest, const struct fs_cube *src, const int *metric_id_src_dest_map) { const struct cell_manager *cell_manager_src = src->cell_manager; struct cell_manager *cell_manager_dest = dest->cell_manager; int arr_len_src = 0; const struct tag_hash_key **tag_arr_src = cell_manager_dump(cell_manager_src, &arr_len_src); for (int cell_id_src = 0; cell_id_src < arr_len_src; cell_id_src++) { const struct tag_hash_key *tag_src = tag_arr_src[cell_id_src]; if (tag_src == NULL) { continue; } int cell_id_final = cell_manager_add_cell(cell_manager_dest, tag_src); if (cell_id_final == -1) { // dest is full break; } for (int metric_id_src = 0; metric_id_src < src->valid_metric_arr_len; metric_id_src++) { if (src->metrics[metric_id_src] == NULL) { continue; } int metric_id_dest = metric_id_src_dest_map[metric_id_src]; (void)metric_merge_or_copy_cell(dest->metrics[metric_id_dest], src->metrics[metric_id_src], cell_id_final, cell_id_src); } } } void fieldstat_cube_merge_topk(struct fs_cube *dest, const struct fs_cube *src, const int *metric_id_src_dest_map) { const struct cell_manager *cell_manager_src = src->cell_manager; struct cell_manager *cell_manager_dest = dest->cell_manager; int *cell_id_added = NULL; int *cell_id_old = NULL; int n_cell_id_added = 0; int *cell_id_popped = NULL; int n_cell_id_popped = 0; cell_manager_merge_topk(cell_manager_dest, cell_manager_src, &cell_id_popped, &n_cell_id_popped, &cell_id_old, &cell_id_added, &n_cell_id_added); for (int i = 0; i < n_cell_id_added; i++) { int tmp_id_dest = cell_id_added[i]; int tmp_id_src = cell_id_old[i]; for (int j = 0; j < src->valid_metric_arr_len; j++) { if (src->metrics[j] == NULL) { continue; } int metric_id_dest = metric_id_src_dest_map[j]; metric_merge_or_copy_cell(dest->metrics[metric_id_dest], src->metrics[j], tmp_id_dest, tmp_id_src); } } // chances are that: a cell exists in both dest and src, but it is not in final. In this case, these cells are both in cell_id_added and cell_id_popped. // Since all cells are counter, which is easy to merge, we just delete these cells after merging them. for (int i = 0; i < n_cell_id_popped;i++) { int id = cell_id_popped[i]; for (int j = 0; j < dest->valid_metric_arr_len; j++) { if (dest->metrics[j] == NULL) { continue; } metric_delete_cell(dest->metrics[j], id); } } free(cell_id_popped); free(cell_id_added); free(cell_id_old); } void fieldstat_cube_merge(struct fs_cube *dest, const struct fs_cube *src, const int *metric_id_src_dest_map) { for (int metric_id_src = 0; metric_id_src < src->valid_metric_arr_len; metric_id_src++) { if (src->metrics[metric_id_src] == NULL) { continue; } int metric_id_dest = metric_id_src_dest_map[metric_id_src]; if (dest->metrics[metric_id_dest] != NULL) { continue; } struct metric *metric_dest = metric_fork(src->metrics[metric_id_src]); add_metric_to_cube(dest, metric_dest, metric_id_dest); } if (dest->sampling_mode == SAMPLING_MODE_COMPREHENSIVE) { return fieldstat_cube_merge_comprehensive(dest, src, metric_id_src_dest_map); } fieldstat_cube_merge_topk(dest, src, metric_id_src_dest_map); } int fieldstat_merge(struct fieldstat *instance, struct fieldstat *src) { if (instance == NULL || src == NULL) { return FS_ERR_NULL_HANDLER; } int metric_id_src_dest_map[src->n_metric_master]; // every metric in src move to metric_id_src_dest_map[] in dst instance for (int metric_id_src = 0; metric_id_src < src->n_metric_master; metric_id_src++) { const char *name_src = metric_get_name(src->metric_masters[metric_id_src]); int metric_id_dst = name_id_map_get_id_by_name(instance->metric_name_id_map, name_src); if (metric_id_dst == -1) { metric_id_dst = append_metric_to_instance(instance, metric_fork(src->metric_masters[metric_id_src])); name_id_map_add(&instance->metric_name_id_map, name_src, metric_id_dst); } metric_id_src_dest_map[metric_id_src] = metric_id_dst; } size_t n_cube_src = src->valid_cube_arr_length; const struct cube_manager *tag_cube_id_map = instance->shared_tag_cube_manager; int ret = 0; for (int i = 0; i < n_cube_src; i++) { const struct fs_cube *cube_src = src->cube[i]; if (cube_src == NULL) { continue; } const struct tag_hash_key *shared_tag_key_src = cube_src->key_tag; int cube_id_tmp = cube_manager_find(tag_cube_id_map, shared_tag_key_src); if (cube_id_tmp == -1) { struct fs_cube *copied_cube = fieldstat_cube_dup(cube_src, metric_id_src_dest_map); fieldstat_append_cube_to_instance(instance, copied_cube); } else { struct fs_cube *cube_dst = instance->cube[cube_id_tmp]; if (cube_dst->sampling_mode != cube_src->sampling_mode) { ret = FS_ERR_INVALID_PARAM; continue; } if (cube_dst->sampling_mode == SAMPLING_MODE_TOPK && strcmp( metric_get_name(instance->metric_masters[cube_dst->primary_metric_id]), metric_get_name(src->metric_masters[cube_src->primary_metric_id]) ) != 0) { printf("primary metric name not match, name instance: %s, name src: %s\n", metric_get_name(instance->metric_masters[cube_dst->primary_metric_id]), metric_get_name(src->metric_masters[cube_src->primary_metric_id])); ret = FS_ERR_INVALID_PARAM; continue; } fieldstat_cube_merge(cube_dst, cube_src, metric_id_src_dest_map); } } return ret; } struct fieldstat *fieldstat_fork(const struct fieldstat *instance) { if (instance == NULL) { return NULL; } struct fieldstat *new_instance = calloc(1, sizeof(struct fieldstat)); new_instance->shared_tag_cube_manager = cube_manager_new(); new_instance->valid_cube_arr_length = instance->valid_cube_arr_length; new_instance->max_n_cube = instance->max_n_cube; new_instance->cube = calloc(new_instance->max_n_cube, sizeof(struct fs_cube *)); for (size_t i = 0; i < new_instance->valid_cube_arr_length; i++) { struct fs_cube *cube = instance->cube[i]; if (cube == NULL) { continue; } new_instance->cube[i] = fieldstat_cube_fork(cube); cube_manager_add(new_instance->shared_tag_cube_manager, cube->key_tag, i); // copy registered metrics for (size_t j = 0; j < cube->valid_metric_arr_len; j++) { const struct metric *metric = cube->metrics[j]; if (metric == NULL) { continue; } struct metric *new_metric = metric_fork(metric); add_metric_to_cube(new_instance->cube[i], new_metric, j); } } new_instance->cube_version = calloc(new_instance->max_n_cube, sizeof(unsigned long)); memcpy(new_instance->cube_version, instance->cube_version, sizeof(unsigned long) * new_instance->max_n_cube); new_instance->metric_masters = calloc(instance->max_n_metric_master, sizeof(struct metric *)); new_instance->max_n_metric_master = instance->max_n_metric_master; new_instance->n_metric_master = instance->n_metric_master; for (size_t i = 0; i < instance->n_metric_master; i++) { new_instance->metric_masters[i] = metric_fork(instance->metric_masters[i]); } new_instance->metric_name_id_map = name_id_map_copy(instance->metric_name_id_map); return new_instance; } void calibrate_metrics_in_instance(const struct fieldstat *master, struct fieldstat *replica) { if (replica->max_n_metric_master < master->max_n_metric_master) { replica->metric_masters = (struct metric **)realloc(replica->metric_masters, sizeof(struct metric *) * master->max_n_metric_master); memset(replica->metric_masters + replica->max_n_metric_master, 0, sizeof(struct metric *) * (master->max_n_metric_master - replica->max_n_metric_master)); replica->max_n_metric_master = master->max_n_metric_master; } size_t longer_arr_len = master->n_metric_master > replica->n_metric_master ? master->n_metric_master : replica->n_metric_master; for (size_t i = 0; i < longer_arr_len; i++) { const struct metric *metric_master = i >= master->n_metric_master ? NULL : master->metric_masters[i]; struct metric *metric_target = i >= replica->n_metric_master ? NULL : replica->metric_masters[i]; if (metric_master == NULL && metric_target == NULL) { continue; } if (metric_master == NULL && metric_target != NULL) { metric_free(metric_target); replica->metric_masters[i] = NULL; continue; } if (metric_master != NULL && metric_target == NULL) { const struct metric *metric_dup = metric_fork(metric_master); add_metric_to_instance(replica, metric_dup, i); continue; } if (metric_get_type(metric_master) != metric_get_type(metric_target) || strcmp(metric_get_name(metric_master), metric_get_name(metric_target)) != 0 ) { metric_free(metric_target); const struct metric *metric_dup = metric_fork(metric_master); add_metric_to_instance(replica, metric_dup, i); continue; } // metric same, no need to do anything } replica->n_metric_master = master->n_metric_master; } int fieldstat_calibrate(const struct fieldstat *master, struct fieldstat *replica) { if (master == NULL || replica == NULL) { return FS_ERR_NULL_HANDLER; } if (replica->max_n_cube < master->max_n_cube) { replica->cube = (struct fs_cube **)realloc(replica->cube, sizeof(struct fs_cube *) * master->max_n_cube); memset(replica->cube + replica->max_n_cube, 0, sizeof(struct fs_cube *) * (master->max_n_cube - replica->max_n_cube)); replica->cube_version = (unsigned long *)realloc(replica->cube_version, sizeof(unsigned long) * master->max_n_cube); memset(replica->cube_version + replica->max_n_cube, 0, sizeof(unsigned long) * (master->max_n_cube - replica->max_n_cube)); replica->max_n_cube = master->max_n_cube; } size_t len_master = master->valid_cube_arr_length; size_t len_replica = replica->valid_cube_arr_length; size_t longer_arr_len = len_master > len_replica ? len_master : len_replica; for (size_t i = 0; i < longer_arr_len; i++) { const struct fs_cube *cube_master = i >= len_master ? NULL : master->cube[i]; const struct fs_cube *cube_target = i >= len_replica ? NULL : replica->cube[i]; if (cube_master == NULL && cube_target == NULL) { continue; } if (cube_master == NULL && cube_target != NULL) { fieldstat_cube_free_contents(replica, i); continue; } if (cube_master != NULL && cube_target == NULL) { struct fs_cube *cube_dup = fieldstat_cube_fork(cube_master); add_cube_to_position(replica, cube_dup, i); continue; } if (master->cube_version[i] == replica->cube_version[i] && tag_hash_key_cmp(cube_master->key_tag, cube_target->key_tag) == 0) { continue; } fieldstat_cube_free_contents(replica, i); struct fs_cube *cube_dup = fieldstat_cube_fork(cube_master); add_cube_to_position(replica, cube_dup, i); } memcpy(replica->cube_version, master->cube_version, sizeof(unsigned long) * master->max_n_cube); replica->valid_cube_arr_length = master->valid_cube_arr_length; cube_manager_calibrate(replica->shared_tag_cube_manager, master->shared_tag_cube_manager); calibrate_metrics_in_instance(master, replica); return FS_OK; } /* -------------------------------------------------------------------------- */ /* query */ /* -------------------------------------------------------------------------- */ void fieldstat_get_cubes(const struct fieldstat *instance, int **cube_ids, int *n_cube) { if (instance == NULL || instance->valid_cube_arr_length == 0) { *cube_ids = NULL; *n_cube = 0; return; } int all_available_cube_count = 0; int *tmp_ids = (int *)malloc(sizeof(int) * instance->valid_cube_arr_length); for (int i = 0; i < instance->valid_cube_arr_length; i++) { const struct fs_cube *tmp_cube = instance->cube[i]; if (tmp_cube == NULL) { continue; } tmp_ids[all_available_cube_count] = i; all_available_cube_count ++; } if (all_available_cube_count == 0) { free(tmp_ids); *cube_ids = NULL; *n_cube = 0; return; } *cube_ids = (int *)malloc(sizeof(int) * all_available_cube_count); memcpy(*cube_ids, tmp_ids, sizeof(int) * all_available_cube_count); *n_cube = all_available_cube_count; free(tmp_ids); } int fieldstat_get_metrics_used_by_cube(const struct fieldstat *instance, int cube_id, int **metric_id_out, size_t *n_metric) { *metric_id_out = NULL; *n_metric = 0; if (instance == NULL) { return FS_ERR_NULL_HANDLER; } if (cube_id >= instance->valid_cube_arr_length || cube_id < 0) { return FS_ERR_INVALID_CUBE_ID; } const struct fs_cube *cube = instance->cube[cube_id]; if (cube == NULL) { return FS_ERR_INVALID_CUBE_ID; } int all_available_metric_count = 0; if (cube->valid_metric_arr_len == 0) { return FS_OK; } int *tmp_ids = (int *)malloc(sizeof(int) * cube->valid_metric_arr_len); for (int i = 0; i < cube->valid_metric_arr_len; i++) { const struct metric *tmp_metric = cube->metrics[i]; if (tmp_metric == NULL) { continue; } tmp_ids[all_available_metric_count] = i; all_available_metric_count ++; } if (all_available_metric_count == 0) { free(tmp_ids); return FS_OK; } *metric_id_out = tmp_ids; *n_metric = all_available_metric_count; return FS_OK; } void fieldstat_get_metrics(const struct fieldstat *instance, int **metric_id_out, size_t *n_metric) { if (instance == NULL || instance->n_metric_master == 0) { *metric_id_out = NULL; *n_metric = 0; return; } int *tmp_ids = (int *)malloc(sizeof(int) * instance->n_metric_master); *metric_id_out = tmp_ids; *n_metric = instance->n_metric_master; for (int i = 0; i < instance->n_metric_master; i++) { tmp_ids[i] = i; } return; } void fieldstat_get_cells_used_by_metric(const struct fieldstat *instance, int cube_id, int metric_id, struct fieldstat_tag_list **tag_list, size_t *n_cell) { *tag_list = NULL; *n_cell = 0; int ret = FS_OK; const struct metric *metric = fieldstat_find_metric(instance, cube_id, metric_id, &ret); if (metric == NULL) { return; } size_t n_cell_ret = 0; int *cell_ids = NULL; metric_get_cell_ids(metric, &cell_ids, &n_cell_ret); if (n_cell_ret == 0) { return; } *n_cell = n_cell_ret; struct fieldstat_tag_list *tag_list_ret = (struct fieldstat_tag_list *)malloc(sizeof(struct fieldstat_tag_list) * n_cell_ret); *tag_list = tag_list_ret; for (int i = 0; i < n_cell_ret; i++) { const struct tag_hash_key *tag_key = cell_manager_get_tag_by_cell_id(instance->cube[cube_id]->cell_manager, cell_ids[i]); if (tag_key == NULL) { continue; } tag_hash_key_convert_to_fieldstat_tag(tag_key, &(tag_list_ret[i].tag), &(tag_list_ret[i].n_tag)); } free(cell_ids); } struct fieldstat_tag_list *fieldstat_get_shared_tags(const struct fieldstat *instance, int cube_id) { if (instance == NULL || cube_id >= instance->valid_cube_arr_length || cube_id < 0) { return NULL; } struct fs_cube *cube = instance->cube[cube_id]; if (cube == NULL) { return NULL; } struct fieldstat_tag_list *tag_list = (struct fieldstat_tag_list *)malloc(sizeof(struct fieldstat_tag_list)); if (cube->n_shared_tags == 0) { tag_list->tag = NULL; tag_list->n_tag = 0; return tag_list; } tag_list->tag = (struct fieldstat_tag *)malloc(sizeof(struct fieldstat_tag) * cube->n_shared_tags); for (int i = 0; i < cube->n_shared_tags; i++) { struct fieldstat_tag *tag_dest = &(tag_list->tag[i]); struct fieldstat_tag *tag_src = &(cube->shared_tags[i]); tag_dest->key = strdup(tag_src->key); tag_dest->type = tag_src->type; switch (tag_src->type) { case TAG_INTEGER: tag_dest->value_longlong = tag_src->value_longlong; break; case TAG_CSTRING: tag_dest->value_str = strdup(tag_src->value_str); break; case TAG_DOUBLE: tag_dest->value_double = tag_src->value_double; break; default: break; } } tag_list->n_tag = cube->n_shared_tags; return tag_list; } int get_cell_id_by_tag_list(const struct fieldstat *instance, int cube_id, const struct fieldstat_tag_list *tags) { struct tag_hash_key tag_key; tag_hash_key_init_with_fieldstat_tag(&tag_key, tags->tag, tags->n_tag, false); int cell_id = cell_manager_find(instance->cube[cube_id]->cell_manager, &tag_key); return cell_id; } int fieldstat_counter_get(const struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag_list *tags, long long *value) { int ret; *value = 0; const struct metric *metric = fieldstat_find_metric(instance, cube_id, metric_id, &ret); if (ret != FS_OK) { return ret; } if (metric == NULL || metric_get_type(metric) != METRIC_TYPE_COUNTER) { return FS_ERR_INVALID_METRIC_ID; } int cell_id = get_cell_id_by_tag_list(instance, cube_id, tags); if (cell_id == -1) { return FS_ERR_INVALID_TAG; } ret = metric_counter_get(metric, cell_id, value); if (ret < 0) { return FS_ERR_INVALID_TAG; } return FS_OK; } int fieldstat_hll_get(const struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag_list *tags, double *value) { int ret; const struct metric *metric = fieldstat_find_metric(instance, cube_id, metric_id, &ret); if (ret != FS_OK) { return ret; } if (metric == NULL || metric_get_type(metric) != METRIC_TYPE_HLL) { return FS_ERR_INVALID_METRIC_ID; } int cell_id = get_cell_id_by_tag_list(instance, cube_id, tags); if (cell_id == -1) { return FS_ERR_INVALID_TAG; } double ret2 = metric_hll_get(metric, cell_id); if (ret2 < 0) { return FS_ERR_INVALID_TAG; } *value = ret2; return FS_OK; } long long fieldstat_hist_value_at_percentile(const struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag_list *tags, double percentile) { int ret; const struct metric *metric = fieldstat_find_metric(instance, cube_id, metric_id, &ret); if (ret != FS_OK) { return ret; } if (metric == NULL || metric_get_type(metric) != METRIC_TYPE_HISTOGRAM) { return FS_ERR_INVALID_METRIC_ID; } int cell_id = get_cell_id_by_tag_list(instance, cube_id, tags); if (cell_id == -1) { return FS_ERR_INVALID_TAG; } long long ret2 = metric_histogram_value_at_percentile(metric, cell_id, percentile); if (ret2 < 0) { return FS_ERR_INVALID_TAG; } return ret2; } long long fieldstat_hist_count_le_value(const struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag_list *tags, long long value) { int ret; const struct metric *metric = fieldstat_find_metric(instance, cube_id, metric_id, &ret); if (ret != FS_OK) { return ret; } if (metric == NULL || metric_get_type(metric) != METRIC_TYPE_HISTOGRAM) { return FS_ERR_INVALID_METRIC_ID; } int cell_id = get_cell_id_by_tag_list(instance, cube_id, tags); if (cell_id == -1) { return FS_ERR_INVALID_TAG; } long long ret2 = metric_histogram_count_le_value(metric, cell_id, value); if (ret2 < 0) { return FS_ERR_INVALID_TAG; } return ret2; } void fieldstat_get_serialized_blob(const struct fieldstat *instance, int cube_id, int metric_id, const struct fieldstat_tag_list *tags, char **blob, size_t *blob_size) { *blob = NULL; *blob_size = 0; int ret; const struct metric *metric = fieldstat_find_metric(instance, cube_id, metric_id, &ret); if (ret != FS_OK) { return; } if (metric == NULL) { return; } enum metric_type type = metric_get_type(metric); if (!(type == METRIC_TYPE_HLL || type == METRIC_TYPE_HISTOGRAM)) { return; } int cell_id = get_cell_id_by_tag_list(instance, cube_id, tags); if (cell_id == -1) { return; } metric_get_plain_blob(metric, cell_id, blob, blob_size); } void fieldstat_tag_list_arr_free(struct fieldstat_tag_list *tag_list, size_t n_cell) { if (tag_list == NULL) { return; } for (int i = 0; i < n_cell; i++) { fieldstat_free_tag_array(tag_list[i].tag, tag_list[i].n_tag); } free(tag_list); } const char *fieldstat_get_metric_name(const struct fieldstat *instance, int metric_id) { if (metric_id < 0 || metric_id >= instance->n_metric_master) { return NULL; } const struct metric *metric = instance->metric_masters[metric_id]; if (metric == NULL) { return NULL; } return metric_get_name(metric); } enum metric_type fieldstat_get_metric_type(const struct fieldstat *instance, int metric_id) { if (instance == NULL || metric_id < 0 || metric_id >= instance->n_metric_master) { return (enum metric_type)(-1); } const struct metric *metric = instance->metric_masters[metric_id]; if (metric == NULL) { return (enum metric_type)(-1); } return metric_get_type(metric); } void fieldstat_get_cells_used_by_cube(const struct fieldstat *instance, int cube_id, struct fieldstat_tag_list **tag_list, size_t *n_cell) { if (instance == NULL || cube_id < 0 || cube_id >= instance->valid_cube_arr_length) { return; } const struct fs_cube *cube = instance->cube[cube_id]; if (cube == NULL) { return; } int arr_len = 0; const struct tag_hash_key **tags_discontinuous = cell_manager_dump(cube->cell_manager, &arr_len); if (arr_len == 0) { *tag_list = NULL; *n_cell = 0; return; } struct fieldstat_tag_list *tag_list_ret = (struct fieldstat_tag_list *)malloc(sizeof(struct fieldstat_tag_list) * arr_len); int n_cell_ret = 0; for (int i = 0; i < arr_len; i++) { const struct tag_hash_key *tag_key = tags_discontinuous[i]; if (tag_key == NULL) { continue; } tag_hash_key_convert_to_fieldstat_tag(tag_key, &(tag_list_ret[n_cell_ret].tag), &(tag_list_ret[n_cell_ret].n_tag)); n_cell_ret++; } if (n_cell_ret == 0) { free(tag_list_ret); *tag_list = NULL; *n_cell = 0; return; } *tag_list = tag_list_ret; *n_cell = n_cell_ret; } int fieldstat_get_used_sampling(const struct fieldstat *instance, int cube_id) { if (instance == NULL) { return FS_ERR_NULL_HANDLER; } if (cube_id < 0 || cube_id >= instance->valid_cube_arr_length) { return FS_ERR_INVALID_CUBE_ID; } const struct fs_cube *cube = instance->cube[cube_id]; if (cube == NULL) { return FS_ERR_INVALID_CUBE_ID; } return cell_manager_get_cardinality(cube->cell_manager); } int fieldstat_find_cube(const struct fieldstat *instance, const struct fieldstat_tag *shared_tags, size_t n_shared_tags) { if (instance == NULL) { return FS_ERR_NULL_HANDLER; } const struct cube_manager *tag_cube_id_map = instance->shared_tag_cube_manager; struct tag_hash_key shared_tag_key; tag_hash_key_init_with_fieldstat_tag(&shared_tag_key, shared_tags, n_shared_tags, false); int cube_id = cube_manager_find(tag_cube_id_map, &shared_tag_key); if (cube_id == -1) { return FS_ERR_INVALID_KEY; } return cube_id; } int fieldstat_get_cube_mode(const struct fieldstat *instance, int cube_id, enum sampling_mode *mode, int *primary_metric_id) { if (instance == NULL) { return FS_ERR_NULL_HANDLER; } if (cube_id < 0 || cube_id >= instance->valid_cube_arr_length) { return FS_ERR_INVALID_CUBE_ID; } const struct fs_cube *cube = instance->cube[cube_id]; if (cube == NULL) { return FS_ERR_INVALID_CUBE_ID; } *mode = cube->sampling_mode; if (cube->sampling_mode == SAMPLING_MODE_TOPK) { *primary_metric_id = cube->primary_metric_id; } else { *primary_metric_id = -1; } return FS_OK; }