diff options
| author | chenzizhan <[email protected]> | 2023-11-03 15:18:20 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2023-11-03 15:18:20 +0800 |
| commit | 6c645be362e10eaa6e15e0b46569f43710e11494 (patch) | |
| tree | 999a9d418e32718b272eb38cf06eaca31a8d4c01 | |
| parent | 4ff17defbc7df64c905210a18a9a63dd26860c3f (diff) | |
export with delta and readme for fs easy
| -rw-r--r-- | fieldstat_easy.md | 119 | ||||
| -rw-r--r-- | fs_easy_example.json | 33 | ||||
| -rw-r--r-- | include/fieldstat/fieldstat_easy.h | 52 | ||||
| -rw-r--r-- | include/fieldstat/fieldstat_exporter.h | 2 | ||||
| -rw-r--r-- | src/exporter/cjson_exporter.c | 296 | ||||
| -rw-r--r-- | src/fieldstat_easy.c | 58 | ||||
| -rw-r--r-- | src/utils/very_fast_json_writer.c | 5 | ||||
| -rw-r--r-- | test/test_easy_fs.cpp | 293 | ||||
| -rw-r--r-- | test/test_exporter_json.cpp | 124 |
9 files changed, 729 insertions, 253 deletions
diff --git a/fieldstat_easy.md b/fieldstat_easy.md new file mode 100644 index 0000000..66b86e3 --- /dev/null +++ b/fieldstat_easy.md @@ -0,0 +1,119 @@ + +# fieldstat easy + +## Description +**Fieldstat easy** wraps the fieldstat to provide a simpler interface and multi-threaded support. For details, refer to [fieldstat](readme.md). + +Fieldstat easy support multi-threaded writing and output. The output of fieldstat easy is in json format, which can be easily parsed by other python. + +All the metrics in fieldstat easy are dynamic, which means that the metrics has no volume limit, and users can add any tags to the metrics as needed and at any time. + +## Example +### coding +``` +#include "fieldstat_easy.h" + +const int N_THREADS = 3; +struct fieldstat_tag global_tags[1]; +struct fieldstat_tag tmptag; +tmptag.key = "app id"; +tmptag.type = TAG_INTEGER; +tmptag.value_longlong = 1; +global_tags[0] = tmptag; + +struct fieldstat_tag tag1 = {"direction", TAG_STRING, "incoming"}; +struct fieldstat_tag tag2 = {"direction", TAG_STRING, "outgoing"}; + +struct fieldstat_easy *fse = fieldstat_easy_new(N_THREADS, global_tags, 1); +int counter_id = fieldstat_easy_register_counter(fse, "incoming bytes"); +int hdr_id = fieldstat_easy_register_histogram(fse, "delay", 1, 10000, 1); +int output_interval_seconds = 1; +fieldstat_easy_enable_auto_output(fse, FILENAME, output_interval_seconds); // FILENAME is the same as the one you set in python. + +std::thread threads[N_THREADS]; +for (int thread_id = 0; thread_id < N_THREADS; thread_id++) { + threads[thread_id] = std::thread([fse, counter_id, hdr_id, thread_id]() { + while(1) { + fieldstat_easy_counter_incrby(fse, thread_id, counter_id, &tag1, 1, 100); + fieldstat_easy_counter_incrby(fse, thread_id, counter_id, &tag2, 1, 2); + fieldstat_easy_histogram_record(fse, thread_id, hdr_id, &tag1, 1, rand() % 10000); + } + }); +} +sleep(1000); +fieldstat_easy_free(fse); + +``` + +### raw output +``` +[ + { + "name": "-", + "tags": { + "direction": "incoming", + "app id": 1 + }, + "fields": { + "bytes": 585167, + "delay": "HISTEwAAAUYAAAAAAAAAAQAAAAAAAAABAAAAAAAAJxA/8AAAAAAAAJABbHh8ZnZccHp2gAFUaIABdmaCAXx0YoQBdGyMAXhsggF6eowBggGCAfAB6AGWAvQB9AHyAdYB4AHoAeoB3gHEAcwBrgKKAtgB5gPQA7QD5APIA8wD4gO+A+YDxAO2A8ADwgPAA8QD5gOEB5wHhAfQB9gHvAfwB6QHuAe6B/QGxAecB7QHqAfeB/INsA+ODtIO2g2QDo4OrA7WDtYN/A7qDcQN9A6WDpYP3h7OHqwdnhyeHYge7B2EHaQesB7QHaId+B2MHaYejh3gOvI6jDuyOqI7qjqMOpo7mDqeOvI6iDuiOrQ7lDqKOpZztHDuc+x2onbkdoR0hnSAdZR03naWdK5z6nX8dZh1vugBvOcBqusB2OsBkOgB+OkB3OoBxOoB9ukB9uwBqOwBnuoB8OkBxusBsugB7OoBptMD5tEDpNUDivcBAAAAAAAAAA==", + "bytes_delta": 31689, + "delay_delta": "HISTEwAAAOgAAAAAAAAAAQAAAAAAAAABAAAAAAAAJxA/8AAAAAAAAAQEAQYKCgIEAggECgYECgQGBAYIDgoMAgQBCgQECAICEAIUDBIUEg4ECAwKDBQGFCAWFBYYFBIcIhogGhYiEhg6JDQ4NDw+GjouOC4oMC4ufGZidGhaYGBMXGJYYk5eXugBrAHYAdQB7AHeAcoB3AHAAfIBwAHYAdgBwgG+Ae4BzgOqA6QDsAOUA64DrAOMA4gD5gL4ArAD4AKYA44DtAOCBvgFkgaiBrIG1AasBswG0ga2BqAGpAaKBrYGpAamBvAM3AzWDI4NgA2mDKIMkg2uDIANiA3gDMQMlA2ODfIM2hikGf4XjA0AAAAAAAAA" + }, + "timestamp_ms": 5507527913, + "timestamp_ms_delta": 1002 + }, + { + "name": "-", + "tags": { + "direction": "outgoing", + "app id": 1 + }, + "fields": { + "bytes": 5857, + "delay": "HISTEwAAAUYAAAAAAAAAAQAAAAAAAAABAAAAAAAAJxA/8AAAAAAAAJABbHh8ZnZccHp2gAFUaIABdmaCAXx0YoQBdGyMAXhsggF6eowBggGCAfAB6AGWAvQB9AHyAdYB4AHoAeoB3gHEAcwBrgKKAtgB5gPQA7QD5APIA8wD4gO+A+YDxAO2A8ADwgPAA8QD5gOEB5wHhAfQB9gHvAfwB6QHuAe6B/QGxAecB7QHqAfeB/INsA+ODtIO2g2QDo4OrA7WDtYN/A7qDcQN9A6WDpYP3h7OHqwdnhyeHYge7B2EHaQesB7QHaId+B2MHaYejh3gOvI6jDuyOqI7qjqMOpo7mDqeOvI6iDuiOrQ7lDqKOpZztHDuc+x2onbkdoR0hnSAdZR03naWdK5z6nX8dZh1vugBvOcBqusB2OsBkOgB+OkB3OoBxOoB9ukB9uwBqOwBnuoB8OkBxusBsugB7OoBptMD5tEDpNUDivcBAAAAAAAAAA==", + "bytes_delta": 31, + "delay_delta": "HISTEwAAAOgAAAAAAAAAAQAAAAAAAAABAAAAAAAAJxA/8AAAAAAAAAQEAQYKCgIEAggECgYECgQGBAYIDgoMAgQBCgQECAICEAIUDBIUEg4ECAwKDBQGFCAWFBYYFBIcIhogGhYiEhg6JDQ4NDw+GjouOC4oMC4ufGZidGhaYGBMXGJYYk5eXugBrAHYAdQB7AHeAcoB3AHAAfIBwAHYAdgBwgG+Ae4BzgOqA6QDsAOUA64DrAOMA4gD5gL4ArAD4AKYA44DtAOCBvgFkgaiBrIG1AasBswG0ga2BqAGpAaKBrYGpAamBvAM3AzWDI4NgA2mDKIMkg2uDIANiA3gDMQMlA2ODfIM2hikGf4XjA0AAAAAAAAA" + }, + "timestamp_ms": 5507527913, + "timestamp_ms_delta": 1002 + }**** +] +``` + +Another way to output is to use the `fieldstat_easy_output` function, the output of which is a c-string in json format. The output format is *different* from the above. `fieldstat_easy_output` is stateless, and will only output accumulated data. + +Explaination of the output format: + - `name` Always `-`. Just omit it. + - `tags` The tags of the fields. It contains the global tags and the tags of the metrics. The tags of the metrics are the same as the arguments of `fieldstat_easy_counter_incrby` and `fieldstat_easy_histogram_record`. + - `fields` The fields of the metrics. The fields of the counter (`bytes` in example) are of integer type, and the fields of the histogram (`delay` in example) are of string type(in base64 format). Thoses with key ending with `_delta` are metrics recording the values of the current output interval. While the value of the fields is the value of all time. + +The output histogram is a base64 encoded string. You can decode it with the following command. + +``` +// C language +#include "hdr_histogram.h" +#include "metrics/histogram_encoder.h" + +char *buf = "HISTEwAAAUYAAAAAAAAAAQAAAAAAAAABAAAAAAAAJxA/8AAAAAAAAJABbHh8ZnZccHp2gAFUaIABdmaCAXx0YoQBdGyMAXhsggF6eowBggGCAfAB6AGWAvQB9AHyAdYB4AHoAeoB3gHEAcwBrgKKAtgB5gPQA7QD5APIA8wD4gO+A+YDxAO2A8ADwgPAA8QD5gOEB5wHhAfQB9gHvAfwB6QHuAe6B/QGxAecB7QHqAfeB/INsA+ODtIO2g2QDo4OrA7WDtYN/A7qDcQN9A6WDpYP3h7OHqwdnhyeHYge7B2EHaQesB7QHaId+B2MHaYejh3gOvI6jDuyOqI7qjqMOpo7mDqeOvI6iDuiOrQ7lDqKOpZztHDuc+x2onbkdoR0hnSAdZR03naWdK5z6nX8dZh1vugBvOcBqusB2OsBkOgB+OkB3OoBxOoB9ukB9uwBqOwBnuoB8OkBxusBsugB7OoBptMD5tEDpNUDivcBAAAAAAAAAA==" + +struct hdr_histogram *hdr = histogram_decode_from_b64(buf, strlen(buf)); +(void)hdr_percentiles_print(hdr, stdout, 5, 1.0, CLASSIC); +hdr_close(hdr); +``` + +``` +# python language +from fieldstat_exporter import FieldstatAPI + +c_hist = FieldstatAPI.libfieldstat.fieldstat_histogram_base64_decode(value.encode('utf-8')) +sum_value = FieldstatAPI.libfieldstat.fieldstat_histogram_value_sum(c_hist) +FieldstatAPI.libfieldstat.fieldstat_histogram_free(c_hist) # need to free memory, because it's a C binding +``` + +### prometheus + + +### output to file + + diff --git a/fs_easy_example.json b/fs_easy_example.json new file mode 100644 index 0000000..05cbad2 --- /dev/null +++ b/fs_easy_example.json @@ -0,0 +1,33 @@ +[ + { + "name": "-", + "tags": { + "direction": "incoming", + "app id": 1 + }, + "fields": { + "bytes": 585167, + "delay": "HISTEwAAAUYAAAAAAAAAAQAAAAAAAAABAAAAAAAAJxA/8AAAAAAAAJABbHh8ZnZccHp2gAFUaIABdmaCAXx0YoQBdGyMAXhsggF6eowBggGCAfAB6AGWAvQB9AHyAdYB4AHoAeoB3gHEAcwBrgKKAtgB5gPQA7QD5APIA8wD4gO+A+YDxAO2A8ADwgPAA8QD5gOEB5wHhAfQB9gHvAfwB6QHuAe6B/QGxAecB7QHqAfeB/INsA+ODtIO2g2QDo4OrA7WDtYN/A7qDcQN9A6WDpYP3h7OHqwdnhyeHYge7B2EHaQesB7QHaId+B2MHaYejh3gOvI6jDuyOqI7qjqMOpo7mDqeOvI6iDuiOrQ7lDqKOpZztHDuc+x2onbkdoR0hnSAdZR03naWdK5z6nX8dZh1vugBvOcBqusB2OsBkOgB+OkB3OoBxOoB9ukB9uwBqOwBnuoB8OkBxusBsugB7OoBptMD5tEDpNUDivcBAAAAAAAAAA==", + "bytes_delta": 31689, + "delay_delta": "HISTEwAAAOgAAAAAAAAAAQAAAAAAAAABAAAAAAAAJxA/8AAAAAAAAAQEAQYKCgIEAggECgYECgQGBAYIDgoMAgQBCgQECAICEAIUDBIUEg4ECAwKDBQGFCAWFBYYFBIcIhogGhYiEhg6JDQ4NDw+GjouOC4oMC4ufGZidGhaYGBMXGJYYk5eXugBrAHYAdQB7AHeAcoB3AHAAfIBwAHYAdgBwgG+Ae4BzgOqA6QDsAOUA64DrAOMA4gD5gL4ArAD4AKYA44DtAOCBvgFkgaiBrIG1AasBswG0ga2BqAGpAaKBrYGpAamBvAM3AzWDI4NgA2mDKIMkg2uDIANiA3gDMQMlA2ODfIM2hikGf4XjA0AAAAAAAAA" + }, + "timestamp_ms": 5507527913, + "timestamp_ms_delta": 1002 + }, + { + "name": "-", + "tags": { + "direction": "outgoing", + "app id": 1 + }, + "fields": { + "bytes": 5857, + "delay": "HISTEwAAAUYAAAAAAAAAAQAAAAAAAAABAAAAAAAAJxA/8AAAAAAAAJABbHh8ZnZccHp2gAFUaIABdmaCAXx0YoQBdGyMAXhsggF6eowBggGCAfAB6AGWAvQB9AHyAdYB4AHoAeoB3gHEAcwBrgKKAtgB5gPQA7QD5APIA8wD4gO+A+YDxAO2A8ADwgPAA8QD5gOEB5wHhAfQB9gHvAfwB6QHuAe6B/QGxAecB7QHqAfeB/INsA+ODtIO2g2QDo4OrA7WDtYN/A7qDcQN9A6WDpYP3h7OHqwdnhyeHYge7B2EHaQesB7QHaId+B2MHaYejh3gOvI6jDuyOqI7qjqMOpo7mDqeOvI6iDuiOrQ7lDqKOpZztHDuc+x2onbkdoR0hnSAdZR03naWdK5z6nX8dZh1vugBvOcBqusB2OsBkOgB+OkB3OoBxOoB9ukB9uwBqOwBnuoB8OkBxusBsugB7OoBptMD5tEDpNUDivcBAAAAAAAAAA==", + "bytes_delta": 31, + "delay_delta": "HISTEwAAAOgAAAAAAAAAAQAAAAAAAAABAAAAAAAAJxA/8AAAAAAAAAQEAQYKCgIEAggECgYECgQGBAYIDgoMAgQBCgQECAICEAIUDBIUEg4ECAwKDBQGFCAWFBYYFBIcIhogGhYiEhg6JDQ4NDw+GjouOC4oMC4ufGZidGhaYGBMXGJYYk5eXugBrAHYAdQB7AHeAcoB3AHAAfIBwAHYAdgBwgG+Ae4BzgOqA6QDsAOUA64DrAOMA4gD5gL4ArAD4AKYA44DtAOCBvgFkgaiBrIG1AasBswG0ga2BqAGpAaKBrYGpAamBvAM3AzWDI4NgA2mDKIMkg2uDIANiA3gDMQMlA2ODfIM2hikGf4XjA0AAAAAAAAA" + }, + "timestamp_ms": 5507527913, + "timestamp_ms_delta": 1002 + } +] + diff --git a/include/fieldstat/fieldstat_easy.h b/include/fieldstat/fieldstat_easy.h index 9a356a7..451f226 100644 --- a/include/fieldstat/fieldstat_easy.h +++ b/include/fieldstat/fieldstat_easy.h @@ -9,16 +9,60 @@ extern "C" struct fieldstat_easy; +/* + * new a fieldstat_easy instance. + * @param max_thread_num: max thread number of this instance. + * @param tags: tags of this instance. Will appear in output json. +*/ struct fieldstat_easy *fieldstat_easy_new(int max_thread_num, const struct fieldstat_tag *tags, size_t n_tag); +/* + * free a fieldstat_easy instance. +*/ void fieldstat_easy_free(struct fieldstat_easy *fse); -// both data of accumulated and delta will be output. +/* + * enable auto output. both data of accumulated and delta will be output. + * @param output_path: output file path. Should be identical to the one in python config. + * @param interval_second: output interval in second. + * @return: 0 if success, -1 if failed to open file. -2 if the output is already enabled. +*/ int fieldstat_easy_enable_auto_output(struct fieldstat_easy *pthis, const char *output_path, int interval_second); +/* + * @brief add a metric to the cube of cube_id. One metric may be associated with different cells. + * @param metric_name: name of the metric. Cannot be NULL. Must be unique. + * @return metric id>=0 if success. If failed, return FS_ERR_NULL_HANDLER, FS_ERR_INVALID_KEY(when metric_name is not unique in this cube). + * For the error code, see fieldstat.h +*/ int fieldstat_easy_register_counter(struct fieldstat_easy *fse, const char *name); +/* + * @brief refer to fieldstat_easy_register_counter. + * @param lowest_trackable_value: the lowest value that can be tracked (distinguishable from 0) by the histogram. Must be >= 1. + * @param highest_trackable_value: the highest value to be tracked by the histogram. Must be >= 2 * lowest_trackable_value. + * @param significant_figures: the precision of the histogram. Must be in [1, 5]. + * @return metric id if success. If failed, return FS_ERR_NULL_HANDLER, FS_ERR_INVALID_KEY(when metric_name is not unique in this cube), or FS_ERR_INVALID_PARAM(if any of the 3 params are out of range) + * For the error code, see fieldstat.h +*/ int fieldstat_easy_register_histogram(struct fieldstat_easy *fse, const char *name, long long lowest_trackable_value, long long highest_trackable_value, int significant_figures); -// buff is a json string of accumulated data. -void fieldstat_easy_output(const struct fieldstat_easy *fse, char **buff, size_t *buff_len); - +/* + * Output the accumulated data to a json string. + * @param buff: output buffer. User should free it after use. +*/ +void fieldstat_easy_output(struct fieldstat_easy *fse, char **buff, size_t *buff_len); +/* + * @brief let the value of counter metric of cell_id increase by `increment`. + * @param thread_id: thread id. Must be in [0, max_thread_num). + * @param metric_id: metric id, previously returned by fieldstat_register_counter. + * @param increment: increment of the counter metric. Can be negative. + * @return FS_OK if success. FS_ERR_NULL_HANDLER, FS_ERR_INVALID_CUBE_ID, FS_ERR_INVALID_METRIC_ID if fail. + * return -1 also when the thread_id is out of range. +*/ int fieldstat_easy_counter_incrby(struct fieldstat_easy *fse, int thread_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, long long increment); +/* + * @brief Add a value to the histogram metric of cell_id. Histogram will record the distribution of the values. + The value bigger than highest_trackable_value will be set to highest_trackable_value. The value less than lowest_trackable_value will be tried to record, and, if succeed, remains in the record as -inf(most of the time) or 0(if value == 0) + * @param value: value of the histogram metric. + * @return FS_OK if success. FS_ERR_NULL_HANDLER, FS_ERR_INVALID_CUBE_ID, FS_ERR_INVALID_METRIC_ID if fail. + * return -1 also when the thread_id is out of range. +*/ int fieldstat_easy_histogram_record(struct fieldstat_easy *fse, int thread_id, int metric_id, const struct fieldstat_tag *tags, size_t n_tag, long long value); #ifdef __cplusplus diff --git a/include/fieldstat/fieldstat_exporter.h b/include/fieldstat/fieldstat_exporter.h index c7fa80c..f7db70a 100644 --- a/include/fieldstat/fieldstat_exporter.h +++ b/include/fieldstat/fieldstat_exporter.h @@ -43,7 +43,7 @@ void fieldstat_json_exporter_enable_delta(struct fieldstat_json_exporter *export since this function is only used by fieldstat_easy, users are not expected to use this function directly. As a result, the configuration check is not implemented. return NULL when instance has no cell records. */ -char *fieldstat_json_exporter_export_with_delta(struct fieldstat_json_exporter *exporter, const struct fieldstat *instance, const struct fieldstat *instance_delta, const struct timeval *timestamp); +char *fieldstat_json_exporter_export_with_delta(const struct fieldstat_json_exporter *exporter, const struct fieldstat *instance, const struct fieldstat *instance_delta, const struct timeval *timestamp, const struct timeval *timestamp_delta); #ifdef __cplusplus } diff --git a/src/exporter/cjson_exporter.c b/src/exporter/cjson_exporter.c index 47709f8..2e031b8 100644 --- a/src/exporter/cjson_exporter.c +++ b/src/exporter/cjson_exporter.c @@ -1,3 +1,5 @@ +#if 1 + #include <string.h> #include <stdlib.h> #include <stdio.h> @@ -47,6 +49,7 @@ */ #define DEFAULT_EXPORTER_NAME "-" + struct counter_history; struct export_kv_pair { char *key; @@ -62,6 +65,7 @@ struct fieldstat_json_exporter { char *name; struct fieldstat_tag_list *global_tag_list; struct counter_history *history; + long long last_ts; }; struct cell_iter { @@ -80,12 +84,11 @@ struct cell_iter { const struct fieldstat *instance; }; +// todo: 这个结构体太突兀了,就简简单单带上全量metric id,不带cjson_fields。 +// 之后是要拼fields呢,还是只用counter数据写入到history里呢,是之后的事 struct cellwise_rec_for_export { struct json_writer *cjson_tags; - struct json_writer *cjson_fields; - - char **metric_name; // used for delta export - long long *metric_value; // used for delta export + struct export_kv_pair **metric_pairs; size_t n_metric; }; @@ -113,6 +116,17 @@ struct counter_history { struct fieldstat_tag_list *global_tag_list; }; +struct couple_export_table_item { + char *key; + int kv_pair_id; + UT_hash_handle hh; +}; + +struct couple_export_table { + struct couple_export_table_item *items; +}; + + void kv_pair_free(struct export_kv_pair *pair) { if (pair->type == TAG_CSTRING) { free((char *)pair->value_str); @@ -366,33 +380,107 @@ void fieldstat_json_exporter_update_history(struct fieldstat_json_exporter *expo counter_history_fill_version_info(exporter->history, exporter, instance); } -void fieldstat_json_exporter_write_delta(struct fieldstat_json_exporter *exporter, struct cellwise_rec_for_export *tag_field_pair_arr, size_t arr_len) +void write_delta_to_json(struct fieldstat_json_exporter *exporter, struct cellwise_rec_for_export *tag_field_pair, struct json_writer *field_json) { // for every tag_field_pair, get the tag json string - for (int i = 0; i < arr_len; i++) { - const char *tag_json = json_writer_unwrap(tag_field_pair_arr[i].cjson_tags); - for (int j = 0; j < tag_field_pair_arr[i].n_metric; j++) { - const char *metric_name = tag_field_pair_arr[i].metric_name[j]; - long long cur_value = tag_field_pair_arr[i].metric_value[j]; - struct name_value_map *node = counter_history_find(exporter->history, tag_json, metric_name); - long long last_value = node == NULL ? 0 : node->value; - long long delta = cur_value - last_value; - - if (node == NULL) { // new added cell/cube/metric - counter_history_add(exporter->history, tag_json, metric_name, cur_value); - } else { // previous one - node->value = cur_value; - } - // cell will never delete. If delete cube / cell happen, the exporter history will be reset. - - size_t delta_metric_name_len = strlen(metric_name) + 6; // 6 for "_delta" - char *delta_metric_name = malloc(delta_metric_name_len + 1); - snprintf(delta_metric_name, delta_metric_name_len + 1, "%s_delta", metric_name); - delta_metric_name[delta_metric_name_len] = '\0'; - json_writer_longlong_field(tag_field_pair_arr[i].cjson_fields, delta_metric_name, delta); - free(delta_metric_name); + const char *tag_json = json_writer_unwrap(tag_field_pair->cjson_tags); + for (int j = 0; j < tag_field_pair->n_metric; j++) { + if (tag_field_pair->metric_pairs[j]->type != TAG_INTEGER) { // only counter type need to write delta + continue; + } + const char *metric_name = tag_field_pair->metric_pairs[j]->key; + assert(metric_name != NULL); // mute cppcheck + long long cur_value = tag_field_pair->metric_pairs[j]->value_longlong; + struct name_value_map *node = counter_history_find(exporter->history, tag_json, metric_name); + long long last_value = node == NULL ? 0 : node->value; + long long delta = cur_value - last_value; + + if (node == NULL) { // new added cell/cube/metric + counter_history_add(exporter->history, tag_json, metric_name, cur_value); + } else { // previous one + node->value = cur_value; + } + // cell will never delete. If delete cube / cell happen, the exporter history will be reset. + + size_t delta_metric_name_len = strlen(metric_name) + 6; // 6 for "_delta" + char *delta_metric_name = malloc(delta_metric_name_len + 1); + snprintf(delta_metric_name, delta_metric_name_len + 1, "%s_delta", metric_name); + delta_metric_name[delta_metric_name_len] = '\0'; + json_writer_longlong_field(field_json, delta_metric_name, delta); + free(delta_metric_name); + } +} + +/* -------------------------------------------------------------------------- */ +/* export couple */ +/* -------------------------------------------------------------------------- */ +struct couple_export_table *couple_export_table_new() +{ + struct couple_export_table *ret = calloc(1, sizeof(struct couple_export_table)); + return ret; +} + +void couple_export_table_add(struct couple_export_table *tbl, const char *json, int id) +{ + struct couple_export_table_item *node = malloc(sizeof(struct couple_export_table_item)); + node->key = strdup(json); + node->kv_pair_id = id; + + HASH_ADD_KEYPTR(hh, tbl->items, node->key, strlen(node->key), node); +} + +int couple_export_table_find(const struct couple_export_table *tbl, const char *json) +{ + struct couple_export_table_item *node; + HASH_FIND_STR(tbl->items, json, node); + if (node == NULL) { + return false; + } + return node->kv_pair_id; +} + +void couple_export_table_free(struct couple_export_table *tbl) +{ + if (tbl == NULL) { + return; + } + + struct couple_export_table_item *node, *tmp; + HASH_ITER(hh, tbl->items, node, tmp) { + HASH_DEL(tbl->items, node); + free(node->key); + free(node); + } + + free(tbl); +} + +void extend_metric_delta(struct cellwise_rec_for_export *acc, size_t n_acc, struct cellwise_rec_for_export *delta, size_t n_delta) +{ + struct couple_export_table *map = couple_export_table_new(); // todo: 这个map更新放到exporter里,不要每次都new + for (int i = 0; i < n_acc; i++) { + const char *tag_json = json_writer_unwrap(acc[i].cjson_tags); + couple_export_table_add(map, tag_json, i); + } + + for (int i = 0; i < n_delta; i++) { + const char *tag_json = json_writer_unwrap(delta[i].cjson_tags); + int id_acc = couple_export_table_find(map, tag_json); + assert(id_acc < n_acc); + + acc[id_acc].metric_pairs = realloc(acc[id_acc].metric_pairs, sizeof(struct export_kv_pair *) * (acc[id_acc].n_metric + delta[i].n_metric)); + struct export_kv_pair **dst_expair_arr = acc[id_acc].metric_pairs; + struct export_kv_pair **src_expair_arr = delta[i].metric_pairs; + for (int j = 0; j < delta[i].n_metric; j++) { + src_expair_arr[j]->key = realloc(src_expair_arr[j]->key, strlen(src_expair_arr[j]->key) + 6 + 1); // 6 for "_delta", 1 for '\0' + strcat(src_expair_arr[j]->key, "_delta"); + dst_expair_arr[acc[id_acc].n_metric + j] = src_expair_arr[j]; } + acc[id_acc].n_metric += delta[i].n_metric; + delta[i].n_metric = 0; // moved to acc, avoid free } + + couple_export_table_free(map); } /* -------------------------------------------------------------------------- */ @@ -602,24 +690,25 @@ void kv_pair_free_list(struct export_kv_pair *pairs, size_t len) // return 1 if added, 0 if not added int cjson_map_add(struct cellwise_rec_for_export *tag_field_pair, const struct cell_iter *iter) { - struct json_writer *writer = NULL; - + struct export_kv_pair **fields = NULL; + int n_nonempty_metrics = 0; + for (int i = 0; i < iter->n_metric; i++) { struct export_kv_pair *field = cell_query_with_iter(iter, iter->metric_ids[i]); if (field == NULL) { continue; } - if (writer == NULL) { - writer = json_writer_init(); + if (fields == NULL) { + fields = malloc(sizeof(struct export_kv_pair *) * iter->n_metric); } - kv_pair_write_to_json(field, writer); - kv_pair_free(field); + fields[n_nonempty_metrics++] = field; } - if (writer == NULL) { // all fields are null + if (fields == NULL) { // all fields are null return 0; } - tag_field_pair->cjson_fields = writer; + tag_field_pair->metric_pairs = fields; + tag_field_pair->n_metric = n_nonempty_metrics; struct export_kv_pair *tag_list; size_t tag_list_len; @@ -634,37 +723,12 @@ int cjson_map_add(struct cellwise_rec_for_export *tag_field_pair, const struct c return 1; } -void cjson_map_record_metrics(struct cellwise_rec_for_export *tag_field_pair, const struct cell_iter *iter) -{ - tag_field_pair->metric_name = malloc(sizeof(char *) * iter->n_metric); - tag_field_pair->metric_value = malloc(sizeof(long long) * iter->n_metric); - - int n_counter_metric = 0; - for (int i = 0; i < iter->n_metric; i++) { - int metric_id = iter->metric_ids[i]; - int cube_id = iter->cube_ids[iter->curr_cube_idx]; - if (fieldstat_get_metric_type(iter->instance, metric_id) != METRIC_TYPE_COUNTER) { - continue; - } - long long value; - int ret = fieldstat_counter_get(iter->instance, cube_id, metric_id, &iter->tag_list[iter->curr_cell_idx], &value); - if (ret < 0) { // no value, happens when the two cells in one cube has different metrics - continue; - } - - tag_field_pair->metric_name[n_counter_metric] = strdup(fieldstat_get_metric_name(iter->instance, metric_id)); - tag_field_pair->metric_value[n_counter_metric] = value; - - n_counter_metric++; - } - tag_field_pair->n_metric = n_counter_metric; -} - -struct cellwise_rec_for_export *read_tag_and_field(const struct fieldstat *instance, size_t *n_pair_out, bool output_delta) +struct cellwise_rec_for_export *read_tag_and_field(const struct fieldstat *instance, size_t *n_pair_out) { // clock_t prepare_start = clock(); int *cube_id = NULL; int cube_id_len = 0; + *n_pair_out = 0; fieldstat_get_cubes(instance, &cube_id, &cube_id_len); if (cube_id_len == 0) { return NULL; @@ -687,10 +751,6 @@ struct cellwise_rec_for_export *read_tag_and_field(const struct fieldstat *insta if (cjson_map_add(&tag_field_pair[i], iter) == 0) { continue; } - - if (output_delta) { - cjson_map_record_metrics(&tag_field_pair[i], iter); - } i++; } @@ -787,7 +847,7 @@ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter * } size_t n_pair; - struct cellwise_rec_for_export *tag_field_pair = read_tag_and_field(instance, &n_pair, exporter->history != NULL); + struct cellwise_rec_for_export *tag_field_pair = read_tag_and_field(instance, &n_pair); if (tag_field_pair == NULL || n_pair == 0) { free(tag_field_pair); // tag_field_pair is not NULL when there are registered metrics but no valid cells *output = NULL; @@ -796,9 +856,6 @@ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter * } fieldstat_json_exporter_write_global_tags(exporter, tag_field_pair, n_pair); - if (exporter->history != NULL) { // not null when fieldstat_json_exporter_enable_delta is called - fieldstat_json_exporter_write_delta((struct fieldstat_json_exporter *)exporter, tag_field_pair, n_pair); - } char **cjson_str_arr = (char **)malloc(sizeof(char *) * n_pair); @@ -806,11 +863,20 @@ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter * struct cellwise_rec_for_export *current = &tag_field_pair[i]; struct json_writer *root = json_writer_init(); - json_writer_start_map(root); + struct json_writer *field_json = json_writer_init(); + for (int j = 0; j < tag_field_pair[i].n_metric; j++) { + kv_pair_write_to_json(tag_field_pair[i].metric_pairs[j], field_json); + } + if (exporter->history != NULL) { // not null when fieldstat_json_exporter_enable_delta is called + write_delta_to_json((struct fieldstat_json_exporter *)exporter, &tag_field_pair[i], field_json); + } + const char *tmp_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME; + + json_writer_start_map(root); json_writer_str_field(root, "name", tmp_name, strlen(tmp_name)); json_writer_object_item(root, "tags", current->cjson_tags); - json_writer_object_item(root, "fields", current->cjson_fields); + json_writer_object_item(root, "fields", field_json); json_writer_longlong_field(root, "timestamp_ms", timestamp_ms); if (exporter->history != NULL) { json_writer_longlong_field(root, "timestamp_ms_delta", timestamp_ms - exporter->history->ts); @@ -828,16 +894,14 @@ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter * *output = cjson_str_arr; *output_size = n_pair; - if (exporter->history != NULL) { // read_tag_and_field does not allocate memory for metric_name and metric_value when exporter->history == NULL - for (int i = 0; i < n_pair; i++) { - for (int j = 0; j < tag_field_pair[i].n_metric; j++) { - free(tag_field_pair[i].metric_name[j]); - } - free(tag_field_pair[i].metric_name); - free(tag_field_pair[i].metric_value); + + for (int i = 0; i < n_pair; i++) { + for (int j = 0; j < tag_field_pair[i].n_metric; j++) { + kv_pair_free(tag_field_pair[i].metric_pairs[j]); } + free(tag_field_pair[i].metric_pairs); } - // json object is added in json_writer_object_item + free(tag_field_pair); } @@ -867,12 +931,80 @@ char *fieldstat_json_exporter_export(const struct fieldstat_json_exporter *expor return cjson_str_arr; } +char *fieldstat_json_exporter_export_with_delta(const struct fieldstat_json_exporter *exporter, const struct fieldstat *instance, const struct fieldstat *delta, + const struct timeval *timestamp, const struct timeval *timestamp_delta) +{ + size_t n_pair_acc; + struct cellwise_rec_for_export *expair_acc = read_tag_and_field(instance, &n_pair_acc); + if (expair_acc == NULL || n_pair_acc == 0) { + free(expair_acc); // tag_field_pair is not NULL when there are registered metrics but no valid cells + return NULL; + } + size_t n_pair_delta; + struct cellwise_rec_for_export *expair_delta = read_tag_and_field(delta, &n_pair_delta); + if (expair_delta != NULL && n_pair_delta != 0) { + extend_metric_delta(expair_acc, n_pair_acc, expair_delta, n_pair_delta); + } + + fieldstat_json_exporter_write_global_tags(exporter, expair_acc, n_pair_acc); + + struct json_writer *root = json_writer_init(); + json_writer_array_start(root); + for (int i = 0; i < n_pair_acc; i++) { + struct cellwise_rec_for_export *current = &expair_acc[i]; + + struct json_writer *field_json = json_writer_init(); + for (int j = 0; j < expair_acc[i].n_metric; j++) { + kv_pair_write_to_json(expair_acc[i].metric_pairs[j], field_json); + } + + const char *tmp_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME; + + json_writer_start_map(root); + json_writer_str_field(root, "name", tmp_name, strlen(tmp_name)); + json_writer_object_item(root, "tags", current->cjson_tags); + json_writer_object_item(root, "fields", field_json); + json_writer_longlong_field(root, "timestamp_ms", cal_ms_time(timestamp)); + json_writer_longlong_field(root, "timestamp_ms_delta", cal_ms_time(timestamp_delta)); + + json_writer_end_map(root); + } + json_writer_array_end(root); + + char *cjson_str; + size_t cjson_str_len; + json_writer_finish(root, &cjson_str, &cjson_str_len); + + for (int i = 0; i < n_pair_acc; i++) { + for (int j = 0; j < expair_acc[i].n_metric; j++) { + kv_pair_free(expair_acc[i].metric_pairs[j]); + } + free(expair_acc[i].metric_pairs); + } + free(expair_acc); + + for (int i = 0; i < n_pair_delta; i++) { + for (int j = 0; j < expair_delta[i].n_metric; j++) { + kv_pair_free(expair_delta[i].metric_pairs[j]); + } + free(expair_delta[i].metric_pairs); + char *dummy_s; + size_t dummy_len; + json_writer_finish(expair_delta[i].cjson_tags, &dummy_s, &dummy_len); // the tag json is not added to the root json, so we need to free it manually + free(dummy_s); + } + free(expair_delta); + + return cjson_str; +} + struct fieldstat_json_exporter *fieldstat_json_exporter_new() { struct fieldstat_json_exporter *exporter = (struct fieldstat_json_exporter *)malloc(sizeof(struct fieldstat_json_exporter)); exporter->name = NULL; exporter->global_tag_list = NULL; exporter->history = NULL; + exporter->last_ts = 0; return exporter; } @@ -938,4 +1070,6 @@ void fieldstat_json_exporter_enable_delta(struct fieldstat_json_exporter *export return; } exporter->history = counter_history_new(); -}
\ No newline at end of file +} + +#endif
\ No newline at end of file diff --git a/src/fieldstat_easy.c b/src/fieldstat_easy.c index 9ec3e7b..c76372b 100644 --- a/src/fieldstat_easy.c +++ b/src/fieldstat_easy.c @@ -10,10 +10,10 @@ #include "fieldstat.h" #include "fieldstat_exporter.h" - +#include "fieldstat_easy.h" struct fs_unit { - struct fieldstat *write_only; + struct fieldstat *active; struct fieldstat *read_only; pthread_spinlock_t lock; }; @@ -23,7 +23,6 @@ static volatile int g_output_thread_running = 0; void close_output_thread() { - // atomic (void)__sync_lock_test_and_set(&g_output_thread_running, 0); } @@ -38,20 +37,20 @@ struct fieldstat_easy struct fieldstat_json_exporter *exporter; FILE *output_fp; int output_interval_second; - pthread_spinlock_t outputting_lock; + pthread_spinlock_t outputting_lock; // lock the resource: fieldstat_easy::accumulate }; void fs_unit_switch_role(struct fs_unit *fsu) { fieldstat_reset(fsu->read_only); pthread_spin_lock(&fsu->lock); - struct fieldstat *tmp = fsu->write_only; - fsu->write_only = fsu->read_only; + struct fieldstat *tmp = fsu->active; + fsu->active = fsu->read_only; fsu->read_only = tmp; pthread_spin_unlock(&fsu->lock); } -char *output_work(struct fieldstat_easy *fs, const struct timeval *timestamp) +char *output_work(struct fieldstat_easy *fs, const struct timeval *timestamp, const struct timeval *timestamp_delta) { fieldstat_reset(fs->delta); @@ -59,17 +58,22 @@ char *output_work(struct fieldstat_easy *fs, const struct timeval *timestamp) fs_unit_switch_role(fs->fsu + i); fieldstat_merge(fs->delta, fs->fsu[i].read_only); } + pthread_spin_lock(&fs->outputting_lock); fieldstat_merge(fs->accumulate, fs->delta); - // char *ret = fieldstat_json_exporter_export_with_delta(fs->exporter, fs->accumulate, fs->delta, timestamp); - char *ret = fieldstat_json_exporter_export(fs->exporter, fs->accumulate, timestamp); + char *ret = fieldstat_json_exporter_export_with_delta(fs->exporter, fs->accumulate, fs->delta, timestamp, timestamp_delta); + pthread_spin_unlock(&fs->outputting_lock); return ret; } void *output_main(void *arg) // return void * for pthread_create check only { - long long last_run_time = 0; + struct timespec entry_time; + clock_gettime(CLOCK_MONOTONIC, &entry_time); + long long last_run_time = entry_time.tv_sec * 1000 + entry_time.tv_nsec / 1000000; + // long long last_run_time = 0; struct timeval timestamp; + struct timeval timestamp_delta; struct timespec this_output_time; struct fieldstat_easy *fs = (struct fieldstat_easy *)arg; long long output_interval = fs->output_interval_second * 1000; @@ -81,19 +85,17 @@ void *output_main(void *arg) // return void * for pthread_create check only usleep(50000); // 50ms continue; } - last_run_time = now; - printf("output_main: outputting\n"); timestamp.tv_sec = this_output_time.tv_sec; timestamp.tv_usec = this_output_time.tv_nsec / 1000; - pthread_spin_lock(&fs->outputting_lock); - char *ret = output_work(fs, ×tamp); - pthread_spin_unlock(&fs->outputting_lock); + timestamp_delta.tv_sec = timestamp.tv_sec - last_run_time / 1000; // divide 1000 to convert ms to sec + timestamp_delta.tv_usec = timestamp.tv_usec - (last_run_time % 1000) * 1000; // %1000 to get the ms part, then *1000 to convert ms to us + last_run_time = now; + + char *ret = output_work(fs, ×tamp, ×tamp_delta); if (ret == NULL) { ret = strdup("[]"); } - printf("output_main: outputting done, ret: %s\n", ret); - struct flock lock; lock.l_type = F_WRLCK; lock.l_start = 0; @@ -117,7 +119,7 @@ void *output_main(void *arg) // return void * for pthread_create check only free(ret); } - return NULL; + return NULL; // return void * for pthread_create check only } struct fieldstat_easy *fieldstat_easy_new(int max_thread_num, const struct fieldstat_tag *tags, size_t n_tag) { @@ -133,7 +135,7 @@ struct fieldstat_easy *fieldstat_easy_new(int max_thread_num, const struct field pthread_spin_init(&fse->outputting_lock, PTHREAD_PROCESS_PRIVATE); for (int i = 0; i < max_thread_num; i++) { - fse->fsu[i].write_only = fieldstat_fork(fse->delta); + fse->fsu[i].active = fieldstat_fork(fse->delta); fse->fsu[i].read_only = fieldstat_fork(fse->delta); pthread_spin_init(&fse->fsu[i].lock, PTHREAD_PROCESS_PRIVATE); } @@ -155,7 +157,7 @@ void fieldstat_easy_free(struct fieldstat_easy *fse) { for (int i = 0; i < fse->max_thread_num; i++) { pthread_spin_lock(&fse->fsu[i].lock); - fieldstat_free(fse->fsu[i].write_only); + fieldstat_free(fse->fsu[i].active); fieldstat_free(fse->fsu[i].read_only); pthread_spin_unlock(&fse->fsu[i].lock); pthread_spin_destroy(&fse->fsu[i].lock); @@ -189,7 +191,7 @@ int fieldstat_easy_register_counter(struct fieldstat_easy *fse, const char *name pthread_spin_lock(&fse->fsu[i].lock); } - int ret = fieldstat_register_counter(fse->fsu[0].write_only, name); // try to register + int ret = fieldstat_register_counter(fse->fsu[0].active, name); // try to register if (ret < 0) { for (int i = 0; i < fse->max_thread_num; i++) { pthread_spin_unlock(&fse->fsu[i].lock); @@ -198,7 +200,7 @@ int fieldstat_easy_register_counter(struct fieldstat_easy *fse, const char *name } fieldstat_register_counter(fse->fsu[0].read_only, name); for (int i = 1; i < fse->max_thread_num; i++) { - fieldstat_register_counter(fse->fsu[i].write_only, name); + fieldstat_register_counter(fse->fsu[i].active, name); fieldstat_register_counter(fse->fsu[i].read_only, name); } @@ -215,7 +217,7 @@ int fieldstat_easy_register_histogram(struct fieldstat_easy *fse, const char *na pthread_spin_lock(&fse->fsu[i].lock); } - int ret = fieldstat_register_hist(fse->fsu[0].write_only, name, lowest_trackable_value, highest_trackable_value, significant_figures); // try to register + int ret = fieldstat_register_hist(fse->fsu[0].active, name, lowest_trackable_value, highest_trackable_value, significant_figures); // try to register if (ret < 0) { for (int i = 0; i < fse->max_thread_num; i++) { pthread_spin_unlock(&fse->fsu[i].lock); @@ -224,7 +226,7 @@ int fieldstat_easy_register_histogram(struct fieldstat_easy *fse, const char *na } fieldstat_register_hist(fse->fsu[0].read_only, name, lowest_trackable_value, highest_trackable_value, significant_figures); for (int i = 1; i < fse->max_thread_num; i++) { - fieldstat_register_hist(fse->fsu[i].write_only, name, lowest_trackable_value, highest_trackable_value, significant_figures); + fieldstat_register_hist(fse->fsu[i].active, name, lowest_trackable_value, highest_trackable_value, significant_figures); fieldstat_register_hist(fse->fsu[i].read_only, name, lowest_trackable_value, highest_trackable_value, significant_figures); } @@ -244,10 +246,10 @@ void fieldstat_easy_output(struct fieldstat_easy *fse, char **buff, size_t *buff timestamp.tv_sec = this_output_time.tv_sec; timestamp.tv_usec = this_output_time.tv_nsec / 1000; struct fieldstat *dst = fieldstat_new(); - // collect all the data delta recorded since last passive output, if it happened. If fieldstat_easy_enable_auto_output is not called, its the data since the program started. + // collect all the data recorded since last passive output, if passive output happened. Otherwise, its the data since the program started. for (int i = 0; i < fse->max_thread_num; i++) { pthread_spin_lock(&fse->fsu[i].lock); - fieldstat_merge(dst, fse->fsu[i].write_only); + fieldstat_merge(dst, fse->fsu[i].active); pthread_spin_unlock(&fse->fsu[i].lock); } @@ -274,7 +276,7 @@ int fieldstat_easy_counter_incrby(struct fieldstat_easy *fse, int thread_id, int } pthread_spin_lock(&fse->fsu[thread_id].lock); - int ret = fieldstat_counter_incrby(fse->fsu[thread_id].write_only, 0, metric_id, tags, n_tag, increment); + int ret = fieldstat_counter_incrby(fse->fsu[thread_id].active, 0, metric_id, tags, n_tag, increment); pthread_spin_unlock(&fse->fsu[thread_id].lock); return ret; @@ -290,7 +292,7 @@ int fieldstat_easy_histogram_record(struct fieldstat_easy *fse, int thread_id, i } pthread_spin_lock(&fse->fsu[thread_id].lock); - int ret = fieldstat_hist_record(fse->fsu[thread_id].write_only, 0, metric_id, tags, n_tag, value); + int ret = fieldstat_hist_record(fse->fsu[thread_id].active, 0, metric_id, tags, n_tag, value); pthread_spin_unlock(&fse->fsu[thread_id].lock); return ret; diff --git a/src/utils/very_fast_json_writer.c b/src/utils/very_fast_json_writer.c index e512b78..6f05464 100644 --- a/src/utils/very_fast_json_writer.c +++ b/src/utils/very_fast_json_writer.c @@ -61,11 +61,14 @@ void json_writer_array_start(struct json_writer *writer) { void json_writer_array_end(struct json_writer *writer) { if (writer->buffer[writer->cursor - 1] == '[') { // empty array - json_writer_check_and_realloc(writer, 1); + json_writer_check_and_realloc(writer, 2); writer->buffer[writer->cursor++] = ']'; + writer->buffer[writer->cursor++] = ','; return; } writer->buffer[writer->cursor - 1] = ']'; // replace last comma with ] + json_writer_check_and_realloc(writer, 1); + writer->buffer[writer->cursor++] = ','; } void json_writer_str_item(struct json_writer *writer, const char *value, size_t str_len) { diff --git a/test/test_easy_fs.cpp b/test/test_easy_fs.cpp index 117963d..78e1374 100644 --- a/test/test_easy_fs.cpp +++ b/test/test_easy_fs.cpp @@ -1,6 +1,7 @@ #include <gtest/gtest.h> #include <stub/stub.h> #include <fstream> +#include <thread> #include "utils.hpp" #include "cjson/cJSON.h" @@ -9,152 +10,168 @@ #define FILENAME "./test_easy_fieldstat.json" -// TEST(test_easy_fieldstat, new_and_free) -// { -// struct fieldstat_easy *fse = fieldstat_easy_new(10, &TEST_TAG_STRING, 1); -// fieldstat_easy_free(fse); -// } - -// TEST(test_easy_fieldstat, output_to_buff) -// { -// struct fieldstat_easy *fse = fieldstat_easy_new(10, &TEST_TAG_STRING, 1); -// fieldstat_easy_register_counter(fse, "metric counter"); -// fieldstat_easy_counter_incrby(fse, 0, 0, &TEST_TAG_INT, 1, 1); -// // read file, should be empty -// char *buff = NULL; -// size_t buff_len = 0; -// fieldstat_easy_output(fse, &buff, &buff_len); -// cJSON *root = cJSON_Parse(buff); -// cJSON *cell = cJSON_GetArrayItem(root, 0); -// cJSON *metric = cJSON_GetObjectItem(cell, "fields"); -// long long value = cJSON_GetObjectItem(metric, "metric counter")->valueint; -// EXPECT_EQ(value, 1); -// cJSON *tags = cJSON_GetObjectItem(cell, "tags"); -// EXPECT_EQ(cJSON_GetObjectItem(tags, TEST_TAG_INT.key)->valueint, TEST_TAG_INT.value_longlong); -// EXPECT_STREQ(cJSON_GetObjectItem(tags, TEST_TAG_STRING.key)->valuestring, TEST_TAG_STRING.value_str); - -// cJSON_Delete(root); -// free(buff); -// fieldstat_easy_free(fse); -// } - -// cJSON *read_file() -// { -// std::ifstream ifs(FILENAME); -// if (!ifs.is_open()) { -// return NULL; -// } -// std::string content((std::istreambuf_iterator<char>(ifs)), (std::istreambuf_iterator<char>())); -// printf("content: %s\n", content.c_str()); -// cJSON *root = cJSON_Parse(content.c_str()); -// ifs.close(); -// return root; -// } - -// TEST(test_easy_fieldstat, output_to_file) -// { -// struct fieldstat_easy *fse = fieldstat_easy_new(10, NULL, 0); -// int counter_id = fieldstat_easy_register_counter(fse, "metric counter"); -// // fieldstat_easy_register_histogram(fse, "metric histogram", 1, 10000, 1); -// fieldstat_easy_enable_auto_output(fse, FILENAME, 1); -// sleep(2); // long enough to output -// // 1st interval: read file, should be empty -// printf("1st interval\n"); -// cJSON *root = read_file(); -// EXPECT_EQ(cJSON_GetArraySize(root), 0); // is empty and valid -// cJSON_Delete(root); - -// fieldstat_easy_counter_incrby(fse, 0, counter_id, &TEST_TAG_INT, 1, 1); -// fieldstat_easy_counter_incrby(fse, 1, counter_id, &TEST_TAG_INT, 1, 10); -// fieldstat_easy_counter_incrby(fse, 2, counter_id, &TEST_TAG_INT, 1, 100); -// sleep(2); - -// // 2nd interval: merge 3 thread's data, and output - -// printf("2nd interval\n"); -// root = read_file(); -// EXPECT_EQ(cJSON_GetArraySize(root), 1); - -// cJSON *tagged_by_int = cJSON_GetArrayItem(root, 0); -// cJSON *metric = cJSON_GetObjectItem(tagged_by_int, "fields"); -// EXPECT_TRUE(metric != NULL); - -// long long value = cJSON_GetObjectItem(metric, "metric counter")->valueint; -// EXPECT_EQ(value, 111); -// cJSON_Delete(root); - -// // 3nd interval: no new data, just output again -// sleep(1); -// printf("3rd interval\n"); -// root = read_file(); -// EXPECT_EQ(cJSON_GetArraySize(root), 1); -// tagged_by_int = cJSON_GetArrayItem(root, 0); -// metric = cJSON_GetObjectItem(tagged_by_int, "fields"); -// value = cJSON_GetObjectItem(metric, "metric counter")->valueint; -// EXPECT_EQ(value, 111); // should not change -// cJSON_Delete(root); - -// // 4th interval: new data, output again -// fieldstat_easy_counter_incrby(fse, 0, counter_id, &TEST_TAG_DOUBLE, 1, 10086); -// sleep(2); -// printf("4th interval\n"); -// root = read_file(); -// cJSON *tagged_by_double = cJSON_GetArrayItem(root, 1); -// metric = cJSON_GetObjectItem(tagged_by_double, "fields"); -// value = cJSON_GetObjectItem(metric, "metric counter")->valueint; -// EXPECT_EQ(value, 10086); -// cJSON_Delete(root); - -// fieldstat_easy_free(fse); -// remove(FILENAME); -// } - - -// extern "C" { -// extern char *output_work(struct fieldstat_easy *fs, struct timeval *timestamp); -// } - -// int g_output_count = 0; -// char *output_work_stub(struct fieldstat_easy *fs, struct timeval *timestamp) -// { -// g_output_count++; -// return NULL; -// } - -// TEST(test_easy_fieldstat, output_interval_ok) -// { -// Stub stub; -// stub.set(output_work, output_work_stub); - -// struct fieldstat_easy *fse = fieldstat_easy_new(10, NULL, 0); -// fieldstat_easy_enable_auto_output(fse, FILENAME, 1); - -// sleep(20); // output 20s/1s = 20 times -// fieldstat_easy_free(fse); - -// EXPECT_EQ(g_output_count, 20); -// } +TEST(test_easy_fieldstat, new_and_free) +{ + struct fieldstat_easy *fse = fieldstat_easy_new(10, &TEST_TAG_STRING, 1); + fieldstat_easy_free(fse); +} -TEST(test_easy_fieldstat, ensure_data_racing_of_two_output_and_of_incyby) +TEST(test_easy_fieldstat, output_to_buff) +{ + struct fieldstat_easy *fse = fieldstat_easy_new(10, &TEST_TAG_STRING, 1); + fieldstat_easy_register_counter(fse, "metric counter"); + fieldstat_easy_counter_incrby(fse, 0, 0, &TEST_TAG_INT, 1, 1); + // read file, should be empty + char *buff = NULL; + size_t buff_len = 0; + fieldstat_easy_output(fse, &buff, &buff_len); + cJSON *root = cJSON_Parse(buff); + cJSON *cell = cJSON_GetArrayItem(root, 0); + cJSON *metric = cJSON_GetObjectItem(cell, "fields"); + long long value = cJSON_GetObjectItem(metric, "metric counter")->valueint; + EXPECT_EQ(value, 1); + cJSON *tags = cJSON_GetObjectItem(cell, "tags"); + EXPECT_EQ(cJSON_GetObjectItem(tags, TEST_TAG_INT.key)->valueint, TEST_TAG_INT.value_longlong); + EXPECT_STREQ(cJSON_GetObjectItem(tags, TEST_TAG_STRING.key)->valuestring, TEST_TAG_STRING.value_str); + + cJSON_Delete(root); + free(buff); + fieldstat_easy_free(fse); +} + +cJSON *read_file() +{ + std::ifstream ifs(FILENAME); + if (!ifs.is_open()) { + return NULL; + } + std::string content((std::istreambuf_iterator<char>(ifs)), (std::istreambuf_iterator<char>())); + printf("content: %s\n", content.c_str()); + cJSON *root = cJSON_Parse(content.c_str()); + ifs.close(); + return root; +} + +TEST(test_easy_fieldstat, output_to_file) { struct fieldstat_easy *fse = fieldstat_easy_new(10, NULL, 0); int counter_id = fieldstat_easy_register_counter(fse, "metric counter"); - int hdr_id = fieldstat_easy_register_histogram(fse, "metric histogram", 1, 10000, 1); fieldstat_easy_enable_auto_output(fse, FILENAME, 1); + sleep(3); // long enough to output + // 1st interval: read file, should be empty + printf("1st interval\n"); + cJSON *root = read_file(); + EXPECT_EQ(cJSON_GetArraySize(root), 0); // is empty and valid + cJSON_Delete(root); + + fieldstat_easy_counter_incrby(fse, 0, counter_id, &TEST_TAG_INT, 1, 1); + fieldstat_easy_counter_incrby(fse, 1, counter_id, &TEST_TAG_INT, 1, 10); + fieldstat_easy_counter_incrby(fse, 2, counter_id, &TEST_TAG_INT, 1, 100); + sleep(2); + + // 2nd interval: merge 3 thread's data, and output + + printf("2nd interval\n"); + root = read_file(); + EXPECT_EQ(cJSON_GetArraySize(root), 1); + + cJSON *tagged_by_int = cJSON_GetArrayItem(root, 0); + cJSON *metric = cJSON_GetObjectItem(tagged_by_int, "fields"); + EXPECT_TRUE(metric != NULL); + + long long value = cJSON_GetObjectItem(metric, "metric counter")->valueint; + EXPECT_EQ(value, 111); + cJSON_Delete(root); + + // 3nd interval: no new data, just output again + sleep(1); + printf("3rd interval\n"); + root = read_file(); + EXPECT_EQ(cJSON_GetArraySize(root), 1); + tagged_by_int = cJSON_GetArrayItem(root, 0); + metric = cJSON_GetObjectItem(tagged_by_int, "fields"); + value = cJSON_GetObjectItem(metric, "metric counter")->valueint; + EXPECT_EQ(value, 111); // should not change + cJSON_Delete(root); + + // 4th interval: new data, output again + fieldstat_easy_counter_incrby(fse, 0, counter_id, &TEST_TAG_DOUBLE, 1, 10086); + sleep(2); + printf("4th interval\n"); + root = read_file(); + cJSON *tagged_by_double = cJSON_GetArrayItem(root, 1); + metric = cJSON_GetObjectItem(tagged_by_double, "fields"); + value = cJSON_GetObjectItem(metric, "metric counter")->valueint; + EXPECT_EQ(value, 10086); + cJSON_Delete(root); - char *out; - size_t out_len; - for (size_t i = 0; i < 1000000ULL; i++) { // loop million times to ensure no core dump - fieldstat_easy_output(fse, &out, &out_len); - fieldstat_easy_counter_incrby(fse, 0, counter_id, &TEST_TAG_INT, 1, 1); - fieldstat_easy_histogram_record(fse, 0, hdr_id, &TEST_TAG_INT, 1, rand() % 10000); - cJSON *root = cJSON_Parse(out); - EXPECT_TRUE(root != NULL); - cJSON_Delete(root); - free(out); - } + fieldstat_easy_free(fse); + remove(FILENAME); +} + +extern "C" { + extern char *output_work(struct fieldstat_easy *fs, struct timeval *timestamp); +} + +int g_output_count = 0; +char *output_work_stub(struct fieldstat_easy *fs, struct timeval *timestamp) +{ + g_output_count++; + return NULL; +} + +TEST(test_easy_fieldstat, output_interval_ok) +{ + Stub stub; + stub.set(output_work, output_work_stub); + + struct fieldstat_easy *fse = fieldstat_easy_new(10, NULL, 0); + fieldstat_easy_enable_auto_output(fse, FILENAME, 1); + + sleep(30); // output 30s/1s = 20 times + fieldstat_easy_free(fse); + + EXPECT_TRUE(g_output_count == 30 || g_output_count == 29); // 29 when the last output is not triggered +} + +TEST(test_easy_fieldstat, ensure_data_racing_of_two_output_and_of_incyby) +{ + const int N_THREADS = 3; + struct fieldstat_tag global_tags[1]; + struct fieldstat_tag tmptag; + tmptag.key = "app id"; + tmptag.type = TAG_INTEGER; + tmptag.value_longlong = 1; + global_tags[0] = tmptag; + + struct fieldstat_easy *fse = fieldstat_easy_new(N_THREADS, global_tags, 1); + int counter_id = fieldstat_easy_register_counter(fse, "incoming bytes"); + int hdr_id = fieldstat_easy_register_histogram(fse, "delay", 1, 10000, 1); + fieldstat_easy_enable_auto_output(fse, FILENAME, 1); + + std::thread threads[N_THREADS]; + for (int thread_id = 0; thread_id < N_THREADS; thread_id++) { + threads[thread_id] = std::thread([fse, counter_id, hdr_id, thread_id]() { + for (int i = 0; i < 1000000; i++) { // loop million times to ensure no core dump + fieldstat_easy_counter_incrby(fse, thread_id, counter_id, &TEST_TAG_INT, 1, 1); + fieldstat_easy_histogram_record(fse, thread_id, hdr_id, &TEST_TAG_INT, 1, rand() % 10000); + char *out; + size_t out_len; + fieldstat_easy_output(fse, &out, &out_len); + cJSON *root = cJSON_Parse(out); + EXPECT_TRUE(root != NULL); + cJSON_Delete(root); + free(out); + } + }); + } + for (int i = 0; i < N_THREADS; i++) { + threads[i].join(); + } fieldstat_easy_free(fse); + + remove(FILENAME); } int main(int argc, char **argv) diff --git a/test/test_exporter_json.cpp b/test/test_exporter_json.cpp index 44bbcdb..dc19177 100644 --- a/test/test_exporter_json.cpp +++ b/test/test_exporter_json.cpp @@ -809,6 +809,130 @@ TEST(export_test, enable_delta_and_reset_on_delete_cube) { test_reset_one_round(trigger); } +TEST(export_test, delta_with_two_instance_same_config) +{ + struct fieldstat *instance = fieldstat_new(); + fieldstat_create_cube(instance, NULL, 0, SAMPLING_MODE_COMPREHENSIVE, 0); + int id_counter = fieldstat_register_counter(instance, "counter"); + fieldstat_counter_incrby(instance, 0, id_counter, &TEST_TAG_INT, 1, 123); + int id_hist = fieldstat_register_hist(instance, "histogram", 1, 1000, 3); + fieldstat_hist_record(instance, 0, id_hist, &TEST_TAG_INT, 1, 5); + + struct fieldstat *acc = fieldstat_new(); + fieldstat_merge(acc, instance); + fieldstat_counter_incrby(acc, 0, id_counter, &TEST_TAG_INT, 1, 1000); + + // export test + struct fieldstat_json_exporter *fieldstat_json_exporter = fieldstat_json_exporter_new(); + char *json = fieldstat_json_exporter_export_with_delta(fieldstat_json_exporter, acc, instance, &TEST_TIMEVAL, &TEST_TIMEVAL); + cJSON *root_arr = cJSON_Parse(json); + free(json); + + cJSON *root = cJSON_GetArrayItem(root_arr, 0); + cJSON *metrics = cJSON_GetObjectItem(root, "fields"); + cJSON *counter = cJSON_GetObjectItem(metrics, "counter"); + EXPECT_EQ(counter->valueint, 1123); + cJSON *counter_delta = cJSON_GetObjectItem(metrics, "counter_delta"); + EXPECT_EQ(counter_delta->valueint, 123); + const cJSON *histogram = cJSON_GetObjectItem(metrics, "histogram"); + const cJSON *histogram_delta = cJSON_GetObjectItem(metrics, "histogram_delta"); + EXPECT_STREQ(histogram->valuestring, histogram_delta->valuestring); + cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp_ms"); + EXPECT_EQ(timestamp->valueint, TEST_TIMEVAL_LONG); + cJSON *timestamp_delta = cJSON_GetObjectItem(root, "timestamp_ms_delta"); + EXPECT_EQ(timestamp_delta->valueint, TEST_TIMEVAL_LONG); + + cJSON_Delete(root_arr); + fieldstat_json_exporter_free(fieldstat_json_exporter); + fieldstat_free(instance); + fieldstat_free(acc); +} + +TEST(export_test, delta_with_two_instance_one_empty) +{ + struct fieldstat *instance = fieldstat_new(); + fieldstat_create_cube(instance, NULL, 0, SAMPLING_MODE_COMPREHENSIVE, 0); + int id_counter = fieldstat_register_counter(instance, "counter"); + fieldstat_counter_incrby(instance, 0, id_counter, &TEST_TAG_INT, 1, 123); + int id_hist = fieldstat_register_hist(instance, "histogram", 1, 1000, 3); + fieldstat_hist_record(instance, 0, id_hist, &TEST_TAG_INT, 1, 5); + + struct fieldstat *delta = fieldstat_fork(instance); + + // export test + struct fieldstat_json_exporter *fieldstat_json_exporter = fieldstat_json_exporter_new(); + char *json = fieldstat_json_exporter_export_with_delta(fieldstat_json_exporter, instance, delta, &TEST_TIMEVAL, &TEST_TIMEVAL); + printf("delta_with_two_instance_one_empty :\n%s\n", json); + cJSON *root_arr = cJSON_Parse(json); + free(json); + + cJSON *root = cJSON_GetArrayItem(root_arr, 0); + cJSON *metrics = cJSON_GetObjectItem(root, "fields"); + cJSON *counter = cJSON_GetObjectItem(metrics, "counter"); + EXPECT_EQ(counter->valueint, 123); + const cJSON *counter_delta = cJSON_GetObjectItem(metrics, "counter_delta"); + EXPECT_TRUE(counter_delta == NULL); + const cJSON *histogram = cJSON_GetObjectItem(metrics, "histogram"); + EXPECT_TRUE(histogram != NULL); + const cJSON *histogram_delta = cJSON_GetObjectItem(metrics, "histogram_delta"); + EXPECT_TRUE(histogram_delta == NULL); + cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp_ms"); + EXPECT_EQ(timestamp->valueint, TEST_TIMEVAL_LONG); + cJSON *timestamp_delta = cJSON_GetObjectItem(root, "timestamp_ms_delta"); + EXPECT_EQ(timestamp_delta->valueint, TEST_TIMEVAL_LONG); + + cJSON_Delete(root_arr); + fieldstat_json_exporter_free(fieldstat_json_exporter); + fieldstat_free(instance); + fieldstat_free(delta); +} + +TEST(export_test, delta_with_two_instance_different_cell) +{ + struct fieldstat *instance = fieldstat_new(); + fieldstat_create_cube(instance, NULL, 0, SAMPLING_MODE_COMPREHENSIVE, 0); + int id_counter = fieldstat_register_counter(instance, "counter"); + fieldstat_counter_incrby(instance, 0, id_counter, &TEST_TAG_INT, 1, 123); + + struct fieldstat *delta = fieldstat_fork(instance); + fieldstat_counter_incrby(delta, 0, id_counter, &TEST_TAG_DOUBLE, 1, 1); + fieldstat_merge(instance, delta); + + // export test + struct fieldstat_json_exporter *fieldstat_json_exporter = fieldstat_json_exporter_new(); + char *json = fieldstat_json_exporter_export_with_delta(fieldstat_json_exporter, instance, delta, &TEST_TIMEVAL, &TEST_TIMEVAL); + printf("delta_with_two_instance_different_cell :\n%s\n", json); + cJSON *root_arr = cJSON_Parse(json); + free(json); + + cJSON *root = cJSON_GetArrayItem(root_arr, 0); + cJSON *metrics = cJSON_GetObjectItem(root, "fields"); + cJSON *counter = cJSON_GetObjectItem(metrics, "counter"); + EXPECT_EQ(counter->valueint, 123); + cJSON *counter_delta = cJSON_GetObjectItem(metrics, "counter_delta"); + EXPECT_TRUE(counter_delta == NULL); + cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp_ms"); + EXPECT_EQ(timestamp->valueint, TEST_TIMEVAL_LONG); + cJSON *timestamp_delta = cJSON_GetObjectItem(root, "timestamp_ms_delta"); + EXPECT_EQ(timestamp_delta->valueint, TEST_TIMEVAL_LONG); + + root = cJSON_GetArrayItem(root_arr, 1); + metrics = cJSON_GetObjectItem(root, "fields"); + counter = cJSON_GetObjectItem(metrics, "counter"); + EXPECT_EQ(counter->valueint, 1); + counter_delta = cJSON_GetObjectItem(metrics, "counter_delta"); + EXPECT_EQ(counter_delta->valueint, 1); + timestamp = cJSON_GetObjectItem(root, "timestamp_ms"); + EXPECT_EQ(timestamp->valueint, TEST_TIMEVAL_LONG); + timestamp_delta = cJSON_GetObjectItem(root, "timestamp_ms_delta"); + EXPECT_EQ(timestamp_delta->valueint, TEST_TIMEVAL_LONG); + + cJSON_Delete(root_arr); + fieldstat_json_exporter_free(fieldstat_json_exporter); + fieldstat_free(instance); + fieldstat_free(delta); +} + extern "C" { extern int add_object_to_json_array_start(char *buf, int buf_len); |
