diff options
| author | fumingwei <[email protected]> | 2023-03-10 20:29:44 +0800 |
|---|---|---|
| committer | fumingwei <[email protected]> | 2023-03-13 11:56:02 +0800 |
| commit | 86c09082ae13bb4aebf7bdcd38edd62e16a0e809 (patch) | |
| tree | 8926f7660b121fa3e6c98272866d4e2906d1c7ca /src/prometheus_output.cpp | |
| parent | 8a97dbaa26f776bcdd0d647bd6de8b42e04ae831 (diff) | |
feature:将fieldstat.cpp拆分成多个文件
Diffstat (limited to 'src/prometheus_output.cpp')
| -rw-r--r-- | src/prometheus_output.cpp | 424 |
1 files changed, 424 insertions, 0 deletions
diff --git a/src/prometheus_output.cpp b/src/prometheus_output.cpp new file mode 100644 index 0000000..c3ee470 --- /dev/null +++ b/src/prometheus_output.cpp @@ -0,0 +1,424 @@ +#include "fieldstat_internal.h" +#define HTTPSERVER_IMPL +#include "httpserver.h" + +struct prometheus_endpoint_instance g_prometheus_endpoint_instance = +{ + 9273, + NULL, + 0, + 0, + NULL, + 0, + NULL, + REALLOC_SCALE_SIZE, +}; + +static char* str_unescape(char* s, char *d, int d_len) +{ + int i=0,j=0; + int len=strlen(s); + for(i=0; i<len && j<d_len; i++) + { + if(s[i]=='(' || s[i]==')' || s[i]=='{' || s[i]=='}' || + s[i]=='/' || s[i]=='\\' || s[i]=='%' || s[i]=='*' || + s[i]=='$' || s[i]=='-' || s[i]==',' || s[i]==';') + { + if(i==0) + { + continue; + } + + d[j++]='_'; + } + else + { + d[j++]=s[i]; + } + } + + if(d[j-1]=='_') + { + j-=1; + } + + d[j]='\0'; + + return 0; +} + +/* +* ret = -1, output not match fieldstat instance + output: + /metrics + /sapp + /tfe + ... +* ret >=0 && ret < instance_cnt output sepecify fieldstat instance + output: + http://127.0.0.1:9273/sapp content +* ret = instance_cnt output all fieldstat instance + output: + http://127.0.0.1:9273/metrics content +*/ +static int prometheus_get_fs_instance_id(struct prometheus_endpoint_instance *global_prometheus_output, char *uri, int uri_len) +{ + int i = 0; + char instance_name_url[INSTANCE_NAME_LEN + 1] = {0}; + int instance_name_url_len = 0; + + if(uri_len == (int)strlen(global_prometheus_output->url_path) + && 0 == memcmp(uri, global_prometheus_output->url_path, strlen(global_prometheus_output->url_path))) + { + return global_prometheus_output->fs_instance_cnt; + } + + for(i = 0; i < global_prometheus_output->fs_instance_cnt; i++) + { + memset(instance_name_url, 0, sizeof(instance_name_url)); + instance_name_url_len = snprintf(instance_name_url, sizeof(instance_name_url),"/%s",global_prometheus_output->fs_instance[i]->name); + + if(uri_len == instance_name_url_len + && 0 == memcmp( uri, instance_name_url, instance_name_url_len)) + { + return i; + } + } + return -1; +} + +static void prometheus_output_uri_list(struct prometheus_endpoint_instance *prometheus_output,struct http_request_s* request) +{ + int i = 0; + int payload_len = 0; + char *payload = NULL; + char *payload_append_position = NULL; + struct fieldstat_instance **fs_instance = NULL; + struct http_response_s* response = NULL; + + fs_instance = prometheus_output->fs_instance; + + + payload_len = prometheus_output->fs_instance_cnt * 128; //TODO using marco, len? + payload_append_position = payload = (char *)calloc(1,payload_len); + + payload_append_position += snprintf(payload_append_position, payload_len - (payload_append_position - payload),"url_path:\n\t%s\n", prometheus_output->url_path); + + for(i = 0; i < prometheus_output->fs_instance_cnt; i++) + { + payload_append_position += snprintf(payload_append_position, payload_len - (payload_append_position - payload),"\t/%s\n", fs_instance[i]->name); + } + + response = http_response_init(); + http_response_status(response, 404); + http_response_header(response, "Content-Type", "text/plain; charset=utf-8"); + http_response_body(response, payload, strlen(payload)); + http_respond(request, response); + + free(payload); + payload=NULL; + return; +} + +static int prometheus_output_read_metric_tags(struct metric_t *metric, char *instance_name, char *tags_buff, unsigned int size) +{ + int i = 0;//used_len = 0; + char unescape[STR_LEN_256] = {0}; + char *append_pos = tags_buff; + + //used_len += snprint(tags_buff, size - used_len, "app_name=\"%s\"", instance_name); + + append_pos += snprintf(append_pos, size - (append_pos - tags_buff), "app_name=\"%s\"", instance_name); + + if(metric->belong_to_table == 1) + { + append_pos += snprintf(append_pos, size - (append_pos - tags_buff), ",line_name=\"%s\"", metric->field_name); + } + + for(i = 0; i < (int)metric->n_tag; i++) + { + memset(unescape, 0, sizeof(unescape)); + str_unescape(metric->tag_key[i], unescape, sizeof(unescape)); + append_pos += snprintf(append_pos, size - (append_pos - tags_buff), ",%s=\"%s\"", unescape, metric->tag_value[i]); + } + return append_pos - tags_buff; +} + + +static int prometheus_output_read_metric_name(struct metric_t *metric, char *instance_app_name, char *name_buff, unsigned int size) +{ + char unescape[256] = {0}; + char *append_pos = name_buff; + if(metric->belong_to_table == 1) + { + str_unescape(metric->table_column_name, unescape, sizeof(unescape)); + } + else + { + str_unescape(metric->field_name, unescape, sizeof(unescape)); + } + append_pos += snprintf(append_pos, size - (append_pos - name_buff), "%s_%s", instance_app_name, unescape); + + return append_pos - name_buff; +} + + +static int prometheus_output_histogram_and_summary(struct metric_t *metric, char *payload, int payload_len, char *instance_app_name, char *metric_name, char *metric_tags) +{ + long long value=0; + long long sum = 0; + int i=0,used_len=0; + + struct hdr_iter iter; + char *output_format = NULL; + + //struct histogram_t* h=&(p->histogram); + struct histogram_t *h = &(metric->histogram); + struct hdr_histogram* h_out=NULL, *h_tmp=NULL; + int bin_num = metric->histogram.bins_num; + double *bins = metric->histogram.bins; + + if(metric->field_type == FILED_TYPE_HISTOGRAM) + { + output_format = (char *)"%s_bucket{%s,le=\"%0.2f\"} %llu\n"; + } + + if(metric->field_type == FIELD_TYPE_SUMMARY) + { + output_format = (char *)"%s{%s,quantile=\"%0.2f\"} %llu\n"; + } + + hdr_init(h->lowest_trackable_value, h->highest_trackable_value, h->significant_figures, &(h_tmp)); + + hdr_add(h_tmp, h->accumulated); + hdr_add(h_tmp, h->changing); + h_out=h_tmp; + + for(i=0;i<bin_num;i++) + { + if(metric->field_type == FILED_TYPE_HISTOGRAM) + { + value = (long long)hdr_count_le_value(h_out, (long long)bins[i]); + } + if(metric->field_type == FIELD_TYPE_SUMMARY) + { + value=(long long)hdr_value_at_percentile(h_out, bins[i]); + } + + used_len+=snprintf(payload+used_len, + payload_len-used_len, + output_format, + metric_name, + metric_tags, + bins[i], + value + ); + } + used_len+=snprintf(payload+used_len, + payload_len-used_len, + "%s_count{%s} %llu\n", + metric_name, + metric_tags, + (long long)h_out->total_count + ); + + hdr_iter_recorded_init(&iter, h_out); + while (hdr_iter_next(&iter)) + { + sum+=(long long)iter.value; + } + used_len+=snprintf(payload+used_len, + payload_len-used_len, + "%s_sum{%s} %llu\n", + metric_name, + metric_tags, + sum + ); + + hdr_close(h_tmp); + h_tmp=NULL; + + return used_len; +} + + + +static int prometheus_get_instance_metric_playload(struct fieldstat_instance *instance, char **payload, int *payload_size, int offset) +{ + int i = 0; + struct metric_t *metric = NULL; + long long value = 0; + char metric_name[256] = {0}; //match the regex [a-zA-Z_:][a-zA-Z0-9_:]* + char instance_name[256] = {0}; + char metric_tags[1024] = {0}; //label name match the regex [a-zA-Z_:][a-zA-Z0-9_:]* + int append_offset = offset; + char *new_payload = NULL; + int new_payload_size = 0; + + if(instance->running != 1) + { + return -1; + } + + if(payload == NULL) + { + return -1; + } + + new_payload = *payload; + new_payload_size = *payload_size; + + str_unescape(instance->name, instance_name, sizeof(instance_name)); + + for(i = 0; i < instance->metric_cnt; i++) + { + metric = get_metric(instance, i); + + if(metric->is_ratio == 1) + { + continue; + } + if(metric->is_invisible == 1) + { + continue; + } + + if( new_payload_size - append_offset <= LEFT_MIN_BUFF_LEN) + { + new_payload_size += REALLOC_SCALE_SIZE; + new_payload = (char *)realloc(new_payload, new_payload_size); + } + + prometheus_output_read_metric_name(metric, instance_name, metric_name, sizeof(metric_name)); + prometheus_output_read_metric_tags(metric, instance_name, metric_tags, sizeof(metric_tags)); + + switch(metric->field_type) + { + case FIELD_TYPE_COUNTER: + case FIELD_TYPE_GAUGE: + value = get_metric_unit_val(metric, FS_CALC_CURRENT, 1); + append_offset += snprintf(new_payload + append_offset, + new_payload_size - append_offset, + "%s{%s} %llu\n", + metric_name, + metric_tags, + value + ); + break; + case FIELD_TYPE_SUMMARY: + case FILED_TYPE_HISTOGRAM: + append_offset += prometheus_output_histogram_and_summary(metric,new_payload + append_offset, + new_payload_size - append_offset, instance_name, metric_name, metric_tags); + break; + default: + break; + } + } + + *payload = new_payload; + *payload_size = new_payload_size; + + return append_offset; + +} + +static void prometheus_output_instance_metric(struct prometheus_endpoint_instance *prometheus_output, struct http_request_s* request, int fs_instance_idx) +{ + int i = 0; + int payload_size = 0; + int payload_offset = 0; + char *payload = NULL; + struct fieldstat_instance *instance = NULL; + struct http_response_s* response = NULL; + + if(fs_instance_idx == prometheus_output->fs_instance_cnt) + { + for(i = 0; i < prometheus_output->fs_instance_cnt; i++) + { + instance = prometheus_output->fs_instance[i]; + payload_offset = prometheus_get_instance_metric_playload(instance, &payload, &payload_size, payload_offset); + } + } + else + { + instance = prometheus_output->fs_instance[fs_instance_idx]; + payload_offset = prometheus_get_instance_metric_playload(instance, &payload, &payload_size, payload_offset); + } + + if(payload != NULL) + { + response = http_response_init(); + http_response_status(response, 200); + http_response_header(response, "Content-Type", "text/plain; charset=utf-8"); + http_response_body(response, payload, strlen(payload)); + http_respond(request, response); + + free(payload); + payload=NULL; + } + +} + +void prometheus_endpoint_instance_output(struct http_request_s* request) +{ + int fs_instance_idx = -1; + struct http_string_s uri; + struct prometheus_endpoint_instance *prometheus_endpoint_instance = &g_prometheus_endpoint_instance;; + + uri = http_request_target(request); + fs_instance_idx = prometheus_get_fs_instance_id(prometheus_endpoint_instance, (char *)uri.buf, uri.len); + + fs_instance_idx == -1 ? + prometheus_output_uri_list(prometheus_endpoint_instance, request) + :prometheus_output_instance_metric(prometheus_endpoint_instance, request, fs_instance_idx); + + return; +} + + +void *prometheus_endpoint_listen_thread_entry(void *arg) +{ + struct prometheus_endpoint_instance *global_prometheus = (struct prometheus_endpoint_instance *)arg; + http_server_listen(global_prometheus->server_handle); + return NULL; +} + +int fieldstat_global_enable_prometheus_endpoint(unsigned short listen_port, const char *url) +{ + g_prometheus_endpoint_instance.server_handle = http_server_init(listen_port, prometheus_endpoint_instance_output); + if(g_prometheus_endpoint_instance.server_handle == NULL) + { + return -1; + } + + g_prometheus_endpoint_instance.url_path = url == NULL ? strdup(PROMETHEUS_ENDPOINT_DEFAULT_URL):strdup(url); + g_prometheus_endpoint_instance.port = listen_port; + g_prometheus_endpoint_instance.running = 1; + g_prometheus_endpoint_instance.fs_instance = (struct fieldstat_instance **)calloc( sizeof(struct fieldstat_instance *), g_prometheus_endpoint_instance.fs_instance_size); + + pthread_create(&g_prometheus_endpoint_instance.tid, NULL, prometheus_endpoint_listen_thread_entry, (void *)&g_prometheus_endpoint_instance); + + return 0; +} + +int fieldstat_set_prometheus_output(struct fieldstat_instance *instance) +{ + int fs_instance_id = 0; + + if(g_prometheus_endpoint_instance.running != 1) + { + return -1; + } + + if(g_prometheus_endpoint_instance.fs_instance_cnt >= g_prometheus_endpoint_instance.fs_instance_size) + { + g_prometheus_endpoint_instance.fs_instance_size += REALLOC_SCALE_SIZE; + g_prometheus_endpoint_instance.fs_instance = (struct fieldstat_instance **)realloc(g_prometheus_endpoint_instance.fs_instance, g_prometheus_endpoint_instance.fs_instance_size); + } + + fs_instance_id = g_prometheus_endpoint_instance.fs_instance_cnt++; + g_prometheus_endpoint_instance.fs_instance[fs_instance_id] = instance; + return 0; +} + + |
