diff options
Diffstat (limited to 'src/fieldstat.cpp')
| -rw-r--r-- | src/fieldstat.cpp | 596 |
1 files changed, 596 insertions, 0 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp new file mode 100644 index 0000000..a277559 --- /dev/null +++ b/src/fieldstat.cpp @@ -0,0 +1,596 @@ +#include "fieldstat.h" +#include "fieldstat_internal.h" +#include "threadsafe_counter.h" +#include "hdr_histogram.h" +#include "cJSON.h" + +#include <sys/socket.h>//socket +#include <sys/types.h>//socket +#include <netinet/in.h> +#include <arpa/inet.h> +#include <string.h> //strerror +#include <errno.h>//strerror +#include <fcntl.h>//fcntl +#include <unistd.h>//fcntl +#include <net/if.h>//fcntl +#include <sys/ioctl.h>//ioctl +#include <stdio.h> +#include <time.h> +#include <unistd.h> +#include <stdlib.h> +#include <pthread.h> +#include <assert.h> +#include <sys/time.h> + +double HISTOGRAM_DEFAULT_BINS[]={50.0, 80.0, 90.0, 95.0, 99.0}; + +int FIELD_STAT_VERSION_2_8_20200805_fix_outOfBound=0; + +//Automatically generate the version number +#ifdef __cplusplus +extern "C" +{ +#endif + +#define GIT_VERSION_CATTER(v) __attribute__((__used__)) const char * GIT_VERSION_##v = NULL +#define GIT_VERSION_EXPEND(v) GIT_VERSION_CATTER(v) + +/* VERSION TAG */ +#ifdef GIT_VERSION +GIT_VERSION_EXPEND(GIT_VERSION); +#else +static __attribute__((__used__)) const char * GIT_VERSION_UNKNOWN = NULL; +#endif +#undef GIT_VERSION_CATTER +#undef GIT_VERSION_EXPEND + +#ifdef __cplusplus +} +#endif +//endof Automatically generate the version number + +const char* draw_line="________________________________________________________________________________________________________________________________________________"; +const char* draw_boundary="============================================================"; + +static char* __str_dup(const char* str) +{ + char* dup=NULL; + dup=(char*)calloc(sizeof(char),strlen(str)+1); + memcpy(dup, str, strlen(str)); + return dup; +} + +int is_valid_field_name(const char* name) +{ + const char* reserverd="|:\n\r. \t<>[]#!@"; + unsigned int i=0,j=0; + for(i=0;i<strlen(name);i++) + { + for(j=0;j<strlen(reserverd);j++) + if(name[i]==reserverd[j]) + { + return 0; + } + } + return 1; +} + + + +struct metric_t* metric_new(enum field_type type, const char *field_name, const char *tag_key[], const char *tag_value[], size_t n_tag) +{ + int i = 0; + struct metric_t* metric=(struct metric_t*)calloc(sizeof(struct metric_t),1); + metric->field_name =__str_dup(field_name); + metric->field_type = type; + metric->is_ratio = 0; + metric->output_scaling = 1; + metric->n_tag = n_tag; + + for(i = 0; i < (int)n_tag; i++) + { + metric->tag_key[i] = strdup(tag_key[i]); + metric->tag_value[i] = strdup(tag_value[i]); + } + return metric; +} + +void metric_free(struct metric_t* metric) +{ + int i = 0; + + free(metric->field_name); + metric->field_name = NULL; + + for(i = 0; i < (int)metric->n_tag; i++) + { + free(metric->tag_key[i]); + metric->tag_key[i] = NULL; + + free(metric->tag_value[i]); + metric->tag_value[i] = NULL; + } + metric->n_tag = 0; + + free(metric); + + return; +} + + + + +int fieldstat_set_app_name(struct fieldstat_instance *instance, const char *app_name) +{ + int len_app_name = strlen(app_name); + if(instance->running == 1) + { + return -1; + } + if(len_app_name <= 0 && len_app_name >= LEN_APP_NAME ) + { + return -1; + } + strncpy(instance->app_name,(char*)app_name, len_app_name); + return 0; +} + +int fieldstat_set_output_interval(struct fieldstat_instance *instance, int seconds) +{ + if(instance->running == 1 || seconds <= 0 ) + { + return -1; + } + instance->output_interval_s = seconds; + return 0; +} + +int fieldstat_backgroud_thead_disable(struct fieldstat_instance *instance) +{ + if(instance->running == 1) + { + return -1; + } + instance->background_thread_disable = 1; + return 0; +} + +int fieldstat_prometheus_output_enable(struct fieldstat_instance *instance) +{ + if(instance->running == 1) + { + return -1; + } + instance->prometheus_output_enable = 1; + return 0; +} + +int fieldstat_set_local_output(struct fieldstat_instance *instance, const char *filename, const char *format) +{ + int len_filename = strlen(filename); + int len_format = strlen(format); + + if(instance->running == 1) + { + return -1; + } + if(strcmp(format,"default") != 0 && strcmp(format,"json") != 0) + { + return -1; + } + if(len_filename <= 0 || len_filename >= LEN_PATH_MAX) + { + return -1; + } + if(len_format <= 0 || len_format >= LEN_FORMAT_MAX) + { + return -1; + } + + strncpy(instance->local_output_filename, (char *)filename, len_filename); + strncpy(instance->local_output_format, (char *)format, len_format); + instance->local_output_enable = 1; + + return 0; +} + +int fieldstat_set_line_protocol_server(struct fieldstat_instance *instance, const char *ip, unsigned short port) +{ + int len_ip = strlen(ip); + + if(instance->running == 1) + { + return -1; + } + if(len_ip <= 0 || len_ip >= LEN_IP_MAX) + { + return -1; + } + if(1 != inet_pton(AF_INET, ip, (void *)&(instance->line_protocol_server_ip))) + { + return -1; + } + + strncpy(instance->line_protocol_server_str_ip,(char *)ip,len_ip); + instance->line_protocol_server_port = port; + instance->line_protocol_output_enable = 1; + + return 0; +} + +int fieldstat_set_statsd_server(struct fieldstat_instance *instance, const char *ip, unsigned short port) +{ + int len_ip = strlen(ip); + if(instance->running == 1) + { + return -1; + } + if(len_ip <= 0 || len_ip >= LEN_IP_MAX) + { + return -1; + } + + if(1 != inet_pton(AF_INET, ip, (void *)&(instance->statsd_server_ip))) + { + return -1; + } + + strncpy(instance->statsd_server_str_ip,(char *)ip,len_ip); + instance->statsd_server_port = port; + instance->statsd_output_enable = 1; + return 0; +} + +int fieldstat_register(struct fieldstat_instance *instance, enum field_type type, const char *field_name, const char *tag_key[], const char *tag_value[], size_t n_tag) +{ + int metric_id = 0; + struct metric_t * metric_choosen = NULL; + if(!is_valid_field_name(field_name)) + { + return -1; + } + if(n_tag > N_TAG_MAX) + { + return -1; + } + //TODO not block + metric_id = instance->metric_cnt++; + assert(instance->metric_cnt < instance->metric_size); + metric_choosen = instance->metric[metric_id] = metric_new(type,field_name,tag_key,tag_value,n_tag); + switch(type) + { + case FIELD_TYPE_COUNTER: + instance->counter_cnt++; + memset(&(metric_choosen->counter), 0, sizeof(metric_choosen->counter)); + break; + case FIELD_TYPE_GAUGE: + instance->gauge_cnt++; + memset(&(metric_choosen->gauge), 0, sizeof(metric_choosen->gauge)); + break; + case FILED_TYPE_HISTOGRAM: + //instance->histogram_cnt++; + // TODO what? + break; + case FIELD_TYPE_SUMMARY: + //instance->summary_cnt++; + //TODO what ? + break; + default: + assert(0); + } + return metric_id; +} + + +//long long get_metric_unit_val(display_manifest_t* p, int column_seq,enum field_calc_algo calc_type,int is_refer) +long long get_metric_unit_val(struct metric_t *metric,enum field_calc_algo calc_type,int is_refer) +{ + stat_unit_t* target = NULL; + long long value = 0; + switch(metric->field_type) + { + case FIELD_TYPE_COUNTER: + target = &(metric->counter); + break; + case FIELD_TYPE_GAUGE: + target = &(metric->gauge); + break; + case FILED_TYPE_HISTOGRAM: + case FIELD_TYPE_SUMMARY: + default: + break; + } + value = threadsafe_counter_read(&(target->changing)); + //value= threadsafe_counter_read(&(target->changing)); + if(is_refer == 0) + { + target->previous_changed = value; + target->accumulated += value; + threadsafe_counter_set(&(target->changing), 0); + } + switch(calc_type) + { + case FS_CALC_CURRENT: + value=target->accumulated; + break; + case FS_CALC_SPEED: + value=target->previous_changed; + break; + default: + assert(0); + } + return value; +} + +static int print_buf_tag_append_position(metric_t *metric, char *print_buf_tags, unsigned int size) +{ + int i = 0; + char *print_buf_tags_append_position = print_buf_tags; + + print_buf_tags_append_position += snprintf(print_buf_tags_append_position, size - (print_buf_tags_append_position - print_buf_tags),"{"); + for(; i < metric->n_tag; i++) + { + if(i == 0) + { + print_buf_tags_append_position += snprintf(print_buf_tags_append_position, size - (print_buf_tags_append_position - print_buf_tags),"%s=\"%s\"", metric->tag_key[i],metric->tag_value[i]); + } + else + { + print_buf_tags_append_position += snprintf(print_buf_tags_append_position, size - (print_buf_tags_append_position - print_buf_tags),",%s=\"%s\"", metric->tag_key[i],metric->tag_value[i]); + } + } + print_buf_tags_append_position += snprintf(print_buf_tags_append_position, size - (print_buf_tags_append_position - print_buf_tags),"}"); + + return print_buf_tags_append_position - print_buf_tags; +} + + +static int output_file_format_default_type_gauge(struct fieldstat_instance *instance,long long interval_ms,char *print_buf, unsigned int size) +{ + int i = 0, j = 0; + //display_manifest_t* p = NULL; + metric_t *metric = NULL; + long long value = 0; + //double ratio = 0.0; + //char* pos=print_buf; + char *print_buf_append_position = print_buf; + char print_buf_tags[1024]; + + for(i = 0; i < instance->metric_cnt; i++) + { + metric = instance->metric[i]; + if(metric->field_type != FIELD_TYPE_GAUGE) + { + continue; + } + if(metric->is_invisible == 1) + { + value = get_metric_unit_val(metric, FS_CALC_SPEED, 0); + continue; + } + /* + if(metric->is_ratio==1) + { + ratio=get_stat_ratio(_handle->display[p->numerator_id], _handle->display[p->denominator_id], NULL, + p->output_scaling, p->calc_type); + pos+=snprintf(pos,size-(pos-print_buf),"%s: %10.2e\t",p->name,ratio); + } + */ + value = get_metric_unit_val(metric, FS_CALC_CURRENT, 0); + //value=value * metric->output_scaling * 1000 / interval_ms; + memset(print_buf_tags,0, sizeof(print_buf_tags)); + print_buf_tag_append_position(metric, print_buf_tags, sizeof(print_buf_tags)); + print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf), "%s %s: %-10lld\t", metric->field_name, print_buf_tags, value); + j++; + if(j == STATUS_PER_LINE) + { + print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf),"\n"); + j=0; + } + } + + if(print_buf_append_position - print_buf > 0) + { + if(*(print_buf_append_position - 1) == '\n') + { + print_buf_append_position --; + } + print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position-print_buf),"\n%s\n",draw_line); + } + return print_buf_append_position - print_buf; +} + + +static int output_file_format_default_type_counter(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size) +{ + int i=0,j=0; + //display_manifest_t* p=NULL; + metric_t *metric = NULL; + long long value = 0; + //double ratio = 0.0; + char* print_buf_append_position = print_buf; + int metric_id[INIT_STAT_FIELD_NUM] = {0}; + int metric_cnt = 0; + char print_buf_tags[1024]; + + for(i = 0;i < instance->metric_cnt; i++) + { + //p=_handle->display[i]; + metric = instance->metric[i]; + if(metric->field_type != FIELD_TYPE_COUNTER) + { + continue; + } + if(metric->is_invisible == 1) + { + get_metric_unit_val(metric,FS_CALC_CURRENT,0); + continue; + } + metric_id[metric_cnt] = i; + metric_cnt++; + } + + for(i = 0; i < metric_cnt; i++) + { + print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf),"\t"); + + for(j = 0; j < FIELD_PER_LINE && i+j < metric_cnt; j++) + { + metric = instance->metric[metric_id[i+j]]; + memset(print_buf_tags,0, sizeof(print_buf_tags)); + print_buf_tag_append_position(metric, print_buf_tags, sizeof(print_buf_tags)); + print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf), "%10s %s\t", metric->field_name, print_buf_tags); + } + print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position-print_buf), "\nsum\t"); + + for(j=0; j < FIELD_PER_LINE && i+j < metric_cnt; j++) + { + metric = instance->metric[metric_id[i+j]]; + value = get_metric_unit_val(metric,FS_CALC_CURRENT, 1); + print_buf_append_position += snprintf(print_buf_append_position, sizeof(print_buf) - (print_buf_append_position - print_buf), "%10lld\t", value); + } + print_buf_append_position += snprintf(print_buf_append_position, sizeof(print_buf) - (print_buf_append_position - print_buf), "\nspeed/s\t"); + + for(j=0;j<FIELD_PER_LINE&&i+j<metric_cnt;j++) + { + metric = instance->metric[metric_id[i+j]]; + value = get_metric_unit_val(metric,FS_CALC_SPEED, 0); + print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf), "%10lld\t", value*1000/interval_ms); + } + i += (j-1); + print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf), "\n"); + } + + if(print_buf_append_position - print_buf > 0) + { + if(*(print_buf_append_position - 1)=='\n') + { + print_buf_append_position--; + } + print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf),"\n%s\n", draw_line); + } + + return print_buf_append_position - print_buf; +} + + +int fieldstat_output_file(struct fieldstat_instance *instance,long long interval_ms) +{ + size_t print_buf_sz = instance->metric_cnt*1024; + char *print_buf = NULL; + char *print_buf_append_position = NULL; + time_t current = 0; + char ctime_buff[32]={0}; + + if(instance->fp == NULL) + { + instance->fp = fopen(instance->local_output_filename, "w"); + if(instance->fp == NULL) + { + printf("Field Stat: open %s failed.\n",instance->local_output_filename); + assert(0); + return -1; + } + } + + if(!strcmp(instance->local_output_format, "default")) + { + time(¤t); + ctime_r(¤t, ctime_buff); + print_buf = (char*)calloc(sizeof(char), print_buf_sz); + print_buf_append_position = print_buf; + print_buf_append_position += snprintf(print_buf_append_position, print_buf_sz - (print_buf_append_position - print_buf), "%s%s", draw_boundary, ctime_buff); + print_buf_append_position --;//jump '\n' generate by ctime() + print_buf_append_position += snprintf(print_buf_append_position, print_buf_sz - (print_buf_append_position - print_buf),"%s\n",draw_boundary); + + //pthread_mutex_lock(&(_handle->reg_lock)); //TODO + print_buf_append_position += output_file_format_default_type_gauge(instance, interval_ms, print_buf_append_position, print_buf_sz - (print_buf_append_position - print_buf)); + print_buf_append_position += output_file_format_default_type_counter(instance, interval_ms, print_buf_append_position, print_buf_sz - (print_buf_append_position - print_buf)); + //TODO output table,output histogram,output summary + //pthread_mutex_unlock(&(_handle->reg_lock));//TODO + } + + if(!strcmp(instance->local_output_format, "json")) + { + //TODO from json output + } + + fseek(instance->fp,0,SEEK_SET); + fwrite(print_buf,print_buf_append_position - print_buf,1,instance->fp); + + fflush(instance->fp); + + if(print_buf) + { + free(print_buf); + print_buf = NULL; + } + return 0; +} + + +void fieldstat_passive_output(struct fieldstat_instance *instance) +{ + struct timespec this_output_time; + long long interval_ms = 0; + int ret = 0; + + if(instance->running == 0) + { + return; + } + + clock_gettime(CLOCK_MONOTONIC ,&this_output_time); + interval_ms = (this_output_time.tv_sec - instance->last_output_time.tv_sec) * 1000 + (this_output_time.tv_nsec - instance->last_output_time.tv_nsec) / 1000000; + if(interval_ms < 1) + { + printf("Passive return\n"); + return; + } + + if(instance->local_output_enable) + { + ret = fieldstat_output_file(instance, interval_ms); + } + if(ret == -1) + { + return; + } + memcpy(&(instance->last_output_time),&this_output_time, sizeof(this_output_time)); +} + +void *fieldstat_thread_schema_output(void *arg) +{ + struct fieldstat_instance *instance=(struct fieldstat_instance *)arg; + while(instance->background_thread_disable == 0) + { + fieldstat_passive_output(instance); + sleep(instance->output_interval_s); + } + return NULL; +} + + +void fieldstat_instance_start(struct fieldstat_instance *instance) +{ + instance->running = 1; + clock_gettime(CLOCK_MONOTONIC,&(instance->last_output_time)); + if(instance->background_thread_disable == 0) + { + pthread_create(&(instance->cfg_mon_t), NULL, fieldstat_thread_schema_output, (void*)instance); + } + //append instance to prometheus output +} + +struct fieldstat_instance * fieldstat_instance_create(void) +{ + struct fieldstat_instance *instance = (struct fieldstat_instance *)calloc(sizeof(struct fieldstat_instance),1); + + strcpy(instance->app_name, "?"); + instance->running = 0; + instance->output_interval_s = 2; //default 2s + instance->background_thread_disable = 0; + instance->metric_size = NUM_INIT_METRICS; + instance->metric =(struct metric_t **)calloc(sizeof(struct metric *), instance->metric_size); + return instance; +} |
