diff options
| author | chenzizhan <[email protected]> | 2023-09-15 18:14:26 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2023-10-08 17:03:28 +0800 |
| commit | 9d452b417bbe330496347ad2c066d2541921ebd4 (patch) | |
| tree | d523a9fc043c6810747f49296a8dbb961aaf2581 /src | |
| parent | 230821b167fda5ed30333dc9f5ca8c752e468a16 (diff) | |
stash4.3.0-4.3.4(python api)
Signed-off-by: chenzizhan <[email protected]>
Diffstat (limited to 'src')
| -rw-r--r-- | src/exporter/cjson_exporter.c | 366 | ||||
| -rw-r--r-- | src/fieldstat.c | 26 | ||||
| -rw-r--r-- | src/metrics/histogram_encoder.c | 88 | ||||
| -rw-r--r-- | src/metrics/histogram_encoder.h | 5 | ||||
| -rw-r--r-- | src/metrics/metric.c | 13 | ||||
| -rw-r--r-- | src/metrics/python_api.c | 140 | ||||
| -rw-r--r-- | src/utils/very_fast_json_writer.c | 9 | ||||
| -rw-r--r-- | src/utils/very_fast_json_writer.h | 1 |
8 files changed, 605 insertions, 43 deletions
diff --git a/src/exporter/cjson_exporter.c b/src/exporter/cjson_exporter.c index 45b113c..5ab1836 100644 --- a/src/exporter/cjson_exporter.c +++ b/src/exporter/cjson_exporter.c @@ -8,6 +8,7 @@ #include <stddef.h> #include "utils/very_fast_json_writer.h" +#include "uthash.h" #include "fieldstat.h" #include "fieldstat_exporter.h" @@ -28,7 +29,7 @@ {"in_pkts":123}, {"sessions":<hll blob base64-encoded string>} ] - "timestamp":123456789 + "timestamp_ms":123456789 }. { "name":"exporter_name", @@ -40,11 +41,13 @@ {"in_bytes":1}, {"in_pkts":2}, ] - "timestamp":123456789 + "timestamp_ms":123456789 } ] */ +#define DEFAULT_EXPORTER_NAME "-" +struct counter_history; struct export_kv_pair { char *key; enum fs_tag_type type; @@ -59,6 +62,7 @@ struct fieldstat_json_exporter { const struct fieldstat *instance; char *name; struct fieldstat_tag_list *global_tag_list; + struct counter_history *history; }; struct cell_iter { @@ -77,9 +81,37 @@ struct cell_iter { const struct fieldstat *instance; }; -struct json_tag_field_pair { +struct cellwise_rec_for_export { struct json_writer *cjson_tags; struct json_writer *cjson_fields; + + char **metric_name; // used for delta export + long long *metric_value; // used for delta export + size_t n_metric; +}; + +struct name_value_map { + char *name; + long long value; + UT_hash_handle hh; +}; + +struct tag_metric_map { + char *key; + struct name_value_map *value; + UT_hash_handle hh; +}; + +struct counter_history { + struct tag_metric_map *rec; + long long ts; + + unsigned long cell_version; + unsigned long *cube_version; + size_t n_cube; + + char *exporter_name; + struct fieldstat_tag_list *global_tag_list; }; void kv_pair_free(struct export_kv_pair *pair) { @@ -116,6 +148,249 @@ long long cal_ms_time(const struct timeval *ts) return time_stamp_in_ms; } +/* -------------------------------------------------------------------------- */ +/* history rec */ +/* -------------------------------------------------------------------------- */ +struct counter_history *counter_history_new() +{ + struct counter_history *history = calloc(1, sizeof(struct counter_history)); + history->exporter_name = strdup(DEFAULT_EXPORTER_NAME); + return history; +} + +void counter_history_add(struct counter_history *history, const char *json, const char *name, long long value) +{ + struct name_value_map *name_value_node = malloc(sizeof(struct name_value_map)); + name_value_node->name = strdup(name); + name_value_node->value = value; + + struct tag_metric_map *tag_node = malloc(sizeof(struct tag_metric_map)); + tag_node->key = strdup(json); + tag_node->value = NULL; + + HASH_ADD_KEYPTR(hh, history->rec, tag_node->key, strlen(tag_node->key), tag_node); + HASH_ADD_KEYPTR(hh, tag_node->value, name_value_node->name, strlen(name_value_node->name), name_value_node); +} + +struct name_value_map *counter_history_find(struct counter_history *history, const char *json, const char *name) +{ + struct tag_metric_map *tag_node; + HASH_FIND_STR(history->rec, json, tag_node); + if (tag_node == NULL) { + return NULL; + } + + struct name_value_map *name_value_node; + HASH_FIND_STR(tag_node->value, name, name_value_node); + return name_value_node; +} + +void counter_history_free(struct counter_history *history) +{ + if (history == NULL) { + return; + } + + struct tag_metric_map *tag_node, *tmp; + HASH_ITER(hh, history->rec, tag_node, tmp) { + struct name_value_map *name_value_node, *tmp2; + HASH_ITER(hh, tag_node->value, name_value_node, tmp2) { + HASH_DEL(tag_node->value, name_value_node); + free(name_value_node->name); + free(name_value_node); + } + HASH_DEL(history->rec, tag_node); + free(tag_node->key); + free(tag_node); + } + + free(history->cube_version); + + free(history->exporter_name); + + if (history->global_tag_list != NULL) { + fieldstat_tag_list_arr_free(history->global_tag_list, 1); + } + + free(history); +} + +bool fieldstat_tag_list_cmp(const struct fieldstat_tag_list *a, const struct fieldstat_tag_list *b) +{ + if (a->n_tag != b->n_tag) { + return false; + } + + for (int i = 0; i < a->n_tag; i++) { + if (strcmp(a->tag[i].key, b->tag[i].key) != 0) { + return false; + } + if (a->tag[i].type != b->tag[i].type) { + return false; + } + switch (a->tag[i].type) + { + case TAG_INTEGER: + if (a->tag[i].value_longlong != b->tag[i].value_longlong) { + return false; + } + break; + case TAG_DOUBLE: + if (a->tag[i].value_double != b->tag[i].value_double) { + return false; + } + break; + case TAG_CSTRING: + if (strcmp(a->tag[i].value_str, b->tag[i].value_str) != 0) { + return false; + } + break; + default: + assert(0); + } + } + + return true; +} + +struct fieldstat_tag_list *my_copy_fs_tag_list(const struct fieldstat_tag_list *src) +{ + struct fieldstat_tag_list *dest = malloc(sizeof(struct fieldstat_tag_list)); + dest->n_tag = src->n_tag; + dest->tag = malloc(sizeof(struct fieldstat_tag) * dest->n_tag); + for (int i = 0; i < dest->n_tag; i++) { + dest->tag[i].key = strdup(src->tag[i].key); + dest->tag[i].type = src->tag[i].type; + switch (src->tag[i].type) + { + case TAG_INTEGER: + dest->tag[i].value_longlong = src->tag[i].value_longlong; + break; + case TAG_DOUBLE: + dest->tag[i].value_double = src->tag[i].value_double; + break; + case TAG_CSTRING: + dest->tag[i].value_str = strdup(src->tag[i].value_str); + break; + default: + assert(0); + } + } + + return dest; +} + +bool counter_history_check_if_need_to_update(const struct fieldstat_json_exporter *exporter) +{ + if (exporter->history == NULL) { + return false; // delta export disabled + } + + unsigned long cur_cell_version = fieldstat_get_cell_version(exporter->instance); + const char *cur_exporter_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME; + const struct fieldstat_tag_list *cur_global_tag_list = exporter->global_tag_list; + + if (exporter->history->cell_version != cur_cell_version) { + return true; + } + + if (strcmp(exporter->history->exporter_name, cur_exporter_name) != 0) { + return true; + } + + if (exporter->history->global_tag_list == NULL && cur_global_tag_list != NULL) { + return true; + } + if (exporter->history->global_tag_list == NULL && cur_global_tag_list == NULL) { + return false; + } + // global tag cant be deleted, so no need to check if cur_global_tag_list == NULL && exporter->history->global_tag_list != NULL + if (fieldstat_tag_list_cmp(cur_global_tag_list, exporter->history->global_tag_list) == false) { + return true; + } + + int *cube_ids = NULL; + int n_cube = 0; + fieldstat_get_cubes(exporter->instance, &cube_ids, &n_cube); + if (n_cube != exporter->history->n_cube) { + return true; + } + for (int i = 0; i < n_cube; i++) { + unsigned long cube_version = fieldstat_get_cube_version(exporter->instance, cube_ids[i]); + if (cube_version != exporter->history->cube_version[i]) { + return true; + } + } + free(cube_ids); + + return false; +} + +void counter_history_fill_version_info(struct counter_history *history, struct fieldstat_json_exporter *exporter) +{ + unsigned long cur_cell_version = fieldstat_get_cell_version(exporter->instance); + const char *cur_exporter_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME; + const struct fieldstat_tag_list *cur_global_tag_list = exporter->global_tag_list; + + free(history->exporter_name); + history->exporter_name = strdup(cur_exporter_name); + if (cur_global_tag_list != NULL) { + history->global_tag_list = my_copy_fs_tag_list(cur_global_tag_list); + } + + history->cell_version = cur_cell_version; + + int *cube_ids = NULL; + int n_cube = 0; + fieldstat_get_cubes(exporter->instance, &cube_ids, &n_cube); + history->n_cube = n_cube; + history->cube_version = malloc(sizeof(unsigned long) * n_cube); + for (int i = 0; i < n_cube; i++) { + history->cube_version[i] = fieldstat_get_cube_version(exporter->instance, cube_ids[i]); + } + free(cube_ids); +} + +void fieldstat_json_exporter_update_history(struct fieldstat_json_exporter *exporter) +{ + if (counter_history_check_if_need_to_update(exporter) == false) { + return; + } + + counter_history_free(exporter->history); + exporter->history = counter_history_new(); + + counter_history_fill_version_info(exporter->history, exporter); +} + +void fieldstat_json_exporter_write_delta(struct fieldstat_json_exporter *exporter, struct cellwise_rec_for_export *tag_field_pair_arr, size_t arr_len) +{ + // for every tag_field_pair, get the tag json string + for (int i = 0; i < arr_len; i++) { + const char *tag_json = json_writer_unwrap(tag_field_pair_arr[i].cjson_tags); + for (int j = 0; j < tag_field_pair_arr[i].n_metric; j++) { + const char *metric_name = tag_field_pair_arr[i].metric_name[j]; + long long cur_value = tag_field_pair_arr[i].metric_value[j]; + struct name_value_map *node = counter_history_find(exporter->history, tag_json, metric_name); + long long last_value = node == NULL ? 0 : node->value; + long long delta = cur_value - last_value; + + if (node == NULL) { // new added cell/cube/metric + counter_history_add(exporter->history, tag_json, metric_name, cur_value); + } else { // previous one + node->value = cur_value; + } + // cell will never delete. If delete cube / cell happen, the exporter history will be reset. + + size_t delta_metric_name_len = strlen(metric_name) + 6; // 6 for "_delta" + char *delta_metric_name = malloc(delta_metric_name_len + 1); + snprintf(delta_metric_name, delta_metric_name_len + 1, "%s_delta", metric_name); + delta_metric_name[delta_metric_name_len] = '\0'; + json_writer_longlong_field(tag_field_pair_arr[i].cjson_fields, delta_metric_name, delta); + free(delta_metric_name); + } + } +} /* -------------------------------------------------------------------------- */ /* iter */ @@ -318,7 +593,7 @@ void kv_pair_free_list(struct export_kv_pair *pairs, size_t len) } // return 1 if added, 0 if not added -int cjson_map_add(struct json_tag_field_pair *tag_field_pair, const struct cell_iter *iter) +int cjson_map_add(struct cellwise_rec_for_export *tag_field_pair, const struct cell_iter *iter) { struct json_writer *writer = NULL; @@ -353,7 +628,30 @@ int cjson_map_add(struct json_tag_field_pair *tag_field_pair, const struct cell_ return 1; } -struct json_tag_field_pair *read_tag_and_field(const struct fieldstat *instance, size_t *n_pair_out) +void cjson_map_record_metrics(struct cellwise_rec_for_export *tag_field_pair, const struct cell_iter *iter) +{ + tag_field_pair->metric_name = malloc(sizeof(char *) * (iter->max_metric_id + 1)); + tag_field_pair->metric_value = malloc(sizeof(long long) * (iter->max_metric_id + 1)); + + int n_counter_metric = 0; + for (int metric_id = 0; metric_id <= iter->max_metric_id; metric_id++) { + int cube_id = iter->cube_ids[iter->curr_cube_idx]; + if (fieldstat_get_metric_type(iter->instance, cube_id, metric_id) != METRIC_TYPE_COUNTER) { + continue; + } + if (fieldstat_counter_get(iter->instance, cube_id, metric_id, iter->cell_ids[iter->curr_cell_idx]) == -1) { // no value, happens when the two cells in one cube has different metrics + continue; + } + + tag_field_pair->metric_name[n_counter_metric] = strdup(fieldstat_get_metric_name(iter->instance, cube_id, metric_id)); + tag_field_pair->metric_value[n_counter_metric] = fieldstat_counter_get(iter->instance, cube_id, metric_id, iter->cell_ids[iter->curr_cell_idx]); + + n_counter_metric++; + } + tag_field_pair->n_metric = n_counter_metric; +} + +struct cellwise_rec_for_export *read_tag_and_field(const struct fieldstat *instance, size_t *n_pair_out, bool output_delta) { // clock_t prepare_start = clock(); int *cube_id = NULL; @@ -373,13 +671,17 @@ struct json_tag_field_pair *read_tag_and_field(const struct fieldstat *instance, } struct cell_iter *iter = cell_iter_new(instance); - struct json_tag_field_pair *tag_field_pair = calloc(n_cell_total, sizeof(struct json_tag_field_pair)); + struct cellwise_rec_for_export *tag_field_pair = calloc(n_cell_total, sizeof(struct cellwise_rec_for_export)); int i = 0; while (cell_iter_next(iter)) { // next_num++; if (cjson_map_add(&tag_field_pair[i], iter) == 0) { continue; } + + if (output_delta) { + cjson_map_record_metrics(&tag_field_pair[i], iter); + } i++; } @@ -389,7 +691,7 @@ struct json_tag_field_pair *read_tag_and_field(const struct fieldstat *instance, return tag_field_pair; } -void fieldstat_json_exporter_write_global_tags(const struct fieldstat_json_exporter *exporter, struct json_tag_field_pair *tag_field_pair_arr, size_t arr_len) +void fieldstat_json_exporter_write_global_tags(const struct fieldstat_json_exporter *exporter, struct cellwise_rec_for_export *tag_field_pair_arr, size_t arr_len) { if (exporter->global_tag_list == NULL) { return; @@ -457,7 +759,7 @@ int add_object_to_json_array(char **buf, int *buf_len, int start, const char *st "in_latency":<blob of histogram> "client_ip_sketch":<blob of hyperloglog> } - "timestamp":<timestamp long> + "timestamp_ms":<timestamp long> } */ /* @@ -465,10 +767,15 @@ int add_object_to_json_array(char **buf, int *buf_len, int start, const char *st */ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter *exporter, const struct timeval *timestamp, char ***output, size_t *output_size) { + long long timestamp_ms = cal_ms_time(timestamp); + if (exporter->history != NULL) { + fieldstat_json_exporter_update_history((struct fieldstat_json_exporter *)exporter); + } + const struct fieldstat *instance = exporter->instance; size_t n_pair; - struct json_tag_field_pair *tag_field_pair = read_tag_and_field(instance, &n_pair); + struct cellwise_rec_for_export *tag_field_pair = read_tag_and_field(instance, &n_pair, exporter->history != NULL); if (tag_field_pair == NULL || n_pair == 0) { free(tag_field_pair); // tag_field_pair is not NULL when there are registered metrics but no valid cells *output = NULL; @@ -477,19 +784,27 @@ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter * } fieldstat_json_exporter_write_global_tags(exporter, tag_field_pair, n_pair); + if (exporter->history != NULL) { // not null when fieldstat_json_exporter_enable_delta is called + fieldstat_json_exporter_write_delta((struct fieldstat_json_exporter *)exporter, tag_field_pair, n_pair); + } char **cjson_str_arr = (char **)malloc(sizeof(char *) * n_pair); for (int i = 0; i < n_pair; i++) { - struct json_tag_field_pair *current = &tag_field_pair[i]; + struct cellwise_rec_for_export *current = &tag_field_pair[i]; struct json_writer *root = json_writer_init(); json_writer_start_map(root); - const char *tmp_name = exporter->name ? exporter->name : "-"; + const char *tmp_name = exporter->name ? exporter->name : DEFAULT_EXPORTER_NAME; json_writer_str_field(root, "name", tmp_name, strlen(tmp_name)); json_writer_object_item(root, "tags", current->cjson_tags); json_writer_object_item(root, "fields", current->cjson_fields); - json_writer_longlong_field(root, "timestamp", cal_ms_time(timestamp)); + json_writer_longlong_field(root, "timestamp_ms", timestamp_ms); + if (exporter->history != NULL) { + json_writer_longlong_field(root, "timestamp_ms_delta", timestamp_ms - exporter->history->ts); + exporter->history->ts = timestamp_ms; + } + json_writer_end_map(root); char *cjson_str; @@ -501,7 +816,17 @@ void fieldstat_json_exporter_export_array(const struct fieldstat_json_exporter * *output = cjson_str_arr; *output_size = n_pair; - free(tag_field_pair); // cjson object will be freed with cjson root + if (exporter->history != NULL) { // read_tag_and_field does not allocate memory for metric_name and metric_value when exporter->history == NULL + for (int i = 0; i < n_pair; i++) { + for (int j = 0; j < tag_field_pair[i].n_metric; j++) { + free(tag_field_pair[i].metric_name[j]); + } + free(tag_field_pair[i].metric_name); + free(tag_field_pair[i].metric_value); + } + } + // json object is added in json_writer_object_item + free(tag_field_pair); } char *fieldstat_json_exporter_export(const struct fieldstat_json_exporter *exporter, const struct timeval *timestamp) @@ -536,6 +861,7 @@ struct fieldstat_json_exporter *fieldstat_json_exporter_new(const struct fieldst exporter->instance = instance; exporter->name = NULL; exporter->global_tag_list = NULL; + exporter->history = NULL; return exporter; } @@ -587,5 +913,19 @@ void fieldstat_json_exporter_free(struct fieldstat_json_exporter *exporter) if (exporter->global_tag_list != NULL) { fieldstat_tag_list_arr_free(exporter->global_tag_list, 1); } + + if (exporter->history != NULL) { + counter_history_free(exporter->history); + } + free(exporter); } + +void fieldstat_json_exporter_enable_delta(struct fieldstat_json_exporter *exporter) +{ + if (exporter->history != NULL) { + return; + } + exporter->history = counter_history_new(); + counter_history_fill_version_info(exporter->history, exporter); +}
\ No newline at end of file diff --git a/src/fieldstat.c b/src/fieldstat.c index 2e01d10..9a26d84 100644 --- a/src/fieldstat.c +++ b/src/fieldstat.c @@ -53,6 +53,7 @@ struct fieldstat *fieldstat_new() instance->max_n_cube = 100; instance->cube = calloc(instance->max_n_cube, sizeof(struct fs_cube *)); instance->cube_version = calloc(instance->max_n_cube, sizeof(unsigned long)); + instance->shared_tag_cube_manager = cube_manager_new(); return instance; @@ -173,16 +174,6 @@ void name_id_map_free(struct metric_name_id_map *map) } } -void name_id_map_init(struct metric_name_id_map **map, const struct fs_cube *cube) -{ - *map = NULL; - const char *name; - for (int i = 0; i < cube->n_metric; i++) { - name = metric_get_name(cube->metrics[i]); - name_id_map_add(map, name, i); - } -} - void fieldstat_clear_one_tag(const struct fieldstat_tag *tag) { free((char *)tag->key); @@ -398,9 +389,7 @@ void fieldstat_cube_free(struct fieldstat *instance, int cube_id) } free(cube->metrics); - if (cube->metric_name_id_map != NULL) { - name_id_map_free(cube->metric_name_id_map); - } + name_id_map_free(cube->metric_name_id_map); free(cube); instance->cube[cube_id] = NULL; @@ -417,8 +406,10 @@ static int append_metric_to_cube(struct fs_cube *cube, struct metric *metric) cube->max_n_metric *= 2; cube->metrics = realloc(cube->metrics, sizeof(struct metric *) * cube->max_n_metric); } + name_id_map_add(&cube->metric_name_id_map, metric_get_name(metric), cube->n_metric); cube->metrics[cube->n_metric] = metric; cube->n_metric++; + return cube->n_metric - 1; } @@ -434,6 +425,11 @@ int fieldstat_register_counter(struct fieldstat *instance, int cube_id, const ch if (cube == NULL) { return -1; } + + if (name_id_map_get_id_by_name(cube->metric_name_id_map, field_name) != -1) { + return -2; + } + struct metric *metric = metric_counter_new(field_name, mode); return append_metric_to_cube(cube, metric); @@ -823,10 +819,6 @@ void fieldstat_cube_merge_topk(struct fs_cube *dest, const struct fs_cube *src, void fieldstat_cube_merge(struct fs_cube *dest, const struct fs_cube *src) { struct metric_name_id_map *name_id_map_dest = dest->metric_name_id_map; - if (name_id_map_dest == NULL) { - name_id_map_init(&name_id_map_dest, dest); - dest->metric_name_id_map = name_id_map_dest; - } int metric_id_src_dest_map[src->n_metric]; for (int metric_id_src = 0; metric_id_src < src->n_metric; metric_id_src++) { diff --git a/src/metrics/histogram_encoder.c b/src/metrics/histogram_encoder.c index a92589b..14e3865 100644 --- a/src/metrics/histogram_encoder.c +++ b/src/metrics/histogram_encoder.c @@ -119,6 +119,81 @@ void histogram_encode_into_b64(struct hdr_histogram *hdr, char **buffer, size_t *buffer = enc; } +static uint32_t get_cookie_base(uint32_t cookie) +{ + return (cookie & ~0xf0U); +} + +static double int64_bits_to_double(int64_t i) +{ + union uint64_dbl_cvt x; + + x.l = (uint64_t) i; + return x.d; +} + +static int hdr_decode_v2_buf(const unsigned char* buffer, size_t length, struct hdr_histogram** histogram) +{ + struct hdr_histogram* h = NULL; + int result = 0; + int rc = 0; + const uint8_t* counts_array = NULL; + uint32_t encoding_cookie; + int32_t counts_limit, significant_figures; + int64_t lowest_discernible_value, highest_trackable_value; + + encoding_flyweight_v1_t encoding_flyweight; + memcpy(&encoding_flyweight, buffer, SIZEOF_ENCODING_FLYWEIGHT_V1); + + encoding_cookie = get_cookie_base(be32toh(encoding_flyweight.cookie)); + if (V2_ENCODING_COOKIE != encoding_cookie) { + return -1; + } + + counts_limit = be32toh(encoding_flyweight.payload_len); + lowest_discernible_value = be64toh(encoding_flyweight.lowest_discernible_value); + highest_trackable_value = be64toh(encoding_flyweight.highest_trackable_value); + significant_figures = be32toh(encoding_flyweight.significant_figures); + + rc = hdr_init(lowest_discernible_value, highest_trackable_value, significant_figures, &h); + if (rc) { + return -1; + } + + encoding_flyweight_v1_t *buffer_aligner = (encoding_flyweight_v1_t *)buffer; + + counts_array = buffer_aligner->counts; + + rc = apply_to_counts_zz(h, counts_array, counts_limit); + if (rc) { + free(h); + return -1; + } + + h->normalizing_index_offset = be32toh(encoding_flyweight.normalizing_index_offset); + h->conversion_ratio = int64_bits_to_double(be64toh(encoding_flyweight.conversion_ratio_bits)); + hdr_reset_internal_counters(h); + + *histogram = h; + return result; +} + +struct hdr_histogram *histogram_decode_from_b64(const char *buffer, size_t buffer_size) +{ + size_t dec_size = 0; + unsigned char *dec = b64_decode_ex(buffer, buffer_size, &dec_size); + + struct hdr_histogram *hdr = NULL; + int ret = hdr_decode_v2_buf(dec, dec_size, &hdr); + free(dec); + + if (ret != 0) { + return NULL; + } + + return hdr; +} + void histogram_encode_into_blob(const struct hdr_histogram *hdr, char **blob, size_t *blob_len) { struct simple_encoded *encoded = NULL; @@ -151,4 +226,17 @@ struct hdr_histogram *histogram_decode_from_blob(const char *blob, size_t blob_l hdr_reset_internal_counters(hdr); return hdr; +} + +long long hdr_count_le_value(const struct hdr_histogram* h, long long value) +{ + struct hdr_iter iter; + long long count = 0; + hdr_iter_recorded_init(&iter, h); + while (hdr_iter_next(&iter)) + { + if((long long)iter.value <= value) + count ++; + } + return count; }
\ No newline at end of file diff --git a/src/metrics/histogram_encoder.h b/src/metrics/histogram_encoder.h index 32c44a3..9ed9a15 100644 --- a/src/metrics/histogram_encoder.h +++ b/src/metrics/histogram_encoder.h @@ -26,6 +26,9 @@ extern "C" exported buffer is after base64 encoding./ */ void histogram_encode_into_b64(struct hdr_histogram *hdr, char **buffer, size_t *buffer_size); + +struct hdr_histogram *histogram_decode_from_b64(const char *buffer, size_t buffer_size); + /* the most simple and fast version of encoding. No compression, no base64 encoding, no LEB128 encoding, no user-defined extra parameters, only carry the most basic information. */ @@ -35,6 +38,8 @@ void histogram_encode_into_blob(const struct hdr_histogram *hdr, char **buffer, */ struct hdr_histogram *histogram_decode_from_blob(const char *buffer, size_t blob_len); +long long hdr_count_le_value(const struct hdr_histogram* h, long long value); + #ifdef __cplusplus } #endif
\ No newline at end of file diff --git a/src/metrics/metric.c b/src/metrics/metric.c index 320ac20..ed525db 100644 --- a/src/metrics/metric.c +++ b/src/metrics/metric.c @@ -818,19 +818,6 @@ long long metric_histogram_value_at_percentile(const struct metric *pthis, int c return hdr_value_at_percentile(data->hdr, percentile); } -static long long hdr_count_le_value(const struct hdr_histogram* h, long long value) -{ - struct hdr_iter iter; - long long count = 0; - hdr_iter_recorded_init(&iter, h); - while (hdr_iter_next(&iter)) - { - if((long long)iter.value <= value) - count ++; - } - return count; -} - long long metric_histogram_count_le_value(const struct metric *pthis, int cell_id, long long value) { const struct metric_measure_data *data = metric_find_one_cell(pthis, cell_id); diff --git a/src/metrics/python_api.c b/src/metrics/python_api.c new file mode 100644 index 0000000..83ad75c --- /dev/null +++ b/src/metrics/python_api.c @@ -0,0 +1,140 @@ +#include <string.h> +#include <stdio.h> +#include <stddef.h> +#include <stdlib.h> + +#include "histogram_encoder.h" +#include "base64/b64.h" +#include "st_hyperloglog.h" + +// user must free the buf after use +void *histogram_base64_decode(char *buf) +{ + size_t buffer_size = strlen(buf); + struct hdr_histogram *hdr = histogram_decode_from_b64(buf, buffer_size); + + return hdr; +} + +long long histogram_value_at_percentile(void* h, double percentile) +{ + return hdr_value_at_percentile((const struct hdr_histogram *)h, percentile); +} + +long long histogram_count_le_value(void* h, long long value) +{ + return hdr_count_le_value((const struct hdr_histogram *)h, value); +} + +long long histogram_value_min(void* h) +{ + return hdr_min((const struct hdr_histogram *)h); +} + +long long histogram_value_max(void* h) +{ + return hdr_max((const struct hdr_histogram *)h); +} + +double histogram_value_mean(void* h) +{ + return hdr_mean((const struct hdr_histogram *)h); +} + +double histogram_value_stddev(void* h) +{ + return hdr_stddev((const struct hdr_histogram *)h); +} + +long long histogram_value_total_count(void *h) +{ + return ((struct hdr_histogram *)h)->total_count; +} + +long long histogram_value_sum(void *h) +{ + struct hdr_iter iter; + long long sum = 0; + hdr_iter_recorded_init(&iter, (const struct hdr_histogram *)h); + while (hdr_iter_next(&iter)) + { + sum+=(long long)iter.value; + } + return sum; +} + +void histogram_free(void *h) +{ + hdr_close((struct hdr_histogram *)h); +} + + +/* -------------------------------------------------------------------------- */ +/* hyperloglog */ +/* -------------------------------------------------------------------------- */ +void *hll_base64_decode(char *buf); +double hll_base64_to_count(char *buf) +{ + struct ST_hyperloglog *hll = hll_base64_decode(buf); + double count = ST_hyperloglog_count(hll); + ST_hyperloglog_free(hll); + return count; +} + +#define BigLittleSwap32(A) ((((uint32_t)(A) & 0xff000000) >> 24) | \ + (((uint32_t)(A) & 0x00ff0000) >> 8) | \ + (((uint32_t)(A) & 0x0000ff00) << 8) | \ + (((uint32_t)(A) & 0x000000ff) << 24)) + +void *hll_base64_decode(char *buf) +{ + size_t buffer_size = strlen(buf); + size_t dec_size = 0; + unsigned char *dec = b64_decode_ex(buf, buffer_size, &dec_size); + + unsigned char version; + memcpy(&version, dec, sizeof(unsigned char)); + if (version != 1) { + return NULL; + } + + unsigned char precision; + memcpy(&precision, dec + sizeof(unsigned char), sizeof(unsigned char)); + + struct ST_hyperloglog *hll_from_blob = ST_hyperloglog_new(precision); + + int num_reg = NUM_REG(precision); + int words = INT_CEIL(num_reg, REG_PER_WORD); + size_t reg_size = words * sizeof(uint32_t); + uint32_t *registers = (uint32_t *)malloc(reg_size); + memcpy(registers, dec + 2 * sizeof(unsigned char), reg_size); + for (int i = 0; i < words; i++) { + registers[i] = BigLittleSwap32(registers[i]); + } + memcpy(hll_from_blob->registers, registers, reg_size); + free(registers); + + free(dec); + return hll_from_blob; +} + +void hll_free(void *hll) +{ + ST_hyperloglog_free((struct ST_hyperloglog *)hll); +} + +// cppcheck-suppress [constParameterPointer, unmatchedSuppression] +bool is_hll(char *buf) +{ + size_t dec_size = 0; + unsigned char *dec = b64_decode_ex(buf, 4, &dec_size); // 4: base64 convert every 3 bytes to 4 bytes + char cookie = *(char *)dec; + free(dec); + + if (cookie == 1) { // hll version is 1 + return true; + } + + // histogram cookie is also fixed, it's first byte is always 28, refer to V2_ENCODING_COOKIE (1c == 28) + return false; +} diff --git a/src/utils/very_fast_json_writer.c b/src/utils/very_fast_json_writer.c index df2bf01..e512b78 100644 --- a/src/utils/very_fast_json_writer.c +++ b/src/utils/very_fast_json_writer.c @@ -131,6 +131,15 @@ void json_writer_finish(struct json_writer *writer, char **result, size_t *resul free(writer); } +const char *json_writer_unwrap(const struct json_writer *writer) +{ + if (writer->cursor == 0) { + return NULL; + } + + return writer->buffer; +} + void json_writer_object_item(struct json_writer *writer, const char *key, struct json_writer *writer_in) { json_writer_str_item(writer, key, strlen(key)); diff --git a/src/utils/very_fast_json_writer.h b/src/utils/very_fast_json_writer.h index ec7b858..912a580 100644 --- a/src/utils/very_fast_json_writer.h +++ b/src/utils/very_fast_json_writer.h @@ -21,6 +21,7 @@ void json_writer_double_field(struct json_writer *writer, const char *key, doubl void json_writer_longlong_field(struct json_writer *writer, const char *key, long long value); void json_writer_object_item(struct json_writer *writer, const char *key, struct json_writer *writer_in); void json_writer_finish(struct json_writer *writer, char **result, size_t *result_len); +const char *json_writer_unwrap(const struct json_writer *writer); #ifdef __cplusplus } |
