diff options
| -rw-r--r-- | src/fieldstat.cpp | 1153 | ||||
| -rw-r--r-- | src/fieldstat_internal.h | 39 | ||||
| -rw-r--r-- | src/file_output.cpp | 507 | ||||
| -rw-r--r-- | src/line_protocol_output.cpp | 190 | ||||
| -rw-r--r-- | src/prometheus_output.cpp | 424 |
5 files changed, 1160 insertions, 1153 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp index 86d455c..d4687e1 100644 --- a/src/fieldstat.cpp +++ b/src/fieldstat.cpp @@ -1,29 +1,4 @@ -#include "fieldstat.h" #include "fieldstat_internal.h" -#include "threadsafe_counter.h" -#include "hdr_histogram.h" -#include "cJSON.h" -#define HTTPSERVER_IMPL -#include "httpserver.h" - -#include <sys/socket.h>//socket -#include <sys/types.h>//socket -#include <netinet/in.h> -#include <arpa/inet.h> -#include <string.h> //strerror -#include <errno.h>//strerror -#include <fcntl.h>//fcntl -#include <unistd.h>//fcntl -#include <net/if.h>//fcntl -#include <sys/ioctl.h>//ioctl -#include <stdio.h> -#include <time.h> -#include <unistd.h> -#include <stdlib.h> -#include <pthread.h> -#include <assert.h> -#include <sys/time.h> - int FIELD_STAT_VERSION_2_8_20200805_fix_outOfBound=0; @@ -50,22 +25,7 @@ static __attribute__((__used__)) const char * GIT_VERSION_UNKNOWN = NULL; #endif //endof Automatically generate the version number -struct prometheus_endpoint_instance g_prometheus_endpoint_instance = -{ - 9273, - NULL, - 0, - 0, - NULL, - 0, - NULL, - REALLOC_SCALE_SIZE, -}; - -const char* draw_line="________________________________________________________________________________________________________________________________________________"; -const char* draw_boundary="============================================================"; - -static char* __str_dup(const char* str) +char* __str_dup(const char* str) { char* dup=NULL; dup=(char*)calloc(sizeof(char),strlen(str)+1); @@ -73,7 +33,7 @@ static char* __str_dup(const char* str) return dup; } -static int is_valid_field_name(const char* name) +int is_valid_field_name(const char* name) { const char* reserverd="|:\n\r. \t<>[]#!@"; unsigned int i=0,j=0; @@ -88,39 +48,6 @@ static int is_valid_field_name(const char* name) return 1; } -static char* str_unescape(char* s, char *d, int d_len) -{ - int i=0,j=0; - int len=strlen(s); - for(i=0; i<len && j<d_len; i++) - { - if(s[i]=='(' || s[i]==')' || s[i]=='{' || s[i]=='}' || - s[i]=='/' || s[i]=='\\' || s[i]=='%' || s[i]=='*' || - s[i]=='$' || s[i]=='-' || s[i]==',' || s[i]==';') - { - if(i==0) - { - continue; - } - - d[j++]='_'; - } - else - { - d[j++]=s[i]; - } - } - - if(d[j-1]=='_') - { - j-=1; - } - - d[j]='\0'; - - return 0; -} - struct metric_t * get_metric(struct fieldstat_instance *instance, int metric_id) { @@ -183,7 +110,7 @@ void metric_free(struct metric_t* metric) return; } -static struct metric_t ** read_metric_slot(struct fieldstat_instance *instance, int metric_id) +struct metric_t ** read_metric_slot(struct fieldstat_instance *instance, int metric_id) { int block_index = 0; int in_block_index = 0; @@ -254,7 +181,7 @@ static int startup_udp() return sd_udp; } -static int send_udp(int sd, unsigned int dest_ip, unsigned short dest_port, const char * data, int len) +int send_udp(int sd, unsigned int dest_ip, unsigned short dest_port, const char * data, int len) { int to_send_len=len; int already_sended_len=0,this_sended_len=0; @@ -443,523 +370,6 @@ long long get_metric_unit_val(struct metric_t *metric,enum field_calc_algo calc_ return value; } - -void flush_line_protocol_metric(struct fieldstat_instance *instance) -{ - if(instance->line_protocol_send_buff_offset == 0) - { - return; - } - - if(instance->line_protocol_server_ip > 0 && instance->line_protocol_server_port > 0) - { - send_udp(instance->line_protocol_socket, instance->line_protocol_server_ip, - (unsigned short)instance->line_protocol_server_port, - instance->line_protocol_send_buff, - instance->line_protocol_send_buff_offset - ); - } - instance->line_protocol_send_buff_offset = 0; - memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff)); - return; -} - -void append_line_protocol_line(struct fieldstat_instance *instance, const char* measurement, char *tag_set, char *field_set) -{ - if(field_set==NULL) - { - return; - } - if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2) - { - flush_line_protocol_metric(instance); - } - printf("Line_protocol metric: %s%s %s\n",measurement,tag_set,field_set); - instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset, - sizeof(instance->line_protocol_send_buff) - instance->line_protocol_send_buff_offset, - "%s%s %s\n", - measurement, tag_set, field_set - ); - return; -} - -static int output_line_protocol_tag_set_buf(char *tag_key[], char *tag_value[], int n_tag, char *tag_set_buff, unsigned int size) -{ - int i = 0; - char *tag_pos = tag_set_buff; - for(i = 0; i < n_tag; i++) - { - tag_pos += snprintf(tag_pos, - size - (tag_pos - tag_set_buff), - ",%s=%s", - tag_key[i], - tag_value[i] - ); - } - return tag_pos - tag_set_buff; -} - - -static void output_line_protocol_table(struct fieldstat_instance *instance) -{ - int i = 0, j = 0, k = 0; - metric_t *metric = NULL; - long long value = 0; - //double ratio = 0.0; - char field_set_buff[UDP_PAYLOAD_SIZE]; - char tag_set_buff[UDP_PAYLOAD_SIZE]; - - memset(field_set_buff, 0, sizeof(field_set_buff)); - memset(tag_set_buff, 0, sizeof(tag_set_buff)); - char *tag_pos = tag_set_buff; - char *field_pos = field_set_buff; - - struct table_metric *table = NULL; - struct table_line *line = NULL; - - for(i = 0; i < instance->table_num; i++) - { - table = instance->table_metrics[i]; - for(j = 0; j < table->line_cnt; j++) - { - line = read_table_line(table, j); - tag_pos += snprintf(tag_pos, - sizeof(tag_set_buff) - (tag_pos - tag_set_buff), - ",app_name=%s,table_name=%s", - instance->name, - table->name - ); - - tag_pos += output_line_protocol_tag_set_buf(line->tag_key, line->tag_value, line->n_tag, tag_pos, - sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); - - for(k = 0; k < table->column_cnt; k ++) - { - metric = get_metric(instance, line->metric_id_belong_to_line[k]); - - value = metric->field_type == FIELD_TYPE_GAUGE ? - get_metric_unit_val(metric, FS_CALC_CURRENT, 1): - get_metric_unit_val(metric, FS_CALC_SPEED, 1); - - field_pos += snprintf(field_pos, - sizeof(field_set_buff) - (field_set_buff - field_set_buff), - "%s=%lld,", - metric->table_column_name, - value - ); - - } - if(field_pos - field_set_buff > 0) - { - *(field_pos - 1) = '\0'; - } - // measurement,tag_set field_set - append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); - tag_pos = tag_set_buff; - field_pos = field_set_buff; - - } - - } -} - -int line_protocol_output(struct fieldstat_instance *instance) -{ - metric_t *metric = NULL; - long long value=0; - int i=0; - char field_set_buff[UDP_PAYLOAD_SIZE]; - char tag_set_buff[UDP_PAYLOAD_SIZE]; - - memset(field_set_buff, 0, sizeof(field_set_buff)); - memset(tag_set_buff, 0, sizeof(tag_set_buff)); - char *tag_pos = tag_set_buff; - - for(i = 0; i < instance->metric_cnt; i++) - { - metric = get_metric(instance, i); - if(metric->is_ratio == 1) - { - continue; - } - - if(metric->belong_to_table == 1) - { - continue; - } - - switch(metric->field_type) - { - case FIELD_TYPE_GAUGE: - value = get_metric_unit_val(metric, FS_CALC_CURRENT, 1); - if(value != 0) - { - snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value); - tag_pos += snprintf(tag_pos, - sizeof(tag_set_buff) - (tag_pos - tag_set_buff), - ",app_name=%s", - instance->name - ); - output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_pos, sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); - append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); - tag_pos = tag_set_buff; - - } - break; - case FIELD_TYPE_COUNTER: - value = get_metric_unit_val(metric, FS_CALC_SPEED, 1); - if(value != 0) - { - snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value); - tag_pos += snprintf(tag_pos, - sizeof(tag_set_buff) - (tag_pos - tag_set_buff), - ",app_name=%s", - instance->name - ); - output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_pos, sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); - append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); - tag_pos = tag_set_buff; - } - break; - default: - break; - - } - } - - output_line_protocol_table(instance); - flush_line_protocol_metric(instance); - - return 0; -} - - -static int print_buf_tag_append_position(char *tag_key[], char *tag_value[], size_t n_tag, char *print_buf_tags, unsigned int size) -{ - int i = 0; - char *append_pos = print_buf_tags; - - if(n_tag <= 0) - { - return 0; - } - - append_pos += snprintf(append_pos, - size - (append_pos - print_buf_tags), - "{" - ); - - for(i = 0; i < (int)n_tag; i++) - { - append_pos += snprintf(append_pos, - size - (append_pos - print_buf_tags), - "%s=\"%s\",", - tag_key[i], - tag_value[i] - ); - } - - if(append_pos - print_buf_tags > 0) - { - append_pos--; - } - - append_pos += snprintf(append_pos, - size - (append_pos - print_buf_tags), - "}\t" - ); - - return append_pos - print_buf_tags; -} - - -static int output_file_format_default_type_gauge(struct fieldstat_instance *instance,long long interval_ms,char *print_buf, unsigned int size) -{ - int i = 0, j = 0; - //display_manifest_t* p = NULL; - metric_t *metric = NULL; - long long value = 0; - //double ratio = 0.0; - //char* pos=print_buf; - char *append_pos = print_buf; - char print_buf_tags[1024]; - - for(i = 0; i < instance->metric_cnt; i++) - { - //metric = instance->metric[i]; - metric = get_metric(instance, i); - if(metric->field_type != FIELD_TYPE_GAUGE) - { - continue; - } - if(metric->belong_to_table == 1) - { - continue; - } - if(metric->is_invisible == 1) - { - value = get_metric_unit_val(metric, FS_CALC_SPEED, 0); - continue; - } - /* - if(metric->is_ratio==1) - { - ratio=get_stat_ratio(_handle->display[p->numerator_id], _handle->display[p->denominator_id], NULL, - p->output_scaling, p->calc_type); - pos+=snprintf(pos,size-(pos-print_buf),"%s: %10.2e\t",p->name,ratio); - } - */ - value = get_metric_unit_val(metric, FS_CALC_CURRENT, 0); - //value=value * metric->output_scaling * 1000 / interval_ms; - memset(print_buf_tags,0, sizeof(print_buf_tags)); - print_buf_tag_append_position(metric->tag_key,metric->tag_value, metric->n_tag, print_buf_tags, sizeof(print_buf_tags)); - append_pos += snprintf(append_pos, - size - (append_pos - print_buf), - "%s %s: %-10lld\t", - metric->field_name, - print_buf_tags, - value - ); - j++; - if(j == STATUS_PER_LINE) - { - append_pos += snprintf(append_pos, size - (append_pos - print_buf),"\n"); - j=0; - } - } - - if(append_pos - print_buf > 0) - { - if(*(append_pos - 1) == '\n') - { - append_pos --; - } - append_pos += snprintf(append_pos, size - (append_pos-print_buf),"\n%s\n",draw_line); - } - return append_pos - print_buf; -} - - -static int output_file_format_default_type_counter(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size) -{ - int i=0,j=0; - //display_manifest_t* p=NULL; - metric_t *metric = NULL; - long long value = 0; - //double ratio = 0.0; - char* append_pos = print_buf; - int metric_id[INIT_STAT_FIELD_NUM] = {0}; - int metric_cnt = 0; - char print_buf_tags[1024]; - - for(i = 0;i < instance->metric_cnt; i++) - { - //p=_handle->display[i]; - //metric = instance->metric[i]; - metric = get_metric(instance, i); - if(metric->field_type != FIELD_TYPE_COUNTER) - { - continue; - } - if(metric->belong_to_table == 1) - { - continue; - } - if(metric->is_invisible == 1) - { - get_metric_unit_val(metric,FS_CALC_CURRENT,0); - continue; - } - metric_id[metric_cnt] = i; - metric_cnt++; - } - - for(i = 0; i < metric_cnt; i++) - { - append_pos += snprintf(append_pos, size - (append_pos - print_buf),"\t"); - - for(j = 0; j < FIELD_PER_LINE && i+j < metric_cnt; j++) - { - //metric = instance->metric[metric_id[i+j]]; - metric = get_metric(instance, metric_id[i+j]); - memset(print_buf_tags,0, sizeof(print_buf_tags)); - print_buf_tag_append_position(metric->tag_key,metric->tag_value, metric->n_tag, print_buf_tags, sizeof(print_buf_tags)); - append_pos += snprintf(append_pos, - size - (append_pos - print_buf), - "%10s %s\t", - metric->field_name, - print_buf_tags - ); - } - append_pos += snprintf(append_pos, size - (append_pos-print_buf), "\nsum\t"); - - for(j=0; j < FIELD_PER_LINE && i+j < metric_cnt; j++) - { - //metric = instance->metric[metric_id[i+j]]; - metric = get_metric(instance, metric_id[i+j]); - value = get_metric_unit_val(metric,FS_CALC_CURRENT, 1); - append_pos += snprintf(append_pos, - sizeof(print_buf) - (append_pos - print_buf), - "%10lld\t", - value - ); - } - append_pos += snprintf(append_pos, - sizeof(print_buf) - (append_pos - print_buf), - "\nspeed/s\t" - ); - - for(j=0;j<FIELD_PER_LINE&&i+j<metric_cnt;j++) - { - //metric = instance->metric[metric_id[i+j]]; - metric = get_metric(instance, metric_id[i+j]); - value = get_metric_unit_val(metric,FS_CALC_SPEED, 0); - append_pos += snprintf(append_pos, - size - (append_pos - print_buf), - "%10lld\t", - value*1000/interval_ms - ); - } - i += (j-1); - append_pos += snprintf(append_pos, - size - (append_pos - print_buf), - "\n" - ); - } - - if(append_pos - print_buf > 0) - { - if(*(append_pos - 1)=='\n') - { - append_pos--; - } - append_pos += snprintf(append_pos, - size - (append_pos - print_buf), - "\n%s\n", - draw_line - ); - } - - return append_pos - print_buf; -} - -static int output_file_format_default_table(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size) -{ - int i = 0, j = 0, k = 0; - struct table_metric *table = NULL; - struct table_line *line = NULL; - int metric_id = 0; - struct metric_t *metric = NULL; - long long value = 0; - char* append_pos = print_buf; - - for(i = 0; i < instance->table_num; i++) //per table - { - table = instance->table_metrics[i]; - append_pos += snprintf(append_pos, size - (append_pos - print_buf),"%-20s\t\t",table->name); - for(j = 0; j < table->column_cnt; j ++) - { - //print table column - append_pos += snprintf(append_pos, - size - (append_pos - print_buf), - "\t%10s", - table->column_name[j] - ); - } - for(j = 0; j < table->line_cnt; j++) //per line - { - line = read_table_line(table,j); - //print table line name + tag - append_pos += snprintf(append_pos, - size - (append_pos - print_buf), - "\n%s ", - line->name - ); - - append_pos += print_buf_tag_append_position(line->tag_key, line->tag_value, line->n_tag, - append_pos, size - (append_pos - print_buf)); - - - for(k = 0; k < table->column_cnt; k++) //per metric - { - //print table metric value - metric_id = line->metric_id_belong_to_line[k]; - metric = get_metric(instance,metric_id); - value = get_metric_unit_val(metric, FS_CALC_CURRENT, 0); - append_pos += snprintf(append_pos, - size - (append_pos - print_buf), - "%10lld\t", - value - ); - - } - - } - - if(append_pos - print_buf > 0) - { - if(*(append_pos - 1) == '\n') - { - append_pos--; - } - append_pos += snprintf(append_pos, - size - (append_pos - print_buf), - "\n%s\n", - draw_line - ); - } - - } - return append_pos - print_buf; -} - -static int output_file_print_hdr_head(struct metric_t *metric, char *print_buf, size_t size) -{ - char * pos = print_buf; - char bin_format[STR_LEN_256], str_format[STR_LEN_256]; - const char* extra[]={"MAX", "MIN", "AVG", "STDDEV", "CNT"}; - char buff[STR_LEN_32]; - int i=0; - double * bins = metric->histogram.bins; - int bins_num = metric->histogram.bins_num; - if(metric->field_type == FIELD_TYPE_SUMMARY) - { - snprintf(bin_format, sizeof(bin_format), "%%%d.2lf%%%%", HISTOGRAM_WIDTH-1); - } - if(metric->field_type == FILED_TYPE_HISTOGRAM) - { - snprintf(bin_format, sizeof(bin_format), "le=%%0.0f"); - } - snprintf(str_format, sizeof(str_format), "%%%ds", HISTOGRAM_WIDTH); - if(metric->field_type == FIELD_TYPE_SUMMARY) - { - pos+=snprintf(pos,size-(pos-print_buf),"%-8s\t","summary"); - } - if(metric->field_type == FILED_TYPE_HISTOGRAM) - { - pos+=snprintf(pos,size-(pos-print_buf),"%-8s\t","histogram"); - } - for(i = 0;i < bins_num; i++) - { - if(metric->field_type == FIELD_TYPE_SUMMARY) - { - snprintf(buff,sizeof(buff),bin_format,bins[i]*100); - } - if(metric->field_type == FILED_TYPE_HISTOGRAM) - { - snprintf(buff,sizeof(buff),bin_format,bins[i]); - } - pos+=snprintf(pos,size-(pos-print_buf),str_format, buff); - } - for(i=0;(unsigned int)i<sizeof(extra)/sizeof(extra[0]);i++) - { - pos+=snprintf(pos,size-(pos-print_buf),str_format, extra[i]); - } - pos+=snprintf(pos,size-(pos-print_buf),"\n"); - return pos-print_buf; -} - - long long hdr_count_le_value(const struct hdr_histogram* h, long long value) { struct hdr_iter iter; @@ -972,183 +382,6 @@ long long hdr_count_le_value(const struct hdr_histogram* h, long long value) } return count; } -static int output_file_print_hdr_unit(struct metric_t *metric, char*print_buf, size_t size) -{ - double * bins = metric->histogram.bins; - int bins_num = metric->histogram.bins_num; - char* pos=print_buf; - long long value=0; - int i=0; - struct histogram_t* h=&(metric->histogram); - struct hdr_histogram* h_out=NULL, *h_tmp=NULL; - char int_format[STR_LEN_256], double_format[STR_LEN_256]; - snprintf(int_format, sizeof(int_format), "%%%dlld",HISTOGRAM_WIDTH); - snprintf(double_format, sizeof(double_format), "%%%d.2lf",HISTOGRAM_WIDTH); - - hdr_init(h->lowest_trackable_value, h->highest_trackable_value, h->significant_figures, &(h_tmp)); - if(h->previous_changed!=NULL) hdr_close(h->previous_changed); - - h->previous_changed=atomic_read(&(h->changing)); - h_tmp=atomic_set(&(h->changing), h_tmp);// left h_tmp is used to avoid warining [-Wunused-value] - - hdr_add(h->accumulated, h->previous_changed); - h_out=h->accumulated; //TODO -/* - if(metric->calc_type==FS_CALC_SPEED) - { - h_out=h->previous_changed; - } - else - { - h_out=h->accumulated; - } -*/ - pos+=snprintf(pos,size-(pos-print_buf),"%-10s\t", metric->field_name); - - for(i=0;i<bins_num;i++) - { - if(metric->field_type == FIELD_TYPE_SUMMARY) - { - value=(long long)hdr_value_at_percentile(h_out, bins[i]*100); - } - if(metric->field_type == FILED_TYPE_HISTOGRAM) - { - value= hdr_count_le_value(h_out, (long long)bins[i]); - } - pos+=snprintf(pos,size-(pos-print_buf),int_format, value); - } - pos+=snprintf(pos,size-(pos-print_buf),int_format,h_out->total_count==0?0:(long long)hdr_max(h_out)); - pos+=snprintf(pos,size-(pos-print_buf),int_format,h_out->total_count==0?0:(long long)hdr_min(h_out)); - pos+=snprintf(pos,size-(pos-print_buf),double_format,h_out->total_count==0?0:hdr_mean(h_out)); - pos+=snprintf(pos,size-(pos-print_buf),double_format,h_out->total_count==0?0:hdr_stddev(h_out)); - pos+=snprintf(pos,size-(pos-print_buf),int_format,(long long)h_out->total_count); - - pos+=snprintf(pos,size-(pos-print_buf),"\n"); - - h_tmp=NULL; - - return pos-print_buf; -} - -static int output_file_format_default_type_histogram_and_summary(struct fieldstat_instance *instance, long long interval_ms, char*print_buf, size_t size) -{ - int i = 0, j = 0, metric_num = 0; - char *pos = print_buf; - //display_manifest_t* p=NULL; - struct metric_t *metric = NULL; - struct metric_t *metric_array[INIT_STAT_FIELD_NUM] = {NULL}; - int metric_is_print[INIT_STAT_FIELD_NUM] = {0}; - - if(instance->histogram_cnt == 0 - && instance->summary_cnt == 0) - { - return 0; - } - - for(i = 0; i < instance->metric_cnt; i ++) - { - metric = get_metric(instance, i); - if(metric->field_type != FIELD_TYPE_SUMMARY - && metric->field_type != FILED_TYPE_HISTOGRAM) - { - continue; - } - metric_array[metric_num++] = metric; - } - - for(i = 0; i < metric_num; i++) - { - if(metric_is_print[i] == 1) - { - continue; - } - pos += output_file_print_hdr_head(metric_array[i], pos, size-(pos-print_buf)); - for(j = i; j < metric_num; j ++) - { - if(metric_array[j] == NULL) - { - continue; - } - if(metric_array[i]->histogram.bins_num != metric_array[j]->histogram.bins_num) - { - continue; - } - if(memcmp(metric_array[i]->histogram.bins, metric_array[j]->histogram.bins, metric_array[i]->histogram.bins_num)) - { - continue; - } - pos += output_file_print_hdr_unit(metric_array[j], pos, size-(pos-print_buf)); - metric_is_print[j] = 1; - } - if(pos-print_buf>0) - { - if(*(pos-1)=='\n') - { - pos--; - } - pos+=snprintf(pos,size-(pos-print_buf),"\n%s\n", draw_line); - } - } - return pos-print_buf; -} - - -int fieldstat_output_file(struct fieldstat_instance *instance,long long interval_ms) -{ - size_t print_buf_sz = instance->metric_cnt*1024; - char *print_buf = NULL; - char *append_pos = NULL; - time_t current = 0; - char ctime_buff[STR_LEN_32]={0}; - - if(instance->fp == NULL) - { - instance->fp = fopen(instance->local_output_filename, "w"); - if(instance->fp == NULL) - { - printf("Field Stat: open %s failed.\n",instance->local_output_filename); - assert(0); - return -1; - } - } - - if(!strcmp(instance->local_output_format, "default")) - { - time(¤t); - ctime_r(¤t, ctime_buff); - print_buf = (char*)calloc(sizeof(char), print_buf_sz); - append_pos = print_buf; - append_pos += snprintf(append_pos, print_buf_sz - (append_pos - print_buf), "%s%s", draw_boundary, ctime_buff); - append_pos --;//jump '\n' generate by ctime() - append_pos += snprintf(append_pos, print_buf_sz - (append_pos - print_buf),"%s\n",draw_boundary); - - //pthread_mutex_lock(&(_handle->reg_lock)); //TODO - append_pos += output_file_format_default_type_gauge(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); - append_pos += output_file_format_default_type_counter(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); - append_pos += output_file_format_default_table(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); - append_pos += output_file_format_default_type_histogram_and_summary(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); - //TODO output table,output histogram,output summary - //pthread_mutex_unlock(&(_handle->reg_lock));//TODO - } - - if(!strcmp(instance->local_output_format, "json")) - { - //TODO from json output - } - - fseek(instance->fp,0,SEEK_SET); - fwrite(print_buf,append_pos - print_buf,1,instance->fp); - - fflush(instance->fp); - - if(print_buf) - { - free(print_buf); - print_buf = NULL; - } - return 0; -} - void fieldstat_passive_output(struct fieldstat_instance *instance) { @@ -1350,383 +583,6 @@ struct metric_id_list fieldstat_register_table_metrics(struct fieldstat_instance return ret_metric_id_list; } -/* -* ret = -1, output not match fieldstat instance - output: - /metrics - /sapp - /tfe - ... -* ret >=0 && ret < instance_cnt output sepecify fieldstat instance - output: - http://127.0.0.1:9273/sapp content -* ret = instance_cnt output all fieldstat instance - output: - http://127.0.0.1:9273/metrics content -*/ -static int prometheus_get_fs_instance_id(struct prometheus_endpoint_instance *global_prometheus_output, char *uri, int uri_len) -{ - int i = 0; - char instance_name_url[INSTANCE_NAME_LEN + 1] = {0}; - int instance_name_url_len = 0; - - if(uri_len == (int)strlen(global_prometheus_output->url_path) - && 0 == memcmp(uri, global_prometheus_output->url_path, strlen(global_prometheus_output->url_path))) - { - return global_prometheus_output->fs_instance_cnt; - } - - for(i = 0; i < global_prometheus_output->fs_instance_cnt; i++) - { - memset(instance_name_url, 0, sizeof(instance_name_url)); - instance_name_url_len = snprintf(instance_name_url, sizeof(instance_name_url),"/%s",global_prometheus_output->fs_instance[i]->name); - - if(uri_len == instance_name_url_len - && 0 == memcmp( uri, instance_name_url, instance_name_url_len)) - { - return i; - } - } - return -1; -} - -static void prometheus_output_uri_list(struct prometheus_endpoint_instance *prometheus_output,struct http_request_s* request) -{ - int i = 0; - int payload_len = 0; - char *payload = NULL; - char *payload_append_position = NULL; - struct fieldstat_instance **fs_instance = NULL; - struct http_response_s* response = NULL; - - fs_instance = prometheus_output->fs_instance; - - - payload_len = prometheus_output->fs_instance_cnt * 128; //TODO using marco, len? - payload_append_position = payload = (char *)calloc(1,payload_len); - - payload_append_position += snprintf(payload_append_position, payload_len - (payload_append_position - payload),"url_path:\n\t%s\n", prometheus_output->url_path); - - for(i = 0; i < prometheus_output->fs_instance_cnt; i++) - { - payload_append_position += snprintf(payload_append_position, payload_len - (payload_append_position - payload),"\t/%s\n", fs_instance[i]->name); - } - - response = http_response_init(); - http_response_status(response, 404); - http_response_header(response, "Content-Type", "text/plain; charset=utf-8"); - http_response_body(response, payload, strlen(payload)); - http_respond(request, response); - - free(payload); - payload=NULL; - return; -} - -static int prometheus_output_read_metric_tags(struct metric_t *metric, char *instance_name, char *tags_buff, unsigned int size) -{ - int i = 0;//used_len = 0; - char unescape[STR_LEN_256] = {0}; - char *append_pos = tags_buff; - - //used_len += snprint(tags_buff, size - used_len, "app_name=\"%s\"", instance_name); - - append_pos += snprintf(append_pos, size - (append_pos - tags_buff), "app_name=\"%s\"", instance_name); - - if(metric->belong_to_table == 1) - { - append_pos += snprintf(append_pos, size - (append_pos - tags_buff), ",line_name=\"%s\"", metric->field_name); - } - - for(i = 0; i < (int)metric->n_tag; i++) - { - memset(unescape, 0, sizeof(unescape)); - str_unescape(metric->tag_key[i], unescape, sizeof(unescape)); - append_pos += snprintf(append_pos, size - (append_pos - tags_buff), ",%s=\"%s\"", unescape, metric->tag_value[i]); - } - return append_pos - tags_buff; -} - - -static int prometheus_output_read_metric_name(struct metric_t *metric, char *instance_app_name, char *name_buff, unsigned int size) -{ - char unescape[256] = {0}; - char *append_pos = name_buff; - if(metric->belong_to_table == 1) - { - str_unescape(metric->table_column_name, unescape, sizeof(unescape)); - } - else - { - str_unescape(metric->field_name, unescape, sizeof(unescape)); - } - append_pos += snprintf(append_pos, size - (append_pos - name_buff), "%s_%s", instance_app_name, unescape); - - return append_pos - name_buff; -} - - -static int prometheus_output_histogram_and_summary(struct metric_t *metric, char *payload, int payload_len, char *instance_app_name, char *metric_name, char *metric_tags) -{ - long long value=0; - long long sum = 0; - int i=0,used_len=0; - - struct hdr_iter iter; - char *output_format = NULL; - - //struct histogram_t* h=&(p->histogram); - struct histogram_t *h = &(metric->histogram); - struct hdr_histogram* h_out=NULL, *h_tmp=NULL; - int bin_num = metric->histogram.bins_num; - double *bins = metric->histogram.bins; - - if(metric->field_type == FILED_TYPE_HISTOGRAM) - { - output_format = (char *)"%s_bucket{%s,le=\"%0.2f\"} %llu\n"; - } - - if(metric->field_type == FIELD_TYPE_SUMMARY) - { - output_format = (char *)"%s{%s,quantile=\"%0.2f\"} %llu\n"; - } - - hdr_init(h->lowest_trackable_value, h->highest_trackable_value, h->significant_figures, &(h_tmp)); - - hdr_add(h_tmp, h->accumulated); - hdr_add(h_tmp, h->changing); - h_out=h_tmp; - - for(i=0;i<bin_num;i++) - { - if(metric->field_type == FILED_TYPE_HISTOGRAM) - { - value = (long long)hdr_count_le_value(h_out, (long long)bins[i]); - } - if(metric->field_type == FIELD_TYPE_SUMMARY) - { - value=(long long)hdr_value_at_percentile(h_out, bins[i]); - } - - used_len+=snprintf(payload+used_len, - payload_len-used_len, - output_format, - metric_name, - metric_tags, - bins[i], - value - ); - } - used_len+=snprintf(payload+used_len, - payload_len-used_len, - "%s_count{%s} %llu\n", - metric_name, - metric_tags, - (long long)h_out->total_count - ); - - hdr_iter_recorded_init(&iter, h_out); - while (hdr_iter_next(&iter)) - { - sum+=(long long)iter.value; - } - used_len+=snprintf(payload+used_len, - payload_len-used_len, - "%s_sum{%s} %llu\n", - metric_name, - metric_tags, - sum - ); - - hdr_close(h_tmp); - h_tmp=NULL; - - return used_len; -} - - - -static int prometheus_get_instance_metric_playload(struct fieldstat_instance *instance, char **payload, int *payload_size, int offset) -{ - int i = 0; - struct metric_t *metric = NULL; - long long value = 0; - char metric_name[256] = {0}; //match the regex [a-zA-Z_:][a-zA-Z0-9_:]* - char instance_name[256] = {0}; - char metric_tags[1024] = {0}; //label name match the regex [a-zA-Z_:][a-zA-Z0-9_:]* - int append_offset = offset; - char *new_payload = NULL; - int new_payload_size = 0; - - if(instance->running != 1) - { - return -1; - } - - if(payload == NULL) - { - return -1; - } - - new_payload = *payload; - new_payload_size = *payload_size; - - str_unescape(instance->name, instance_name, sizeof(instance_name)); - - for(i = 0; i < instance->metric_cnt; i++) - { - metric = get_metric(instance, i); - - if(metric->is_ratio == 1) - { - continue; - } - if(metric->is_invisible == 1) - { - continue; - } - - if( new_payload_size - append_offset <= LEFT_MIN_BUFF_LEN) - { - new_payload_size += REALLOC_SCALE_SIZE; - new_payload = (char *)realloc(new_payload, new_payload_size); - } - - prometheus_output_read_metric_name(metric, instance_name, metric_name, sizeof(metric_name)); - prometheus_output_read_metric_tags(metric, instance_name, metric_tags, sizeof(metric_tags)); - - switch(metric->field_type) - { - case FIELD_TYPE_COUNTER: - case FIELD_TYPE_GAUGE: - value = get_metric_unit_val(metric, FS_CALC_CURRENT, 1); - append_offset += snprintf(new_payload + append_offset, - new_payload_size - append_offset, - "%s{%s} %llu\n", - metric_name, - metric_tags, - value - ); - break; - case FIELD_TYPE_SUMMARY: - case FILED_TYPE_HISTOGRAM: - append_offset += prometheus_output_histogram_and_summary(metric,new_payload + append_offset, - new_payload_size - append_offset, instance_name, metric_name, metric_tags); - break; - default: - break; - } - } - - *payload = new_payload; - *payload_size = new_payload_size; - - return append_offset; - -} - -static void prometheus_output_instance_metric(struct prometheus_endpoint_instance *prometheus_output, struct http_request_s* request, int fs_instance_idx) -{ - int i = 0; - int payload_size = 0; - int payload_offset = 0; - char *payload = NULL; - struct fieldstat_instance *instance = NULL; - struct http_response_s* response = NULL; - - if(fs_instance_idx == prometheus_output->fs_instance_cnt) - { - for(i = 0; i < prometheus_output->fs_instance_cnt; i++) - { - instance = prometheus_output->fs_instance[i]; - payload_offset = prometheus_get_instance_metric_playload(instance, &payload, &payload_size, payload_offset); - } - } - else - { - instance = prometheus_output->fs_instance[fs_instance_idx]; - payload_offset = prometheus_get_instance_metric_playload(instance, &payload, &payload_size, payload_offset); - } - - if(payload != NULL) - { - response = http_response_init(); - http_response_status(response, 200); - http_response_header(response, "Content-Type", "text/plain; charset=utf-8"); - http_response_body(response, payload, strlen(payload)); - http_respond(request, response); - - free(payload); - payload=NULL; - } - -} - - -static void prometheus_endpoint_instance_output(struct http_request_s* request) -{ - int fs_instance_idx = -1; - struct http_string_s uri; - struct prometheus_endpoint_instance *prometheus_endpoint_instance = &g_prometheus_endpoint_instance;; - - uri = http_request_target(request); - fs_instance_idx = prometheus_get_fs_instance_id(prometheus_endpoint_instance, (char *)uri.buf, uri.len); - - fs_instance_idx == -1 ? - prometheus_output_uri_list(prometheus_endpoint_instance, request) - :prometheus_output_instance_metric(prometheus_endpoint_instance, request, fs_instance_idx); - - return; -} - - -static void *prometheus_endpoint_listen_thread_entry(void *arg) -{ - struct prometheus_endpoint_instance *global_prometheus = (struct prometheus_endpoint_instance *)arg; - http_server_listen(global_prometheus->server_handle); - return NULL; -} - -int fieldstat_global_enable_prometheus_endpoint(unsigned short listen_port, const char *url) -{ - g_prometheus_endpoint_instance.server_handle = http_server_init(listen_port, prometheus_endpoint_instance_output); - if(g_prometheus_endpoint_instance.server_handle == NULL) - { - return -1; - } - - g_prometheus_endpoint_instance.url_path = url == NULL ? strdup(PROMETHEUS_ENDPOINT_DEFAULT_URL):strdup(url); - g_prometheus_endpoint_instance.port = listen_port; - g_prometheus_endpoint_instance.running = 1; - g_prometheus_endpoint_instance.fs_instance = (struct fieldstat_instance **)calloc( sizeof(struct fieldstat_instance *), g_prometheus_endpoint_instance.fs_instance_size); - - pthread_create(&g_prometheus_endpoint_instance.tid, NULL, prometheus_endpoint_listen_thread_entry, (void *)&g_prometheus_endpoint_instance); - - return 0; -} - - -int fieldstat_set_prometheus_output(struct fieldstat_instance *instance) -{ - int fs_instance_id = 0; - - if(g_prometheus_endpoint_instance.running != 1) - { - return -1; - } - - if(g_prometheus_endpoint_instance.fs_instance_cnt >= g_prometheus_endpoint_instance.fs_instance_size) - { - g_prometheus_endpoint_instance.fs_instance_size += REALLOC_SCALE_SIZE; - g_prometheus_endpoint_instance.fs_instance = (struct fieldstat_instance **)realloc(g_prometheus_endpoint_instance.fs_instance, g_prometheus_endpoint_instance.fs_instance_size); - } - - fs_instance_id = g_prometheus_endpoint_instance.fs_instance_cnt++; - g_prometheus_endpoint_instance.fs_instance[fs_instance_id] = instance; - return 0; -} - - static int fieldstat_value_operate(struct fieldstat_instance *instance, int field_id, enum field_op op, long long value) { @@ -1896,6 +752,7 @@ static int fieldstat_register_histogram_and_summary(struct fieldstat_instance *i return metric_id; } + int fieldstat_register_histogram(struct fieldstat_instance *instance, const char *field_name, const char *tag_key[], const char *tag_value[], size_t n_tag, const char * bins, long long lowest_trackable_value,long long highest_trackable_value,int significant_figures) { diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h index 1b86b81..e617787 100644 --- a/src/fieldstat_internal.h +++ b/src/fieldstat_internal.h @@ -1,9 +1,28 @@ -#ifndef __FIELD_STAT2_INTERNAL_H__ -#define __FIELD_STAT2_INTERNAL_H__ +#pragma once #include <pthread.h> #include "hdr_histogram.h" #include "threadsafe_counter.h" +#include "fieldstat.h" +#include <assert.h> +#include <string.h> +#include "cJSON.h" +#include <sys/socket.h>//socket +#include <sys/types.h>//socket +#include <netinet/in.h> +#include <arpa/inet.h> +#include <string.h> //strerror +#include <errno.h>//strerror +#include <fcntl.h>//fcntl +#include <unistd.h>//fcntl +#include <net/if.h>//fcntl +#include <sys/ioctl.h>//ioctl +#include <stdio.h> +#include <time.h> +#include <stdlib.h> +#include <pthread.h> +#include <assert.h> +#include <sys/time.h> #define INIT_STAT_FIELD_NUM 1024 #define MAX_STAT_COLUMN_NUM 64 @@ -197,6 +216,16 @@ struct prometheus_endpoint_instance int fs_instance_size; // number }; - - -#endif +void prometheus_endpoint_instance_output(struct http_request_s* request); + +char* __str_dup(const char* str); +long long hdr_count_le_value(const struct hdr_histogram* h, long long value); +struct metric_t * get_metric(struct fieldstat_instance *instance, int metric_id); +long long get_metric_unit_val(struct metric_t *metric,enum field_calc_algo calc_type,int is_refer); +int is_valid_field_name(const char* name); +struct metric_t ** read_metric_slot(struct fieldstat_instance *instance, int metric_id); +struct metric_t* metric_new(enum field_type type, const char *field_name, const char *tag_key[], const char *tag_value[], size_t n_tag); +int fieldstat_output_file(struct fieldstat_instance *instance,long long interval_ms); +struct table_line * read_table_line(struct table_metric *table, int line_id); +int send_udp(int sd, unsigned int dest_ip, unsigned short dest_port, const char * data, int len); +int line_protocol_output(struct fieldstat_instance *instance); diff --git a/src/file_output.cpp b/src/file_output.cpp new file mode 100644 index 0000000..195be5c --- /dev/null +++ b/src/file_output.cpp @@ -0,0 +1,507 @@ +#include "fieldstat_internal.h" + +const char* draw_line="________________________________________________________________________________________________________________________________________________"; +const char* draw_boundary="============================================================"; + +static int print_buf_tag_append_position(char *tag_key[], char *tag_value[], size_t n_tag, char *print_buf_tags, unsigned int size) +{ + int i = 0; + char *append_pos = print_buf_tags; + + if(n_tag <= 0) + { + return 0; + } + + append_pos += snprintf(append_pos, + size - (append_pos - print_buf_tags), + "{" + ); + + for(i = 0; i < (int)n_tag; i++) + { + append_pos += snprintf(append_pos, + size - (append_pos - print_buf_tags), + "%s=\"%s\",", + tag_key[i], + tag_value[i] + ); + } + + if(append_pos - print_buf_tags > 0) + { + append_pos--; + } + + append_pos += snprintf(append_pos, + size - (append_pos - print_buf_tags), + "}\t" + ); + + return append_pos - print_buf_tags; +} + +static int output_file_format_default_type_gauge(struct fieldstat_instance *instance,long long interval_ms,char *print_buf, unsigned int size) +{ + int i = 0, j = 0; + //display_manifest_t* p = NULL; + metric_t *metric = NULL; + long long value = 0; + //double ratio = 0.0; + //char* pos=print_buf; + char *append_pos = print_buf; + char print_buf_tags[1024]; + + for(i = 0; i < instance->metric_cnt; i++) + { + //metric = instance->metric[i]; + metric = get_metric(instance, i); + if(metric->field_type != FIELD_TYPE_GAUGE) + { + continue; + } + if(metric->belong_to_table == 1) + { + continue; + } + if(metric->is_invisible == 1) + { + value = get_metric_unit_val(metric, FS_CALC_SPEED, 0); + continue; + } + /* + if(metric->is_ratio==1) + { + ratio=get_stat_ratio(_handle->display[p->numerator_id], _handle->display[p->denominator_id], NULL, + p->output_scaling, p->calc_type); + pos+=snprintf(pos,size-(pos-print_buf),"%s: %10.2e\t",p->name,ratio); + } + */ + value = get_metric_unit_val(metric, FS_CALC_CURRENT, 0); + //value=value * metric->output_scaling * 1000 / interval_ms; + memset(print_buf_tags,0, sizeof(print_buf_tags)); + print_buf_tag_append_position(metric->tag_key,metric->tag_value, metric->n_tag, print_buf_tags, sizeof(print_buf_tags)); + append_pos += snprintf(append_pos, + size - (append_pos - print_buf), + "%s %s: %-10lld\t", + metric->field_name, + print_buf_tags, + value + ); + j++; + if(j == STATUS_PER_LINE) + { + append_pos += snprintf(append_pos, size - (append_pos - print_buf),"\n"); + j=0; + } + } + + if(append_pos - print_buf > 0) + { + if(*(append_pos - 1) == '\n') + { + append_pos --; + } + append_pos += snprintf(append_pos, size - (append_pos-print_buf),"\n%s\n",draw_line); + } + return append_pos - print_buf; +} + + +static int output_file_format_default_type_counter(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size) +{ + int i=0,j=0; + //display_manifest_t* p=NULL; + metric_t *metric = NULL; + long long value = 0; + //double ratio = 0.0; + char* append_pos = print_buf; + int metric_id[INIT_STAT_FIELD_NUM] = {0}; + int metric_cnt = 0; + char print_buf_tags[1024]; + + for(i = 0;i < instance->metric_cnt; i++) + { + //p=_handle->display[i]; + //metric = instance->metric[i]; + metric = get_metric(instance, i); + if(metric->field_type != FIELD_TYPE_COUNTER) + { + continue; + } + if(metric->belong_to_table == 1) + { + continue; + } + if(metric->is_invisible == 1) + { + get_metric_unit_val(metric,FS_CALC_CURRENT,0); + continue; + } + metric_id[metric_cnt] = i; + metric_cnt++; + } + + for(i = 0; i < metric_cnt; i++) + { + append_pos += snprintf(append_pos, size - (append_pos - print_buf),"\t"); + + for(j = 0; j < FIELD_PER_LINE && i+j < metric_cnt; j++) + { + //metric = instance->metric[metric_id[i+j]]; + metric = get_metric(instance, metric_id[i+j]); + memset(print_buf_tags,0, sizeof(print_buf_tags)); + print_buf_tag_append_position(metric->tag_key,metric->tag_value, metric->n_tag, print_buf_tags, sizeof(print_buf_tags)); + append_pos += snprintf(append_pos, + size - (append_pos - print_buf), + "%10s %s\t", + metric->field_name, + print_buf_tags + ); + } + append_pos += snprintf(append_pos, size - (append_pos-print_buf), "\nsum\t"); + + for(j=0; j < FIELD_PER_LINE && i+j < metric_cnt; j++) + { + //metric = instance->metric[metric_id[i+j]]; + metric = get_metric(instance, metric_id[i+j]); + value = get_metric_unit_val(metric,FS_CALC_CURRENT, 1); + append_pos += snprintf(append_pos, + sizeof(print_buf) - (append_pos - print_buf), + "%10lld\t", + value + ); + } + append_pos += snprintf(append_pos, + sizeof(print_buf) - (append_pos - print_buf), + "\nspeed/s\t" + ); + + for(j=0;j<FIELD_PER_LINE&&i+j<metric_cnt;j++) + { + //metric = instance->metric[metric_id[i+j]]; + metric = get_metric(instance, metric_id[i+j]); + value = get_metric_unit_val(metric,FS_CALC_SPEED, 0); + append_pos += snprintf(append_pos, + size - (append_pos - print_buf), + "%10lld\t", + value*1000/interval_ms + ); + } + i += (j-1); + append_pos += snprintf(append_pos, + size - (append_pos - print_buf), + "\n" + ); + } + + if(append_pos - print_buf > 0) + { + if(*(append_pos - 1)=='\n') + { + append_pos--; + } + append_pos += snprintf(append_pos, + size - (append_pos - print_buf), + "\n%s\n", + draw_line + ); + } + + return append_pos - print_buf; +} + +static int output_file_format_default_table(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size) +{ + int i = 0, j = 0, k = 0; + struct table_metric *table = NULL; + struct table_line *line = NULL; + int metric_id = 0; + struct metric_t *metric = NULL; + long long value = 0; + char* append_pos = print_buf; + + for(i = 0; i < instance->table_num; i++) //per table + { + table = instance->table_metrics[i]; + append_pos += snprintf(append_pos, size - (append_pos - print_buf),"%-20s\t\t",table->name); + for(j = 0; j < table->column_cnt; j ++) + { + //print table column + append_pos += snprintf(append_pos, + size - (append_pos - print_buf), + "\t%10s", + table->column_name[j] + ); + } + for(j = 0; j < table->line_cnt; j++) //per line + { + line = read_table_line(table,j); + //print table line name + tag + append_pos += snprintf(append_pos, + size - (append_pos - print_buf), + "\n%s ", + line->name + ); + + append_pos += print_buf_tag_append_position(line->tag_key, line->tag_value, line->n_tag, + append_pos, size - (append_pos - print_buf)); + + + for(k = 0; k < table->column_cnt; k++) //per metric + { + //print table metric value + metric_id = line->metric_id_belong_to_line[k]; + metric = get_metric(instance,metric_id); + value = get_metric_unit_val(metric, FS_CALC_CURRENT, 0); + append_pos += snprintf(append_pos, + size - (append_pos - print_buf), + "%10lld\t", + value + ); + + } + + } + + if(append_pos - print_buf > 0) + { + if(*(append_pos - 1) == '\n') + { + append_pos--; + } + append_pos += snprintf(append_pos, + size - (append_pos - print_buf), + "\n%s\n", + draw_line + ); + } + + } + return append_pos - print_buf; +} + +static int output_file_print_hdr_head(struct metric_t *metric, char *print_buf, size_t size) +{ + char * pos = print_buf; + char bin_format[STR_LEN_256], str_format[STR_LEN_256]; + const char* extra[]={"MAX", "MIN", "AVG", "STDDEV", "CNT"}; + char buff[STR_LEN_32]; + int i=0; + double * bins = metric->histogram.bins; + int bins_num = metric->histogram.bins_num; + if(metric->field_type == FIELD_TYPE_SUMMARY) + { + snprintf(bin_format, sizeof(bin_format), "%%%d.2lf%%%%", HISTOGRAM_WIDTH-1); + } + if(metric->field_type == FILED_TYPE_HISTOGRAM) + { + snprintf(bin_format, sizeof(bin_format), "le=%%0.0f"); + } + snprintf(str_format, sizeof(str_format), "%%%ds", HISTOGRAM_WIDTH); + if(metric->field_type == FIELD_TYPE_SUMMARY) + { + pos+=snprintf(pos,size-(pos-print_buf),"%-8s\t","summary"); + } + if(metric->field_type == FILED_TYPE_HISTOGRAM) + { + pos+=snprintf(pos,size-(pos-print_buf),"%-8s\t","histogram"); + } + for(i = 0;i < bins_num; i++) + { + if(metric->field_type == FIELD_TYPE_SUMMARY) + { + snprintf(buff,sizeof(buff),bin_format,bins[i]*100); + } + if(metric->field_type == FILED_TYPE_HISTOGRAM) + { + snprintf(buff,sizeof(buff),bin_format,bins[i]); + } + pos+=snprintf(pos,size-(pos-print_buf),str_format, buff); + } + for(i=0;(unsigned int)i<sizeof(extra)/sizeof(extra[0]);i++) + { + pos+=snprintf(pos,size-(pos-print_buf),str_format, extra[i]); + } + pos+=snprintf(pos,size-(pos-print_buf),"\n"); + return pos-print_buf; +} + +static int output_file_print_hdr_unit(struct metric_t *metric, char*print_buf, size_t size) +{ + double * bins = metric->histogram.bins; + int bins_num = metric->histogram.bins_num; + char* pos=print_buf; + long long value=0; + int i=0; + struct histogram_t* h=&(metric->histogram); + struct hdr_histogram* h_out=NULL, *h_tmp=NULL; + char int_format[STR_LEN_256], double_format[STR_LEN_256]; + snprintf(int_format, sizeof(int_format), "%%%dlld",HISTOGRAM_WIDTH); + snprintf(double_format, sizeof(double_format), "%%%d.2lf",HISTOGRAM_WIDTH); + + hdr_init(h->lowest_trackable_value, h->highest_trackable_value, h->significant_figures, &(h_tmp)); + if(h->previous_changed!=NULL) hdr_close(h->previous_changed); + + h->previous_changed=atomic_read(&(h->changing)); + h_tmp=atomic_set(&(h->changing), h_tmp);// left h_tmp is used to avoid warining [-Wunused-value] + + hdr_add(h->accumulated, h->previous_changed); + h_out=h->accumulated; //TODO +/* + if(metric->calc_type==FS_CALC_SPEED) + { + h_out=h->previous_changed; + } + else + { + h_out=h->accumulated; + } +*/ + pos+=snprintf(pos,size-(pos-print_buf),"%-10s\t", metric->field_name); + + for(i=0;i<bins_num;i++) + { + if(metric->field_type == FIELD_TYPE_SUMMARY) + { + value=(long long)hdr_value_at_percentile(h_out, bins[i]*100); + } + if(metric->field_type == FILED_TYPE_HISTOGRAM) + { + value= hdr_count_le_value(h_out, (long long)bins[i]); + } + pos+=snprintf(pos,size-(pos-print_buf),int_format, value); + } + pos+=snprintf(pos,size-(pos-print_buf),int_format,h_out->total_count==0?0:(long long)hdr_max(h_out)); + pos+=snprintf(pos,size-(pos-print_buf),int_format,h_out->total_count==0?0:(long long)hdr_min(h_out)); + pos+=snprintf(pos,size-(pos-print_buf),double_format,h_out->total_count==0?0:hdr_mean(h_out)); + pos+=snprintf(pos,size-(pos-print_buf),double_format,h_out->total_count==0?0:hdr_stddev(h_out)); + pos+=snprintf(pos,size-(pos-print_buf),int_format,(long long)h_out->total_count); + + pos+=snprintf(pos,size-(pos-print_buf),"\n"); + + h_tmp=NULL; + + return pos-print_buf; +} + +static int output_file_format_default_type_histogram_and_summary(struct fieldstat_instance *instance, long long interval_ms, char*print_buf, size_t size) +{ + int i = 0, j = 0, metric_num = 0; + char *pos = print_buf; + //display_manifest_t* p=NULL; + struct metric_t *metric = NULL; + struct metric_t *metric_array[INIT_STAT_FIELD_NUM] = {NULL}; + int metric_is_print[INIT_STAT_FIELD_NUM] = {0}; + + if(instance->histogram_cnt == 0 + && instance->summary_cnt == 0) + { + return 0; + } + + for(i = 0; i < instance->metric_cnt; i ++) + { + metric = get_metric(instance, i); + if(metric->field_type != FIELD_TYPE_SUMMARY + && metric->field_type != FILED_TYPE_HISTOGRAM) + { + continue; + } + metric_array[metric_num++] = metric; + } + + for(i = 0; i < metric_num; i++) + { + if(metric_is_print[i] == 1) + { + continue; + } + pos += output_file_print_hdr_head(metric_array[i], pos, size-(pos-print_buf)); + for(j = i; j < metric_num; j ++) + { + if(metric_array[j] == NULL) + { + continue; + } + if(metric_array[i]->histogram.bins_num != metric_array[j]->histogram.bins_num) + { + continue; + } + if(memcmp(metric_array[i]->histogram.bins, metric_array[j]->histogram.bins, metric_array[i]->histogram.bins_num)) + { + continue; + } + pos += output_file_print_hdr_unit(metric_array[j], pos, size-(pos-print_buf)); + metric_is_print[j] = 1; + } + if(pos-print_buf>0) + { + if(*(pos-1)=='\n') + { + pos--; + } + pos+=snprintf(pos,size-(pos-print_buf),"\n%s\n", draw_line); + } + } + return pos-print_buf; +} + + +int fieldstat_output_file(struct fieldstat_instance *instance,long long interval_ms) +{ + size_t print_buf_sz = instance->metric_cnt*1024; + char *print_buf = NULL; + char *append_pos = NULL; + time_t current = 0; + char ctime_buff[STR_LEN_32]={0}; + + if(instance->fp == NULL) + { + instance->fp = fopen(instance->local_output_filename, "w"); + if(instance->fp == NULL) + { + printf("Field Stat: open %s failed.\n",instance->local_output_filename); + assert(0); + return -1; + } + } + + if(!strcmp(instance->local_output_format, "default")) + { + time(¤t); + ctime_r(¤t, ctime_buff); + print_buf = (char*)calloc(sizeof(char), print_buf_sz); + append_pos = print_buf; + append_pos += snprintf(append_pos, print_buf_sz - (append_pos - print_buf), "%s%s", draw_boundary, ctime_buff); + append_pos --;//jump '\n' generate by ctime() + append_pos += snprintf(append_pos, print_buf_sz - (append_pos - print_buf),"%s\n",draw_boundary); + + //pthread_mutex_lock(&(_handle->reg_lock)); //TODO + append_pos += output_file_format_default_type_gauge(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); + append_pos += output_file_format_default_type_counter(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); + append_pos += output_file_format_default_table(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); + append_pos += output_file_format_default_type_histogram_and_summary(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); + //TODO output table,output histogram,output summary + //pthread_mutex_unlock(&(_handle->reg_lock));//TODO + } + + if(!strcmp(instance->local_output_format, "json")) + { + //TODO from json output + } + + fseek(instance->fp,0,SEEK_SET); + fwrite(print_buf,append_pos - print_buf,1,instance->fp); + + fflush(instance->fp); + + if(print_buf) + { + free(print_buf); + print_buf = NULL; + } + return 0; +} + + diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp new file mode 100644 index 0000000..5d820c0 --- /dev/null +++ b/src/line_protocol_output.cpp @@ -0,0 +1,190 @@ +#include "fieldstat_internal.h" + +static void flush_line_protocol_metric(struct fieldstat_instance *instance) +{ + if(instance->line_protocol_send_buff_offset == 0) + { + return; + } + + if(instance->line_protocol_server_ip > 0 && instance->line_protocol_server_port > 0) + { + send_udp(instance->line_protocol_socket, instance->line_protocol_server_ip, + (unsigned short)instance->line_protocol_server_port, + instance->line_protocol_send_buff, + instance->line_protocol_send_buff_offset + ); + } + instance->line_protocol_send_buff_offset = 0; + memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff)); + return; +} + +static void append_line_protocol_line(struct fieldstat_instance *instance, const char* measurement, char *tag_set, char *field_set) +{ + if(field_set==NULL) + { + return; + } + if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2) + { + flush_line_protocol_metric(instance); + } + printf("Line_protocol metric: %s%s %s\n",measurement,tag_set,field_set); + instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset, + sizeof(instance->line_protocol_send_buff) - instance->line_protocol_send_buff_offset, + "%s%s %s\n", + measurement, tag_set, field_set + ); + return; +} + +static int output_line_protocol_tag_set_buf(char *tag_key[], char *tag_value[], int n_tag, char *tag_set_buff, unsigned int size) +{ + int i = 0; + char *tag_pos = tag_set_buff; + for(i = 0; i < n_tag; i++) + { + tag_pos += snprintf(tag_pos, + size - (tag_pos - tag_set_buff), + ",%s=%s", + tag_key[i], + tag_value[i] + ); + } + return tag_pos - tag_set_buff; +} + + +static void output_line_protocol_table(struct fieldstat_instance *instance) +{ + int i = 0, j = 0, k = 0; + metric_t *metric = NULL; + long long value = 0; + //double ratio = 0.0; + char field_set_buff[UDP_PAYLOAD_SIZE]; + char tag_set_buff[UDP_PAYLOAD_SIZE]; + + memset(field_set_buff, 0, sizeof(field_set_buff)); + memset(tag_set_buff, 0, sizeof(tag_set_buff)); + char *tag_pos = tag_set_buff; + char *field_pos = field_set_buff; + + struct table_metric *table = NULL; + struct table_line *line = NULL; + + for(i = 0; i < instance->table_num; i++) + { + table = instance->table_metrics[i]; + for(j = 0; j < table->line_cnt; j++) + { + line = read_table_line(table, j); + tag_pos += snprintf(tag_pos, + sizeof(tag_set_buff) - (tag_pos - tag_set_buff), + ",app_name=%s,table_name=%s", + instance->name, + table->name + ); + + tag_pos += output_line_protocol_tag_set_buf(line->tag_key, line->tag_value, line->n_tag, tag_pos, + sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); + + for(k = 0; k < table->column_cnt; k ++) + { + metric = get_metric(instance, line->metric_id_belong_to_line[k]); + + value = metric->field_type == FIELD_TYPE_GAUGE ? + get_metric_unit_val(metric, FS_CALC_CURRENT, 1): + get_metric_unit_val(metric, FS_CALC_SPEED, 1); + + field_pos += snprintf(field_pos, + sizeof(field_set_buff) - (field_set_buff - field_set_buff), + "%s=%lld,", + metric->table_column_name, + value + ); + + } + if(field_pos - field_set_buff > 0) + { + *(field_pos - 1) = '\0'; + } + // measurement,tag_set field_set + append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); + tag_pos = tag_set_buff; + field_pos = field_set_buff; + + } + + } +} + +int line_protocol_output(struct fieldstat_instance *instance) +{ + metric_t *metric = NULL; + long long value=0; + int i=0; + char field_set_buff[UDP_PAYLOAD_SIZE]; + char tag_set_buff[UDP_PAYLOAD_SIZE]; + + memset(field_set_buff, 0, sizeof(field_set_buff)); + memset(tag_set_buff, 0, sizeof(tag_set_buff)); + char *tag_pos = tag_set_buff; + + for(i = 0; i < instance->metric_cnt; i++) + { + metric = get_metric(instance, i); + if(metric->is_ratio == 1) + { + continue; + } + + if(metric->belong_to_table == 1) + { + continue; + } + + switch(metric->field_type) + { + case FIELD_TYPE_GAUGE: + value = get_metric_unit_val(metric, FS_CALC_CURRENT, 1); + if(value != 0) + { + snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value); + tag_pos += snprintf(tag_pos, + sizeof(tag_set_buff) - (tag_pos - tag_set_buff), + ",app_name=%s", + instance->name + ); + output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_pos, sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); + append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); + tag_pos = tag_set_buff; + + } + break; + case FIELD_TYPE_COUNTER: + value = get_metric_unit_val(metric, FS_CALC_SPEED, 1); + if(value != 0) + { + snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value); + tag_pos += snprintf(tag_pos, + sizeof(tag_set_buff) - (tag_pos - tag_set_buff), + ",app_name=%s", + instance->name + ); + output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_pos, sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); + append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); + tag_pos = tag_set_buff; + } + break; + default: + break; + + } + } + + output_line_protocol_table(instance); + flush_line_protocol_metric(instance); + + return 0; +}
\ No newline at end of file diff --git a/src/prometheus_output.cpp b/src/prometheus_output.cpp new file mode 100644 index 0000000..c3ee470 --- /dev/null +++ b/src/prometheus_output.cpp @@ -0,0 +1,424 @@ +#include "fieldstat_internal.h" +#define HTTPSERVER_IMPL +#include "httpserver.h" + +struct prometheus_endpoint_instance g_prometheus_endpoint_instance = +{ + 9273, + NULL, + 0, + 0, + NULL, + 0, + NULL, + REALLOC_SCALE_SIZE, +}; + +static char* str_unescape(char* s, char *d, int d_len) +{ + int i=0,j=0; + int len=strlen(s); + for(i=0; i<len && j<d_len; i++) + { + if(s[i]=='(' || s[i]==')' || s[i]=='{' || s[i]=='}' || + s[i]=='/' || s[i]=='\\' || s[i]=='%' || s[i]=='*' || + s[i]=='$' || s[i]=='-' || s[i]==',' || s[i]==';') + { + if(i==0) + { + continue; + } + + d[j++]='_'; + } + else + { + d[j++]=s[i]; + } + } + + if(d[j-1]=='_') + { + j-=1; + } + + d[j]='\0'; + + return 0; +} + +/* +* ret = -1, output not match fieldstat instance + output: + /metrics + /sapp + /tfe + ... +* ret >=0 && ret < instance_cnt output sepecify fieldstat instance + output: + http://127.0.0.1:9273/sapp content +* ret = instance_cnt output all fieldstat instance + output: + http://127.0.0.1:9273/metrics content +*/ +static int prometheus_get_fs_instance_id(struct prometheus_endpoint_instance *global_prometheus_output, char *uri, int uri_len) +{ + int i = 0; + char instance_name_url[INSTANCE_NAME_LEN + 1] = {0}; + int instance_name_url_len = 0; + + if(uri_len == (int)strlen(global_prometheus_output->url_path) + && 0 == memcmp(uri, global_prometheus_output->url_path, strlen(global_prometheus_output->url_path))) + { + return global_prometheus_output->fs_instance_cnt; + } + + for(i = 0; i < global_prometheus_output->fs_instance_cnt; i++) + { + memset(instance_name_url, 0, sizeof(instance_name_url)); + instance_name_url_len = snprintf(instance_name_url, sizeof(instance_name_url),"/%s",global_prometheus_output->fs_instance[i]->name); + + if(uri_len == instance_name_url_len + && 0 == memcmp( uri, instance_name_url, instance_name_url_len)) + { + return i; + } + } + return -1; +} + +static void prometheus_output_uri_list(struct prometheus_endpoint_instance *prometheus_output,struct http_request_s* request) +{ + int i = 0; + int payload_len = 0; + char *payload = NULL; + char *payload_append_position = NULL; + struct fieldstat_instance **fs_instance = NULL; + struct http_response_s* response = NULL; + + fs_instance = prometheus_output->fs_instance; + + + payload_len = prometheus_output->fs_instance_cnt * 128; //TODO using marco, len? + payload_append_position = payload = (char *)calloc(1,payload_len); + + payload_append_position += snprintf(payload_append_position, payload_len - (payload_append_position - payload),"url_path:\n\t%s\n", prometheus_output->url_path); + + for(i = 0; i < prometheus_output->fs_instance_cnt; i++) + { + payload_append_position += snprintf(payload_append_position, payload_len - (payload_append_position - payload),"\t/%s\n", fs_instance[i]->name); + } + + response = http_response_init(); + http_response_status(response, 404); + http_response_header(response, "Content-Type", "text/plain; charset=utf-8"); + http_response_body(response, payload, strlen(payload)); + http_respond(request, response); + + free(payload); + payload=NULL; + return; +} + +static int prometheus_output_read_metric_tags(struct metric_t *metric, char *instance_name, char *tags_buff, unsigned int size) +{ + int i = 0;//used_len = 0; + char unescape[STR_LEN_256] = {0}; + char *append_pos = tags_buff; + + //used_len += snprint(tags_buff, size - used_len, "app_name=\"%s\"", instance_name); + + append_pos += snprintf(append_pos, size - (append_pos - tags_buff), "app_name=\"%s\"", instance_name); + + if(metric->belong_to_table == 1) + { + append_pos += snprintf(append_pos, size - (append_pos - tags_buff), ",line_name=\"%s\"", metric->field_name); + } + + for(i = 0; i < (int)metric->n_tag; i++) + { + memset(unescape, 0, sizeof(unescape)); + str_unescape(metric->tag_key[i], unescape, sizeof(unescape)); + append_pos += snprintf(append_pos, size - (append_pos - tags_buff), ",%s=\"%s\"", unescape, metric->tag_value[i]); + } + return append_pos - tags_buff; +} + + +static int prometheus_output_read_metric_name(struct metric_t *metric, char *instance_app_name, char *name_buff, unsigned int size) +{ + char unescape[256] = {0}; + char *append_pos = name_buff; + if(metric->belong_to_table == 1) + { + str_unescape(metric->table_column_name, unescape, sizeof(unescape)); + } + else + { + str_unescape(metric->field_name, unescape, sizeof(unescape)); + } + append_pos += snprintf(append_pos, size - (append_pos - name_buff), "%s_%s", instance_app_name, unescape); + + return append_pos - name_buff; +} + + +static int prometheus_output_histogram_and_summary(struct metric_t *metric, char *payload, int payload_len, char *instance_app_name, char *metric_name, char *metric_tags) +{ + long long value=0; + long long sum = 0; + int i=0,used_len=0; + + struct hdr_iter iter; + char *output_format = NULL; + + //struct histogram_t* h=&(p->histogram); + struct histogram_t *h = &(metric->histogram); + struct hdr_histogram* h_out=NULL, *h_tmp=NULL; + int bin_num = metric->histogram.bins_num; + double *bins = metric->histogram.bins; + + if(metric->field_type == FILED_TYPE_HISTOGRAM) + { + output_format = (char *)"%s_bucket{%s,le=\"%0.2f\"} %llu\n"; + } + + if(metric->field_type == FIELD_TYPE_SUMMARY) + { + output_format = (char *)"%s{%s,quantile=\"%0.2f\"} %llu\n"; + } + + hdr_init(h->lowest_trackable_value, h->highest_trackable_value, h->significant_figures, &(h_tmp)); + + hdr_add(h_tmp, h->accumulated); + hdr_add(h_tmp, h->changing); + h_out=h_tmp; + + for(i=0;i<bin_num;i++) + { + if(metric->field_type == FILED_TYPE_HISTOGRAM) + { + value = (long long)hdr_count_le_value(h_out, (long long)bins[i]); + } + if(metric->field_type == FIELD_TYPE_SUMMARY) + { + value=(long long)hdr_value_at_percentile(h_out, bins[i]); + } + + used_len+=snprintf(payload+used_len, + payload_len-used_len, + output_format, + metric_name, + metric_tags, + bins[i], + value + ); + } + used_len+=snprintf(payload+used_len, + payload_len-used_len, + "%s_count{%s} %llu\n", + metric_name, + metric_tags, + (long long)h_out->total_count + ); + + hdr_iter_recorded_init(&iter, h_out); + while (hdr_iter_next(&iter)) + { + sum+=(long long)iter.value; + } + used_len+=snprintf(payload+used_len, + payload_len-used_len, + "%s_sum{%s} %llu\n", + metric_name, + metric_tags, + sum + ); + + hdr_close(h_tmp); + h_tmp=NULL; + + return used_len; +} + + + +static int prometheus_get_instance_metric_playload(struct fieldstat_instance *instance, char **payload, int *payload_size, int offset) +{ + int i = 0; + struct metric_t *metric = NULL; + long long value = 0; + char metric_name[256] = {0}; //match the regex [a-zA-Z_:][a-zA-Z0-9_:]* + char instance_name[256] = {0}; + char metric_tags[1024] = {0}; //label name match the regex [a-zA-Z_:][a-zA-Z0-9_:]* + int append_offset = offset; + char *new_payload = NULL; + int new_payload_size = 0; + + if(instance->running != 1) + { + return -1; + } + + if(payload == NULL) + { + return -1; + } + + new_payload = *payload; + new_payload_size = *payload_size; + + str_unescape(instance->name, instance_name, sizeof(instance_name)); + + for(i = 0; i < instance->metric_cnt; i++) + { + metric = get_metric(instance, i); + + if(metric->is_ratio == 1) + { + continue; + } + if(metric->is_invisible == 1) + { + continue; + } + + if( new_payload_size - append_offset <= LEFT_MIN_BUFF_LEN) + { + new_payload_size += REALLOC_SCALE_SIZE; + new_payload = (char *)realloc(new_payload, new_payload_size); + } + + prometheus_output_read_metric_name(metric, instance_name, metric_name, sizeof(metric_name)); + prometheus_output_read_metric_tags(metric, instance_name, metric_tags, sizeof(metric_tags)); + + switch(metric->field_type) + { + case FIELD_TYPE_COUNTER: + case FIELD_TYPE_GAUGE: + value = get_metric_unit_val(metric, FS_CALC_CURRENT, 1); + append_offset += snprintf(new_payload + append_offset, + new_payload_size - append_offset, + "%s{%s} %llu\n", + metric_name, + metric_tags, + value + ); + break; + case FIELD_TYPE_SUMMARY: + case FILED_TYPE_HISTOGRAM: + append_offset += prometheus_output_histogram_and_summary(metric,new_payload + append_offset, + new_payload_size - append_offset, instance_name, metric_name, metric_tags); + break; + default: + break; + } + } + + *payload = new_payload; + *payload_size = new_payload_size; + + return append_offset; + +} + +static void prometheus_output_instance_metric(struct prometheus_endpoint_instance *prometheus_output, struct http_request_s* request, int fs_instance_idx) +{ + int i = 0; + int payload_size = 0; + int payload_offset = 0; + char *payload = NULL; + struct fieldstat_instance *instance = NULL; + struct http_response_s* response = NULL; + + if(fs_instance_idx == prometheus_output->fs_instance_cnt) + { + for(i = 0; i < prometheus_output->fs_instance_cnt; i++) + { + instance = prometheus_output->fs_instance[i]; + payload_offset = prometheus_get_instance_metric_playload(instance, &payload, &payload_size, payload_offset); + } + } + else + { + instance = prometheus_output->fs_instance[fs_instance_idx]; + payload_offset = prometheus_get_instance_metric_playload(instance, &payload, &payload_size, payload_offset); + } + + if(payload != NULL) + { + response = http_response_init(); + http_response_status(response, 200); + http_response_header(response, "Content-Type", "text/plain; charset=utf-8"); + http_response_body(response, payload, strlen(payload)); + http_respond(request, response); + + free(payload); + payload=NULL; + } + +} + +void prometheus_endpoint_instance_output(struct http_request_s* request) +{ + int fs_instance_idx = -1; + struct http_string_s uri; + struct prometheus_endpoint_instance *prometheus_endpoint_instance = &g_prometheus_endpoint_instance;; + + uri = http_request_target(request); + fs_instance_idx = prometheus_get_fs_instance_id(prometheus_endpoint_instance, (char *)uri.buf, uri.len); + + fs_instance_idx == -1 ? + prometheus_output_uri_list(prometheus_endpoint_instance, request) + :prometheus_output_instance_metric(prometheus_endpoint_instance, request, fs_instance_idx); + + return; +} + + +void *prometheus_endpoint_listen_thread_entry(void *arg) +{ + struct prometheus_endpoint_instance *global_prometheus = (struct prometheus_endpoint_instance *)arg; + http_server_listen(global_prometheus->server_handle); + return NULL; +} + +int fieldstat_global_enable_prometheus_endpoint(unsigned short listen_port, const char *url) +{ + g_prometheus_endpoint_instance.server_handle = http_server_init(listen_port, prometheus_endpoint_instance_output); + if(g_prometheus_endpoint_instance.server_handle == NULL) + { + return -1; + } + + g_prometheus_endpoint_instance.url_path = url == NULL ? strdup(PROMETHEUS_ENDPOINT_DEFAULT_URL):strdup(url); + g_prometheus_endpoint_instance.port = listen_port; + g_prometheus_endpoint_instance.running = 1; + g_prometheus_endpoint_instance.fs_instance = (struct fieldstat_instance **)calloc( sizeof(struct fieldstat_instance *), g_prometheus_endpoint_instance.fs_instance_size); + + pthread_create(&g_prometheus_endpoint_instance.tid, NULL, prometheus_endpoint_listen_thread_entry, (void *)&g_prometheus_endpoint_instance); + + return 0; +} + +int fieldstat_set_prometheus_output(struct fieldstat_instance *instance) +{ + int fs_instance_id = 0; + + if(g_prometheus_endpoint_instance.running != 1) + { + return -1; + } + + if(g_prometheus_endpoint_instance.fs_instance_cnt >= g_prometheus_endpoint_instance.fs_instance_size) + { + g_prometheus_endpoint_instance.fs_instance_size += REALLOC_SCALE_SIZE; + g_prometheus_endpoint_instance.fs_instance = (struct fieldstat_instance **)realloc(g_prometheus_endpoint_instance.fs_instance, g_prometheus_endpoint_instance.fs_instance_size); + } + + fs_instance_id = g_prometheus_endpoint_instance.fs_instance_cnt++; + g_prometheus_endpoint_instance.fs_instance[fs_instance_id] = instance; + return 0; +} + + |
