summaryrefslogtreecommitdiff
path: root/src/exporter/cjson_exporter.c
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2023-09-15 18:14:26 +0800
committerchenzizhan <[email protected]>2023-10-08 17:03:28 +0800
commit9d452b417bbe330496347ad2c066d2541921ebd4 (patch)
treed523a9fc043c6810747f49296a8dbb961aaf2581 /src/exporter/cjson_exporter.c
parent230821b167fda5ed30333dc9f5ca8c752e468a16 (diff)
stash4.3.0-4.3.4(python api)
Signed-off-by: chenzizhan <[email protected]>
Diffstat (limited to 'src/exporter/cjson_exporter.c')
-rw-r--r--src/exporter/cjson_exporter.c366
1 files changed, 353 insertions, 13 deletions
diff --git a/src/exporter/cjson_exporter.c b/src/exporter/cjson_exporter.c
index 45b113c..5ab1836 100644
--- a/src/exporter/cjson_exporter.c
+++ b/src/exporter/cjson_exporter.c
@@ -8,6 +8,7 @@
#include <stddef.h>
#include "utils/very_fast_json_writer.h"
+#include "uthash.h"
#include "fieldstat.h"
#include "fieldstat_exporter.h"
@@ -28,7 +29,7 @@
{"in_pkts":123},
{"sessions":<hll blob base64-encoded string>}
]
- "timestamp":123456789
+ "timestamp_ms":123456789
}.
{
"name":"exporter_name",
@@ -40,11 +41,13 @@
{"in_bytes":1},
{"in_pkts":2},
]
- "timestamp":123456789
+ "timestamp_ms":123456789
}
]
*/
+#define DEFAULT_EXPORTER_NAME "-"
+struct counter_history;
struct export_kv_pair {
char *key;
enum fs_tag_type type;
@@ -59,6 +62,7 @@ struct fieldstat_json_exporter {
const struct fieldstat *instance;
char *name;
struct fieldstat_tag_list *global_tag_list;
+ struct counter_history *history;
};
struct cell_iter {
@@ -77,9 +81,37 @@ struct cell_iter {
const struct fieldstat *instance;
};
-struct json_tag_field_pair {
+struct cellwise_rec_for_export {
struct json_writer *cjson_tags;
struct json_writer *cjson_fields;
+
+ char **metric_name; // used for delta export
+ long long *metric_value; // used for delta export
+ size_t n_metric;
+};
+
+struct name_value_map {
+ char *name;
+ long long value;
+ UT_hash_handle hh;
+};
+
+struct tag_metric_map {
+ char *key;
+ struct name_value_map *value;
+ UT_hash_handle hh;
+};
+
+struct counter_history {
+ struct tag_metric_map *rec;
+ long long ts;
+
+ unsigned long cell_version;
+ unsigned long *cube_version;
+ size_t n_cube;
+
+ char *exporter_name;
+ struct fieldstat_tag_list *global_tag_list;
};
void kv_pair_free(struct export_kv_pair *pair) {
@@ -116,6 +148,249 @@ long long cal_ms_time(const struct timeval *ts)
return time_stamp_in_ms;
}
+/* -------------------------------------------------------------------------- */
+/* history rec */
+/* -------------------------------------------------------------------------- */
+struct counter_history *counter_history_new()
+{
+ struct counter_history *history = calloc(1, sizeof(struct counter_history));
+ history->exporter_name = strdup(DEFAULT_EXPORTER_NAME);
+ return history;
+}
+
+void counter_history_add(struct counter_history *history, const char *json, const char *name, long long value)
+{
+ struct name_value_map *name_value_node = malloc(sizeof(struct name_value_map));
+ name_value_node->name = strdup(name);
+ name_value_node->value = value;
+
+ struct tag_metric_map *tag_node = malloc(sizeof(struct tag_metric_map));
+ tag_node->key = strdup(json);
+ tag_node->value = NULL;
+
+ HASH_ADD_KEYPTR(hh, history->rec, tag_node->key, strlen(tag_node->key), tag_node);
+ HASH_ADD_KEYPTR(hh, tag_node->value, name_value_node->name, strlen(name_value_node->name), name_value_node);
+}
+
+struct name_value_map *counter_history_find(struct counter_history *history, const char *json, const char *name)
+{
+ struct tag_metric_map *tag_node;
+ HASH_FIND_STR(history->rec, json, tag_node);
+ if (tag_node == NULL) {
+ return NULL;
+ }
+
+ struct name_value_map *name_value_node;
+ HASH_FIND_STR(tag_node->value, name, name_value_node);
+ return name_value_node;
+}
+
+void counter_history_free(struct counter_history *history)
+{
+ if (history == NULL) {
+ return;
+ }
+
+ struct tag_metric_map *tag_node, *tmp;
+ HASH_ITER(hh, history->rec, tag_node, tmp) {
+ struct name_value_map *name_value_node, *tmp2;
+ HASH_ITER(hh, tag_node->value, name_value_node, tmp2) {
+ HASH_DEL(tag_node->value, name_value_node);
+ free(name_value_node->name);
+ free(name_value_node);
+ }
+ HASH_DEL(history->rec, tag_node);
+ free(tag_node->key);
+ free(tag_node);
+ }
+
+ free(history->cube_version);
+
+ free(history->exporter_name);
+
+ if (history->global_tag_list != NULL) {
+ fieldstat_tag_list_arr_free(history->global_tag_list, 1);
+ }
+
+ free(history);
+}
+
+bool fieldstat_tag_list_cmp(const struct fieldstat_tag_list *a, const struct fieldstat_tag_list *b)
+{
+ if (a->n_tag != b->n_tag) {
+ return false;
+ }
+
+ for (int i = 0; i < a->n_tag; i++) {
+ if (strcmp(a->tag[i].key, b->tag[i].key) != 0) {
+ return false;
+ }
+ if (a->tag[i].type != b->tag[i].type) {
+ return false;
+ }
+ switch (a->tag[i].type)
+ {
+ case TAG_INTEGER:
+ if (a->tag[i].value_longlong != b->tag[i].value_longlong) {
+ return false;
+ }
+ break;
+ case TAG_DOUBLE:
+ if (a->tag[i].value_double != b->tag[i].value_double) {
+ return false;
+ }
+ break;
+ case TAG_CSTRING:
+ if (strcmp(a->tag[i].value_str, b->tag[i].value_str) != 0) {
+ return false;
+ }
+ break;
+ default:
+ assert(0);
+ }
+ }
+
+ return true;
+}
+
+struct fieldstat_tag_list *my_copy_fs_tag_list(const struct fieldstat_tag_list *src)
+{
+ struct fieldstat_tag_list *dest = malloc(sizeof(struct fieldstat_tag_list));
+ dest->n_tag = src->n_tag;
+ dest->tag = malloc(sizeof(struct fieldstat_tag) * dest->n_tag);
+ for (int i = 0; i < dest->n_tag; i++) {
+ dest->tag[i].key = strdup(src->tag[i].key);
+ dest->tag[i].type = src->tag[i].type;
+ switch (src->tag[i].type)
+ {
+ case TAG_INTEGER:
+ dest->tag[i].value_longlong = src->tag[i].value_longlong;
+ break;
+ case TAG_DOUBLE:
+ dest->tag[i].value_double = src->tag[i].value_double;
+ break;
+ case TAG_CSTRING:
+ dest->tag[i].value_str = strdup(src->tag[i].value_str);
+ break;
+ default:
+ assert(0);
+ }
+ }
+
+ return dest;
+}
+
+bool counter_history_check_if_need_to_update(const struct fieldstat_json_exporter *exporter)
+{
+ if (exporter->history == NULL) {
+ return false; // delta export disabled
+ }
+
+ unsigned long cur_cell_version = fieldstat_get_cell_version(exporter->instance);
+ const char *cur_exporter_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME;
+ const struct fieldstat_tag_list *cur_global_tag_list = exporter->global_tag_list;
+
+ if (exporter->history->cell_version != cur_cell_version) {
+ return true;
+ }
+
+ if (strcmp(exporter->history->exporter_name, cur_exporter_name) != 0) {
+ return true;
+ }
+
+ if (exporter->history->global_tag_list == NULL && cur_global_tag_list != NULL) {
+ return true;
+ }
+ if (exporter->history->global_tag_list == NULL && cur_global_tag_list == NULL) {
+ return false;
+ }
+ // global tag cant be deleted, so no need to check if cur_global_tag_list == NULL && exporter->history->global_tag_list != NULL
+ if (fieldstat_tag_list_cmp(cur_global_tag_list, exporter->history->global_tag_list) == false) {
+ return true;
+ }
+
+ int *cube_ids = NULL;
+ int n_cube = 0;
+ fieldstat_get_cubes(exporter->instance, &cube_ids, &n_cube);
+ if (n_cube != exporter->history->n_cube) {
+ return true;
+ }
+ for (int i = 0; i < n_cube; i++) {
+ unsigned long cube_version = fieldstat_get_cube_version(exporter->instance, cube_ids[i]);
+ if (cube_version != exporter->history->cube_version[i]) {
+ return true;
+ }
+ }
+ free(cube_ids);
+
+ return false;
+}
+
+void counter_history_fill_version_info(struct counter_history *history, struct fieldstat_json_exporter *exporter)
+{
+ unsigned long cur_cell_version = fieldstat_get_cell_version(exporter->instance);
+ const char *cur_exporter_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME;
+ const struct fieldstat_tag_list *cur_global_tag_list = exporter->global_tag_list;
+
+ free(history->exporter_name);
+ history->exporter_name = strdup(cur_exporter_name);
+ if (cur_global_tag_list != NULL) {
+ history->global_tag_list = my_copy_fs_tag_list(cur_global_tag_list);
+ }
+
+ history->cell_version = cur_cell_version;
+
+ int *cube_ids = NULL;
+ int n_cube = 0;
+ fieldstat_get_cubes(exporter->instance, &cube_ids, &n_cube);
+ history->n_cube = n_cube;
+ history->cube_version = malloc(sizeof(unsigned long) * n_cube);
+ for (int i = 0; i < n_cube; i++) {
+ history->cube_version[i] = fieldstat_get_cube_version(exporter->instance, cube_ids[i]);
+ }
+ free(cube_ids);
+}
+
+void fieldstat_json_exporter_update_history(struct fieldstat_json_exporter *exporter)
+{
+ if (counter_history_check_if_need_to_update(exporter) == false) {
+ return;
+ }
+
+ counter_history_free(exporter->history);
+ exporter->history = counter_history_new();
+
+ counter_history_fill_version_info(exporter->history, exporter);
+}
+
+void fieldstat_json_exporter_write_delta(struct fieldstat_json_exporter *exporter, struct cellwise_rec_for_export *tag_field_pair_arr, size_t arr_len)
+{
+ // for every tag_field_pair, get the tag json string
+ for (int i = 0; i < arr_len; i++) {
+ const char *tag_json = json_writer_unwrap(tag_field_pair_arr[i].cjson_tags);
+ for (int j = 0; j < tag_field_pair_arr[i].n_metric; j++) {
+ const char *metric_name = tag_field_pair_arr[i].metric_name[j];
+ long long cur_value = tag_field_pair_arr[i].metric_value[j];
+ struct name_value_map *node = counter_history_find(exporter->history, tag_json, metric_name);
+ long long last_value = node == NULL ? 0 : node->value;
+ long long delta = cur_value - last_value;
+
+ if (node == NULL) { // new added cell/cube/metric
+ counter_history_add(exporter->history, tag_json, metric_name, cur_value);
+ } else { // previous one
+ node->value = cur_value;
+ }
+ // cell will never delete. If delete cube / cell happen, the exporter history will be reset.
+
+ size_t delta_metric_name_len = strlen(metric_name) + 6; // 6 for "_delta"
+ char *delta_metric_name = malloc(delta_metric_name_len + 1);
+ snprintf(delta_metric_name, delta_metric_name_len + 1, "%s_delta", metric_name);
+ delta_metric_name[delta_metric_name_len] = '\0';
+ json_writer_longlong_field(tag_field_pair_arr[i].cjson_fields, delta_metric_name, delta);
+ free(delta_metric_name);
+ }
+ }
+}
/* -------------------------------------------------------------------------- */
/* iter */
@@ -318,7 +593,7 @@ void kv_pair_free_list(struct export_kv_pair *pairs, size_t len)
}
// return 1 if added, 0 if not added
-int cjson_map_add(struct json_tag_field_pair *tag_field_pair, const struct cell_iter *iter)
+int cjson_map_add(struct cellwise_rec_for_export *tag_field_pair, const struct cell_iter *iter)
{
struct json_writer *writer = NULL;
@@ -353,7 +628,30 @@ int cjson_map_add(struct json_tag_field_pair *tag_field_pair, const struct cell_
return 1;
}
-struct json_tag_field_pair *read_tag_and_field(const struct fieldstat *instance, size_t *n_pair_out)
+void cjson_map_record_metrics(struct cellwise_rec_for_export *tag_field_pair, const struct cell_iter *iter)
+{
+ tag_field_pair->metric_name = malloc(sizeof(char *) * (iter->max_metric_id + 1));
+ tag_field_pair->metric_value = malloc(sizeof(long long) * (iter->max_metric_id + 1));
+
+ int n_counter_metric = 0;
+ for (int metric_id = 0; metric_id <= iter->max_metric_id; metric_id++) {
+ int cube_id = iter->cube_ids[iter->curr_cube_idx];
+ if (fieldstat_get_metric_type(iter->instance, cube_id, metric_id) != METRIC_TYPE_COUNTER) {
+ continue;
+ }
+ if (fieldstat_counter_get(iter->instance, cube_id, metric_id, iter->cell_ids[iter->curr_cell_idx]) == -1) { // no value, happens when the two cells in one cube has different metrics
+ continue;
+ }
+
+ tag_field_pair->metric_name[n_counter_metric] = strdup(fieldstat_get_metric_name(iter->instance, cube_id, metric_id));
+ tag_field_pair->metric_value[n_counter_metric] = fieldstat_counter_get(iter->instance, cube_id, metric_id, iter->cell_ids[iter->curr_cell_idx]);
+
+ n_counter_metric++;
+ }
+ tag_field_pair->n_metric = n_counter_metric;
+}
+
+struct cellwise_rec_for_export *read_tag_and_field(const struct fieldstat *instance, size_t *n_pair_out, bool output_delta)
{
// clock_t prepare_start = clock();
int *cube_id = NULL;
@@ -373,13 +671,17 @@ struct json_tag_field_pair *read_tag_and_field(const struct fieldstat *instance,
}
struct cell_iter *iter = cell_iter_new(instance);
- struct json_tag_field_pair *tag_field_pair = calloc(n_cell_total, sizeof(struct json_tag_field_pair));
+ struct cellwise_rec_for_export *tag_field_pair = calloc(n_cell_total, sizeof(struct cellwise_rec_for_export));
int i = 0;
while (cell_iter_next(iter)) {
// next_num++;
if (cjson_map_add(&tag_field_pair[i], iter) == 0) {
continue;
}
+
+ if (output_delta) {
+ cjson_map_record_metrics(&tag_field_pair[i], iter);
+ }
i++;
}
@@ -389,7 +691,7 @@ struct json_tag_field_pair *read_tag_and_field(const struct fieldstat *instance,
return tag_field_pair;
}
-void fieldstat_json_exporter_write_global_tags(const struct fieldstat_json_exporter *exporter, struct json_tag_field_pair *tag_field_pair_arr, size_t arr_len)
+void fieldstat_json_exporter_write_global_tags(const struct fieldstat_json_exporter *exporter, struct cellwise_rec_for_export *tag_field_pair_arr, size_t arr_len)
{
if (exporter->global_tag_list == NULL) {
return;
@@ -457,7 +759,7 @@ int add_object_to_json_array(char **buf, int *buf_len, int start, const char *st
"in_latency":<blob of histogram>
"client_ip_sketch":<blob of hyperloglog>
}
- "timestamp":<timestamp long>
+ "timestamp_ms":<timestamp long>
}
*/
/*
@@ -465,10 +767,15 @@ int add_object_to_json_array(char **buf, int *buf_len, int start, const char *st
*/
void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter *exporter, const struct timeval *timestamp, char ***output, size_t *output_size)
{
+ long long timestamp_ms = cal_ms_time(timestamp);
+ if (exporter->history != NULL) {
+ fieldstat_json_exporter_update_history((struct fieldstat_json_exporter *)exporter);
+ }
+
const struct fieldstat *instance = exporter->instance;
size_t n_pair;
- struct json_tag_field_pair *tag_field_pair = read_tag_and_field(instance, &n_pair);
+ struct cellwise_rec_for_export *tag_field_pair = read_tag_and_field(instance, &n_pair, exporter->history != NULL);
if (tag_field_pair == NULL || n_pair == 0) {
free(tag_field_pair); // tag_field_pair is not NULL when there are registered metrics but no valid cells
*output = NULL;
@@ -477,19 +784,27 @@ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter *
}
fieldstat_json_exporter_write_global_tags(exporter, tag_field_pair, n_pair);
+ if (exporter->history != NULL) { // not null when fieldstat_json_exporter_enable_delta is called
+ fieldstat_json_exporter_write_delta((struct fieldstat_json_exporter *)exporter, tag_field_pair, n_pair);
+ }
char **cjson_str_arr = (char **)malloc(sizeof(char *) * n_pair);
for (int i = 0; i < n_pair; i++) {
- struct json_tag_field_pair *current = &tag_field_pair[i];
+ struct cellwise_rec_for_export *current = &tag_field_pair[i];
struct json_writer *root = json_writer_init();
json_writer_start_map(root);
- const char *tmp_name = exporter->name ? exporter->name : "-";
+ const char *tmp_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME;
json_writer_str_field(root, "name", tmp_name, strlen(tmp_name));
json_writer_object_item(root, "tags", current->cjson_tags);
json_writer_object_item(root, "fields", current->cjson_fields);
- json_writer_longlong_field(root, "timestamp", cal_ms_time(timestamp));
+ json_writer_longlong_field(root, "timestamp_ms", timestamp_ms);
+ if (exporter->history != NULL) {
+ json_writer_longlong_field(root, "timestamp_ms_delta", timestamp_ms - exporter->history->ts);
+ exporter->history->ts = timestamp_ms;
+ }
+
json_writer_end_map(root);
char *cjson_str;
@@ -501,7 +816,17 @@ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter *
*output = cjson_str_arr;
*output_size = n_pair;
- free(tag_field_pair); // cjson object will be freed with cjson root
+ if (exporter->history != NULL) { // read_tag_and_field does not allocate memory for metric_name and metric_value when exporter->history == NULL
+ for (int i = 0; i < n_pair; i++) {
+ for (int j = 0; j < tag_field_pair[i].n_metric; j++) {
+ free(tag_field_pair[i].metric_name[j]);
+ }
+ free(tag_field_pair[i].metric_name);
+ free(tag_field_pair[i].metric_value);
+ }
+ }
+ // json object is added in json_writer_object_item
+ free(tag_field_pair);
}
char *fieldstat_json_exporter_export(const struct fieldstat_json_exporter *exporter, const struct timeval *timestamp)
@@ -536,6 +861,7 @@ struct fieldstat_json_exporter *fieldstat_json_exporter_new(const struct fieldst
exporter->instance = instance;
exporter->name = NULL;
exporter->global_tag_list = NULL;
+ exporter->history = NULL;
return exporter;
}
@@ -587,5 +913,19 @@ void fieldstat_json_exporter_free(struct fieldstat_json_exporter *exporter)
if (exporter->global_tag_list != NULL) {
fieldstat_tag_list_arr_free(exporter->global_tag_list, 1);
}
+
+ if (exporter->history != NULL) {
+ counter_history_free(exporter->history);
+ }
+
free(exporter);
}
+
+void fieldstat_json_exporter_enable_delta(struct fieldstat_json_exporter *exporter)
+{
+ if (exporter->history != NULL) {
+ return;
+ }
+ exporter->history = counter_history_new();
+ counter_history_fill_version_info(exporter->history, exporter);
+} \ No newline at end of file