diff options
| author | chenzizhan <[email protected]> | 2023-09-15 18:14:26 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2023-10-08 17:03:28 +0800 |
| commit | 9d452b417bbe330496347ad2c066d2541921ebd4 (patch) | |
| tree | d523a9fc043c6810747f49296a8dbb961aaf2581 /src/exporter/cjson_exporter.c | |
| parent | 230821b167fda5ed30333dc9f5ca8c752e468a16 (diff) | |
stash4.3.0-4.3.4(python api)
Signed-off-by: chenzizhan <[email protected]>
Diffstat (limited to 'src/exporter/cjson_exporter.c')
| -rw-r--r-- | src/exporter/cjson_exporter.c | 366 |
1 files changed, 353 insertions, 13 deletions
diff --git a/src/exporter/cjson_exporter.c b/src/exporter/cjson_exporter.c index 45b113c..5ab1836 100644 --- a/src/exporter/cjson_exporter.c +++ b/src/exporter/cjson_exporter.c @@ -8,6 +8,7 @@ #include <stddef.h> #include "utils/very_fast_json_writer.h" +#include "uthash.h" #include "fieldstat.h" #include "fieldstat_exporter.h" @@ -28,7 +29,7 @@ {"in_pkts":123}, {"sessions":<hll blob base64-encoded string>} ] - "timestamp":123456789 + "timestamp_ms":123456789 }. { "name":"exporter_name", @@ -40,11 +41,13 @@ {"in_bytes":1}, {"in_pkts":2}, ] - "timestamp":123456789 + "timestamp_ms":123456789 } ] */ +#define DEFAULT_EXPORTER_NAME "-" +struct counter_history; struct export_kv_pair { char *key; enum fs_tag_type type; @@ -59,6 +62,7 @@ struct fieldstat_json_exporter { const struct fieldstat *instance; char *name; struct fieldstat_tag_list *global_tag_list; + struct counter_history *history; }; struct cell_iter { @@ -77,9 +81,37 @@ struct cell_iter { const struct fieldstat *instance; }; -struct json_tag_field_pair { +struct cellwise_rec_for_export { struct json_writer *cjson_tags; struct json_writer *cjson_fields; + + char **metric_name; // used for delta export + long long *metric_value; // used for delta export + size_t n_metric; +}; + +struct name_value_map { + char *name; + long long value; + UT_hash_handle hh; +}; + +struct tag_metric_map { + char *key; + struct name_value_map *value; + UT_hash_handle hh; +}; + +struct counter_history { + struct tag_metric_map *rec; + long long ts; + + unsigned long cell_version; + unsigned long *cube_version; + size_t n_cube; + + char *exporter_name; + struct fieldstat_tag_list *global_tag_list; }; void kv_pair_free(struct export_kv_pair *pair) { @@ -116,6 +148,249 @@ long long cal_ms_time(const struct timeval *ts) return time_stamp_in_ms; } +/* -------------------------------------------------------------------------- */ +/* history rec */ +/* -------------------------------------------------------------------------- */ +struct counter_history *counter_history_new() +{ + struct counter_history *history = calloc(1, sizeof(struct counter_history)); + history->exporter_name = strdup(DEFAULT_EXPORTER_NAME); + return history; +} + +void counter_history_add(struct counter_history *history, const char *json, const char *name, long long value) +{ + struct name_value_map *name_value_node = malloc(sizeof(struct name_value_map)); + name_value_node->name = strdup(name); + name_value_node->value = value; + + struct tag_metric_map *tag_node = malloc(sizeof(struct tag_metric_map)); + tag_node->key = strdup(json); + tag_node->value = NULL; + + HASH_ADD_KEYPTR(hh, history->rec, tag_node->key, strlen(tag_node->key), tag_node); + HASH_ADD_KEYPTR(hh, tag_node->value, name_value_node->name, strlen(name_value_node->name), name_value_node); +} + +struct name_value_map *counter_history_find(struct counter_history *history, const char *json, const char *name) +{ + struct tag_metric_map *tag_node; + HASH_FIND_STR(history->rec, json, tag_node); + if (tag_node == NULL) { + return NULL; + } + + struct name_value_map *name_value_node; + HASH_FIND_STR(tag_node->value, name, name_value_node); + return name_value_node; +} + +void counter_history_free(struct counter_history *history) +{ + if (history == NULL) { + return; + } + + struct tag_metric_map *tag_node, *tmp; + HASH_ITER(hh, history->rec, tag_node, tmp) { + struct name_value_map *name_value_node, *tmp2; + HASH_ITER(hh, tag_node->value, name_value_node, tmp2) { + HASH_DEL(tag_node->value, name_value_node); + free(name_value_node->name); + free(name_value_node); + } + HASH_DEL(history->rec, tag_node); + free(tag_node->key); + free(tag_node); + } + + free(history->cube_version); + + free(history->exporter_name); + + if (history->global_tag_list != NULL) { + fieldstat_tag_list_arr_free(history->global_tag_list, 1); + } + + free(history); +} + +bool fieldstat_tag_list_cmp(const struct fieldstat_tag_list *a, const struct fieldstat_tag_list *b) +{ + if (a->n_tag != b->n_tag) { + return false; + } + + for (int i = 0; i < a->n_tag; i++) { + if (strcmp(a->tag[i].key, b->tag[i].key) != 0) { + return false; + } + if (a->tag[i].type != b->tag[i].type) { + return false; + } + switch (a->tag[i].type) + { + case TAG_INTEGER: + if (a->tag[i].value_longlong != b->tag[i].value_longlong) { + return false; + } + break; + case TAG_DOUBLE: + if (a->tag[i].value_double != b->tag[i].value_double) { + return false; + } + break; + case TAG_CSTRING: + if (strcmp(a->tag[i].value_str, b->tag[i].value_str) != 0) { + return false; + } + break; + default: + assert(0); + } + } + + return true; +} + +struct fieldstat_tag_list *my_copy_fs_tag_list(const struct fieldstat_tag_list *src) +{ + struct fieldstat_tag_list *dest = malloc(sizeof(struct fieldstat_tag_list)); + dest->n_tag = src->n_tag; + dest->tag = malloc(sizeof(struct fieldstat_tag) * dest->n_tag); + for (int i = 0; i < dest->n_tag; i++) { + dest->tag[i].key = strdup(src->tag[i].key); + dest->tag[i].type = src->tag[i].type; + switch (src->tag[i].type) + { + case TAG_INTEGER: + dest->tag[i].value_longlong = src->tag[i].value_longlong; + break; + case TAG_DOUBLE: + dest->tag[i].value_double = src->tag[i].value_double; + break; + case TAG_CSTRING: + dest->tag[i].value_str = strdup(src->tag[i].value_str); + break; + default: + assert(0); + } + } + + return dest; +} + +bool counter_history_check_if_need_to_update(const struct fieldstat_json_exporter *exporter) +{ + if (exporter->history == NULL) { + return false; // delta export disabled + } + + unsigned long cur_cell_version = fieldstat_get_cell_version(exporter->instance); + const char *cur_exporter_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME; + const struct fieldstat_tag_list *cur_global_tag_list = exporter->global_tag_list; + + if (exporter->history->cell_version != cur_cell_version) { + return true; + } + + if (strcmp(exporter->history->exporter_name, cur_exporter_name) != 0) { + return true; + } + + if (exporter->history->global_tag_list == NULL && cur_global_tag_list != NULL) { + return true; + } + if (exporter->history->global_tag_list == NULL && cur_global_tag_list == NULL) { + return false; + } + // global tag cant be deleted, so no need to check if cur_global_tag_list == NULL && exporter->history->global_tag_list != NULL + if (fieldstat_tag_list_cmp(cur_global_tag_list, exporter->history->global_tag_list) == false) { + return true; + } + + int *cube_ids = NULL; + int n_cube = 0; + fieldstat_get_cubes(exporter->instance, &cube_ids, &n_cube); + if (n_cube != exporter->history->n_cube) { + return true; + } + for (int i = 0; i < n_cube; i++) { + unsigned long cube_version = fieldstat_get_cube_version(exporter->instance, cube_ids[i]); + if (cube_version != exporter->history->cube_version[i]) { + return true; + } + } + free(cube_ids); + + return false; +} + +void counter_history_fill_version_info(struct counter_history *history, struct fieldstat_json_exporter *exporter) +{ + unsigned long cur_cell_version = fieldstat_get_cell_version(exporter->instance); + const char *cur_exporter_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME; + const struct fieldstat_tag_list *cur_global_tag_list = exporter->global_tag_list; + + free(history->exporter_name); + history->exporter_name = strdup(cur_exporter_name); + if (cur_global_tag_list != NULL) { + history->global_tag_list = my_copy_fs_tag_list(cur_global_tag_list); + } + + history->cell_version = cur_cell_version; + + int *cube_ids = NULL; + int n_cube = 0; + fieldstat_get_cubes(exporter->instance, &cube_ids, &n_cube); + history->n_cube = n_cube; + history->cube_version = malloc(sizeof(unsigned long) * n_cube); + for (int i = 0; i < n_cube; i++) { + history->cube_version[i] = fieldstat_get_cube_version(exporter->instance, cube_ids[i]); + } + free(cube_ids); +} + +void fieldstat_json_exporter_update_history(struct fieldstat_json_exporter *exporter) +{ + if (counter_history_check_if_need_to_update(exporter) == false) { + return; + } + + counter_history_free(exporter->history); + exporter->history = counter_history_new(); + + counter_history_fill_version_info(exporter->history, exporter); +} + +void fieldstat_json_exporter_write_delta(struct fieldstat_json_exporter *exporter, struct cellwise_rec_for_export *tag_field_pair_arr, size_t arr_len) +{ + // for every tag_field_pair, get the tag json string + for (int i = 0; i < arr_len; i++) { + const char *tag_json = json_writer_unwrap(tag_field_pair_arr[i].cjson_tags); + for (int j = 0; j < tag_field_pair_arr[i].n_metric; j++) { + const char *metric_name = tag_field_pair_arr[i].metric_name[j]; + long long cur_value = tag_field_pair_arr[i].metric_value[j]; + struct name_value_map *node = counter_history_find(exporter->history, tag_json, metric_name); + long long last_value = node == NULL ? 0 : node->value; + long long delta = cur_value - last_value; + + if (node == NULL) { // new added cell/cube/metric + counter_history_add(exporter->history, tag_json, metric_name, cur_value); + } else { // previous one + node->value = cur_value; + } + // cell will never delete. If delete cube / cell happen, the exporter history will be reset. + + size_t delta_metric_name_len = strlen(metric_name) + 6; // 6 for "_delta" + char *delta_metric_name = malloc(delta_metric_name_len + 1); + snprintf(delta_metric_name, delta_metric_name_len + 1, "%s_delta", metric_name); + delta_metric_name[delta_metric_name_len] = '\0'; + json_writer_longlong_field(tag_field_pair_arr[i].cjson_fields, delta_metric_name, delta); + free(delta_metric_name); + } + } +} /* -------------------------------------------------------------------------- */ /* iter */ @@ -318,7 +593,7 @@ void kv_pair_free_list(struct export_kv_pair *pairs, size_t len) } // return 1 if added, 0 if not added -int cjson_map_add(struct json_tag_field_pair *tag_field_pair, const struct cell_iter *iter) +int cjson_map_add(struct cellwise_rec_for_export *tag_field_pair, const struct cell_iter *iter) { struct json_writer *writer = NULL; @@ -353,7 +628,30 @@ int cjson_map_add(struct json_tag_field_pair *tag_field_pair, const struct cell_ return 1; } -struct json_tag_field_pair *read_tag_and_field(const struct fieldstat *instance, size_t *n_pair_out) +void cjson_map_record_metrics(struct cellwise_rec_for_export *tag_field_pair, const struct cell_iter *iter) +{ + tag_field_pair->metric_name = malloc(sizeof(char *) * (iter->max_metric_id + 1)); + tag_field_pair->metric_value = malloc(sizeof(long long) * (iter->max_metric_id + 1)); + + int n_counter_metric = 0; + for (int metric_id = 0; metric_id <= iter->max_metric_id; metric_id++) { + int cube_id = iter->cube_ids[iter->curr_cube_idx]; + if (fieldstat_get_metric_type(iter->instance, cube_id, metric_id) != METRIC_TYPE_COUNTER) { + continue; + } + if (fieldstat_counter_get(iter->instance, cube_id, metric_id, iter->cell_ids[iter->curr_cell_idx]) == -1) { // no value, happens when the two cells in one cube has different metrics + continue; + } + + tag_field_pair->metric_name[n_counter_metric] = strdup(fieldstat_get_metric_name(iter->instance, cube_id, metric_id)); + tag_field_pair->metric_value[n_counter_metric] = fieldstat_counter_get(iter->instance, cube_id, metric_id, iter->cell_ids[iter->curr_cell_idx]); + + n_counter_metric++; + } + tag_field_pair->n_metric = n_counter_metric; +} + +struct cellwise_rec_for_export *read_tag_and_field(const struct fieldstat *instance, size_t *n_pair_out, bool output_delta) { // clock_t prepare_start = clock(); int *cube_id = NULL; @@ -373,13 +671,17 @@ struct json_tag_field_pair *read_tag_and_field(const struct fieldstat *instance, } struct cell_iter *iter = cell_iter_new(instance); - struct json_tag_field_pair *tag_field_pair = calloc(n_cell_total, sizeof(struct json_tag_field_pair)); + struct cellwise_rec_for_export *tag_field_pair = calloc(n_cell_total, sizeof(struct cellwise_rec_for_export)); int i = 0; while (cell_iter_next(iter)) { // next_num++; if (cjson_map_add(&tag_field_pair[i], iter) == 0) { continue; } + + if (output_delta) { + cjson_map_record_metrics(&tag_field_pair[i], iter); + } i++; } @@ -389,7 +691,7 @@ struct json_tag_field_pair *read_tag_and_field(const struct fieldstat *instance, return tag_field_pair; } -void fieldstat_json_exporter_write_global_tags(const struct fieldstat_json_exporter *exporter, struct json_tag_field_pair *tag_field_pair_arr, size_t arr_len) +void fieldstat_json_exporter_write_global_tags(const struct fieldstat_json_exporter *exporter, struct cellwise_rec_for_export *tag_field_pair_arr, size_t arr_len) { if (exporter->global_tag_list == NULL) { return; @@ -457,7 +759,7 @@ int add_object_to_json_array(char **buf, int *buf_len, int start, const char *st "in_latency":<blob of histogram> "client_ip_sketch":<blob of hyperloglog> } - "timestamp":<timestamp long> + "timestamp_ms":<timestamp long> } */ /* @@ -465,10 +767,15 @@ int add_object_to_json_array(char **buf, int *buf_len, int start, const char *st */ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter *exporter, const struct timeval *timestamp, char ***output, size_t *output_size) { + long long timestamp_ms = cal_ms_time(timestamp); + if (exporter->history != NULL) { + fieldstat_json_exporter_update_history((struct fieldstat_json_exporter *)exporter); + } + const struct fieldstat *instance = exporter->instance; size_t n_pair; - struct json_tag_field_pair *tag_field_pair = read_tag_and_field(instance, &n_pair); + struct cellwise_rec_for_export *tag_field_pair = read_tag_and_field(instance, &n_pair, exporter->history != NULL); if (tag_field_pair == NULL || n_pair == 0) { free(tag_field_pair); // tag_field_pair is not NULL when there are registered metrics but no valid cells *output = NULL; @@ -477,19 +784,27 @@ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter * } fieldstat_json_exporter_write_global_tags(exporter, tag_field_pair, n_pair); + if (exporter->history != NULL) { // not null when fieldstat_json_exporter_enable_delta is called + fieldstat_json_exporter_write_delta((struct fieldstat_json_exporter *)exporter, tag_field_pair, n_pair); + } char **cjson_str_arr = (char **)malloc(sizeof(char *) * n_pair); for (int i = 0; i < n_pair; i++) { - struct json_tag_field_pair *current = &tag_field_pair[i]; + struct cellwise_rec_for_export *current = &tag_field_pair[i]; struct json_writer *root = json_writer_init(); json_writer_start_map(root); - const char *tmp_name = exporter->name ? exporter->name : "-"; + const char *tmp_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME; json_writer_str_field(root, "name", tmp_name, strlen(tmp_name)); json_writer_object_item(root, "tags", current->cjson_tags); json_writer_object_item(root, "fields", current->cjson_fields); - json_writer_longlong_field(root, "timestamp", cal_ms_time(timestamp)); + json_writer_longlong_field(root, "timestamp_ms", timestamp_ms); + if (exporter->history != NULL) { + json_writer_longlong_field(root, "timestamp_ms_delta", timestamp_ms - exporter->history->ts); + exporter->history->ts = timestamp_ms; + } + json_writer_end_map(root); char *cjson_str; @@ -501,7 +816,17 @@ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter * *output = cjson_str_arr; *output_size = n_pair; - free(tag_field_pair); // cjson object will be freed with cjson root + if (exporter->history != NULL) { // read_tag_and_field does not allocate memory for metric_name and metric_value when exporter->history == NULL + for (int i = 0; i < n_pair; i++) { + for (int j = 0; j < tag_field_pair[i].n_metric; j++) { + free(tag_field_pair[i].metric_name[j]); + } + free(tag_field_pair[i].metric_name); + free(tag_field_pair[i].metric_value); + } + } + // json object is added in json_writer_object_item + free(tag_field_pair); } char *fieldstat_json_exporter_export(const struct fieldstat_json_exporter *exporter, const struct timeval *timestamp) @@ -536,6 +861,7 @@ struct fieldstat_json_exporter *fieldstat_json_exporter_new(const struct fieldst exporter->instance = instance; exporter->name = NULL; exporter->global_tag_list = NULL; + exporter->history = NULL; return exporter; } @@ -587,5 +913,19 @@ void fieldstat_json_exporter_free(struct fieldstat_json_exporter *exporter) if (exporter->global_tag_list != NULL) { fieldstat_tag_list_arr_free(exporter->global_tag_list, 1); } + + if (exporter->history != NULL) { + counter_history_free(exporter->history); + } + free(exporter); } + +void fieldstat_json_exporter_enable_delta(struct fieldstat_json_exporter *exporter) +{ + if (exporter->history != NULL) { + return; + } + exporter->history = counter_history_new(); + counter_history_fill_version_info(exporter->history, exporter); +}
\ No newline at end of file |
