summaryrefslogtreecommitdiff
path: root/src/fieldstat.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/fieldstat.cpp')
-rw-r--r--src/fieldstat.cpp596
1 files changed, 596 insertions, 0 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp
new file mode 100644
index 0000000..a277559
--- /dev/null
+++ b/src/fieldstat.cpp
@@ -0,0 +1,596 @@
+#include "fieldstat.h"
+#include "fieldstat_internal.h"
+#include "threadsafe_counter.h"
+#include "hdr_histogram.h"
+#include "cJSON.h"
+
+#include <sys/socket.h>//socket
+#include <sys/types.h>//socket
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <string.h> //strerror
+#include <errno.h>//strerror
+#include <fcntl.h>//fcntl
+#include <unistd.h>//fcntl
+#include <net/if.h>//fcntl
+#include <sys/ioctl.h>//ioctl
+#include <stdio.h>
+#include <time.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <assert.h>
+#include <sys/time.h>
+
+double HISTOGRAM_DEFAULT_BINS[]={50.0, 80.0, 90.0, 95.0, 99.0};
+
+int FIELD_STAT_VERSION_2_8_20200805_fix_outOfBound=0;
+
+//Automatically generate the version number
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#define GIT_VERSION_CATTER(v) __attribute__((__used__)) const char * GIT_VERSION_##v = NULL
+#define GIT_VERSION_EXPEND(v) GIT_VERSION_CATTER(v)
+
+/* VERSION TAG */
+#ifdef GIT_VERSION
+GIT_VERSION_EXPEND(GIT_VERSION);
+#else
+static __attribute__((__used__)) const char * GIT_VERSION_UNKNOWN = NULL;
+#endif
+#undef GIT_VERSION_CATTER
+#undef GIT_VERSION_EXPEND
+
+#ifdef __cplusplus
+}
+#endif
+//endof Automatically generate the version number
+
+const char* draw_line="________________________________________________________________________________________________________________________________________________";
+const char* draw_boundary="============================================================";
+
+static char* __str_dup(const char* str)
+{
+ char* dup=NULL;
+ dup=(char*)calloc(sizeof(char),strlen(str)+1);
+ memcpy(dup, str, strlen(str));
+ return dup;
+}
+
+int is_valid_field_name(const char* name)
+{
+ const char* reserverd="|:\n\r. \t<>[]#!@";
+ unsigned int i=0,j=0;
+ for(i=0;i<strlen(name);i++)
+ {
+ for(j=0;j<strlen(reserverd);j++)
+ if(name[i]==reserverd[j])
+ {
+ return 0;
+ }
+ }
+ return 1;
+}
+
+
+
+struct metric_t* metric_new(enum field_type type, const char *field_name, const char *tag_key[], const char *tag_value[], size_t n_tag)
+{
+ int i = 0;
+ struct metric_t* metric=(struct metric_t*)calloc(sizeof(struct metric_t),1);
+ metric->field_name =__str_dup(field_name);
+ metric->field_type = type;
+ metric->is_ratio = 0;
+ metric->output_scaling = 1;
+ metric->n_tag = n_tag;
+
+ for(i = 0; i < (int)n_tag; i++)
+ {
+ metric->tag_key[i] = strdup(tag_key[i]);
+ metric->tag_value[i] = strdup(tag_value[i]);
+ }
+ return metric;
+}
+
+void metric_free(struct metric_t* metric)
+{
+ int i = 0;
+
+ free(metric->field_name);
+ metric->field_name = NULL;
+
+ for(i = 0; i < (int)metric->n_tag; i++)
+ {
+ free(metric->tag_key[i]);
+ metric->tag_key[i] = NULL;
+
+ free(metric->tag_value[i]);
+ metric->tag_value[i] = NULL;
+ }
+ metric->n_tag = 0;
+
+ free(metric);
+
+ return;
+}
+
+
+
+
+int fieldstat_set_app_name(struct fieldstat_instance *instance, const char *app_name)
+{
+ int len_app_name = strlen(app_name);
+ if(instance->running == 1)
+ {
+ return -1;
+ }
+ if(len_app_name <= 0 && len_app_name >= LEN_APP_NAME )
+ {
+ return -1;
+ }
+ strncpy(instance->app_name,(char*)app_name, len_app_name);
+ return 0;
+}
+
+int fieldstat_set_output_interval(struct fieldstat_instance *instance, int seconds)
+{
+ if(instance->running == 1 || seconds <= 0 )
+ {
+ return -1;
+ }
+ instance->output_interval_s = seconds;
+ return 0;
+}
+
+int fieldstat_backgroud_thead_disable(struct fieldstat_instance *instance)
+{
+ if(instance->running == 1)
+ {
+ return -1;
+ }
+ instance->background_thread_disable = 1;
+ return 0;
+}
+
+int fieldstat_prometheus_output_enable(struct fieldstat_instance *instance)
+{
+ if(instance->running == 1)
+ {
+ return -1;
+ }
+ instance->prometheus_output_enable = 1;
+ return 0;
+}
+
+int fieldstat_set_local_output(struct fieldstat_instance *instance, const char *filename, const char *format)
+{
+ int len_filename = strlen(filename);
+ int len_format = strlen(format);
+
+ if(instance->running == 1)
+ {
+ return -1;
+ }
+ if(strcmp(format,"default") != 0 && strcmp(format,"json") != 0)
+ {
+ return -1;
+ }
+ if(len_filename <= 0 || len_filename >= LEN_PATH_MAX)
+ {
+ return -1;
+ }
+ if(len_format <= 0 || len_format >= LEN_FORMAT_MAX)
+ {
+ return -1;
+ }
+
+ strncpy(instance->local_output_filename, (char *)filename, len_filename);
+ strncpy(instance->local_output_format, (char *)format, len_format);
+ instance->local_output_enable = 1;
+
+ return 0;
+}
+
+int fieldstat_set_line_protocol_server(struct fieldstat_instance *instance, const char *ip, unsigned short port)
+{
+ int len_ip = strlen(ip);
+
+ if(instance->running == 1)
+ {
+ return -1;
+ }
+ if(len_ip <= 0 || len_ip >= LEN_IP_MAX)
+ {
+ return -1;
+ }
+ if(1 != inet_pton(AF_INET, ip, (void *)&(instance->line_protocol_server_ip)))
+ {
+ return -1;
+ }
+
+ strncpy(instance->line_protocol_server_str_ip,(char *)ip,len_ip);
+ instance->line_protocol_server_port = port;
+ instance->line_protocol_output_enable = 1;
+
+ return 0;
+}
+
+int fieldstat_set_statsd_server(struct fieldstat_instance *instance, const char *ip, unsigned short port)
+{
+ int len_ip = strlen(ip);
+ if(instance->running == 1)
+ {
+ return -1;
+ }
+ if(len_ip <= 0 || len_ip >= LEN_IP_MAX)
+ {
+ return -1;
+ }
+
+ if(1 != inet_pton(AF_INET, ip, (void *)&(instance->statsd_server_ip)))
+ {
+ return -1;
+ }
+
+ strncpy(instance->statsd_server_str_ip,(char *)ip,len_ip);
+ instance->statsd_server_port = port;
+ instance->statsd_output_enable = 1;
+ return 0;
+}
+
+int fieldstat_register(struct fieldstat_instance *instance, enum field_type type, const char *field_name, const char *tag_key[], const char *tag_value[], size_t n_tag)
+{
+ int metric_id = 0;
+ struct metric_t * metric_choosen = NULL;
+ if(!is_valid_field_name(field_name))
+ {
+ return -1;
+ }
+ if(n_tag > N_TAG_MAX)
+ {
+ return -1;
+ }
+ //TODO not block
+ metric_id = instance->metric_cnt++;
+ assert(instance->metric_cnt < instance->metric_size);
+ metric_choosen = instance->metric[metric_id] = metric_new(type,field_name,tag_key,tag_value,n_tag);
+ switch(type)
+ {
+ case FIELD_TYPE_COUNTER:
+ instance->counter_cnt++;
+ memset(&(metric_choosen->counter), 0, sizeof(metric_choosen->counter));
+ break;
+ case FIELD_TYPE_GAUGE:
+ instance->gauge_cnt++;
+ memset(&(metric_choosen->gauge), 0, sizeof(metric_choosen->gauge));
+ break;
+ case FILED_TYPE_HISTOGRAM:
+ //instance->histogram_cnt++;
+ // TODO what?
+ break;
+ case FIELD_TYPE_SUMMARY:
+ //instance->summary_cnt++;
+ //TODO what ?
+ break;
+ default:
+ assert(0);
+ }
+ return metric_id;
+}
+
+
+//long long get_metric_unit_val(display_manifest_t* p, int column_seq,enum field_calc_algo calc_type,int is_refer)
+long long get_metric_unit_val(struct metric_t *metric,enum field_calc_algo calc_type,int is_refer)
+{
+ stat_unit_t* target = NULL;
+ long long value = 0;
+ switch(metric->field_type)
+ {
+ case FIELD_TYPE_COUNTER:
+ target = &(metric->counter);
+ break;
+ case FIELD_TYPE_GAUGE:
+ target = &(metric->gauge);
+ break;
+ case FILED_TYPE_HISTOGRAM:
+ case FIELD_TYPE_SUMMARY:
+ default:
+ break;
+ }
+ value = threadsafe_counter_read(&(target->changing));
+ //value= threadsafe_counter_read(&(target->changing));
+ if(is_refer == 0)
+ {
+ target->previous_changed = value;
+ target->accumulated += value;
+ threadsafe_counter_set(&(target->changing), 0);
+ }
+ switch(calc_type)
+ {
+ case FS_CALC_CURRENT:
+ value=target->accumulated;
+ break;
+ case FS_CALC_SPEED:
+ value=target->previous_changed;
+ break;
+ default:
+ assert(0);
+ }
+ return value;
+}
+
+static int print_buf_tag_append_position(metric_t *metric, char *print_buf_tags, unsigned int size)
+{
+ int i = 0;
+ char *print_buf_tags_append_position = print_buf_tags;
+
+ print_buf_tags_append_position += snprintf(print_buf_tags_append_position, size - (print_buf_tags_append_position - print_buf_tags),"{");
+ for(; i < metric->n_tag; i++)
+ {
+ if(i == 0)
+ {
+ print_buf_tags_append_position += snprintf(print_buf_tags_append_position, size - (print_buf_tags_append_position - print_buf_tags),"%s=\"%s\"", metric->tag_key[i],metric->tag_value[i]);
+ }
+ else
+ {
+ print_buf_tags_append_position += snprintf(print_buf_tags_append_position, size - (print_buf_tags_append_position - print_buf_tags),",%s=\"%s\"", metric->tag_key[i],metric->tag_value[i]);
+ }
+ }
+ print_buf_tags_append_position += snprintf(print_buf_tags_append_position, size - (print_buf_tags_append_position - print_buf_tags),"}");
+
+ return print_buf_tags_append_position - print_buf_tags;
+}
+
+
+static int output_file_format_default_type_gauge(struct fieldstat_instance *instance,long long interval_ms,char *print_buf, unsigned int size)
+{
+ int i = 0, j = 0;
+ //display_manifest_t* p = NULL;
+ metric_t *metric = NULL;
+ long long value = 0;
+ //double ratio = 0.0;
+ //char* pos=print_buf;
+ char *print_buf_append_position = print_buf;
+ char print_buf_tags[1024];
+
+ for(i = 0; i < instance->metric_cnt; i++)
+ {
+ metric = instance->metric[i];
+ if(metric->field_type != FIELD_TYPE_GAUGE)
+ {
+ continue;
+ }
+ if(metric->is_invisible == 1)
+ {
+ value = get_metric_unit_val(metric, FS_CALC_SPEED, 0);
+ continue;
+ }
+ /*
+ if(metric->is_ratio==1)
+ {
+ ratio=get_stat_ratio(_handle->display[p->numerator_id], _handle->display[p->denominator_id], NULL,
+ p->output_scaling, p->calc_type);
+ pos+=snprintf(pos,size-(pos-print_buf),"%s: %10.2e\t",p->name,ratio);
+ }
+ */
+ value = get_metric_unit_val(metric, FS_CALC_CURRENT, 0);
+ //value=value * metric->output_scaling * 1000 / interval_ms;
+ memset(print_buf_tags,0, sizeof(print_buf_tags));
+ print_buf_tag_append_position(metric, print_buf_tags, sizeof(print_buf_tags));
+ print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf), "%s %s: %-10lld\t", metric->field_name, print_buf_tags, value);
+ j++;
+ if(j == STATUS_PER_LINE)
+ {
+ print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf),"\n");
+ j=0;
+ }
+ }
+
+ if(print_buf_append_position - print_buf > 0)
+ {
+ if(*(print_buf_append_position - 1) == '\n')
+ {
+ print_buf_append_position --;
+ }
+ print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position-print_buf),"\n%s\n",draw_line);
+ }
+ return print_buf_append_position - print_buf;
+}
+
+
+static int output_file_format_default_type_counter(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size)
+{
+ int i=0,j=0;
+ //display_manifest_t* p=NULL;
+ metric_t *metric = NULL;
+ long long value = 0;
+ //double ratio = 0.0;
+ char* print_buf_append_position = print_buf;
+ int metric_id[INIT_STAT_FIELD_NUM] = {0};
+ int metric_cnt = 0;
+ char print_buf_tags[1024];
+
+ for(i = 0;i < instance->metric_cnt; i++)
+ {
+ //p=_handle->display[i];
+ metric = instance->metric[i];
+ if(metric->field_type != FIELD_TYPE_COUNTER)
+ {
+ continue;
+ }
+ if(metric->is_invisible == 1)
+ {
+ get_metric_unit_val(metric,FS_CALC_CURRENT,0);
+ continue;
+ }
+ metric_id[metric_cnt] = i;
+ metric_cnt++;
+ }
+
+ for(i = 0; i < metric_cnt; i++)
+ {
+ print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf),"\t");
+
+ for(j = 0; j < FIELD_PER_LINE && i+j < metric_cnt; j++)
+ {
+ metric = instance->metric[metric_id[i+j]];
+ memset(print_buf_tags,0, sizeof(print_buf_tags));
+ print_buf_tag_append_position(metric, print_buf_tags, sizeof(print_buf_tags));
+ print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf), "%10s %s\t", metric->field_name, print_buf_tags);
+ }
+ print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position-print_buf), "\nsum\t");
+
+ for(j=0; j < FIELD_PER_LINE && i+j < metric_cnt; j++)
+ {
+ metric = instance->metric[metric_id[i+j]];
+ value = get_metric_unit_val(metric,FS_CALC_CURRENT, 1);
+ print_buf_append_position += snprintf(print_buf_append_position, sizeof(print_buf) - (print_buf_append_position - print_buf), "%10lld\t", value);
+ }
+ print_buf_append_position += snprintf(print_buf_append_position, sizeof(print_buf) - (print_buf_append_position - print_buf), "\nspeed/s\t");
+
+ for(j=0;j<FIELD_PER_LINE&&i+j<metric_cnt;j++)
+ {
+ metric = instance->metric[metric_id[i+j]];
+ value = get_metric_unit_val(metric,FS_CALC_SPEED, 0);
+ print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf), "%10lld\t", value*1000/interval_ms);
+ }
+ i += (j-1);
+ print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf), "\n");
+ }
+
+ if(print_buf_append_position - print_buf > 0)
+ {
+ if(*(print_buf_append_position - 1)=='\n')
+ {
+ print_buf_append_position--;
+ }
+ print_buf_append_position += snprintf(print_buf_append_position, size - (print_buf_append_position - print_buf),"\n%s\n", draw_line);
+ }
+
+ return print_buf_append_position - print_buf;
+}
+
+
+int fieldstat_output_file(struct fieldstat_instance *instance,long long interval_ms)
+{
+ size_t print_buf_sz = instance->metric_cnt*1024;
+ char *print_buf = NULL;
+ char *print_buf_append_position = NULL;
+ time_t current = 0;
+ char ctime_buff[32]={0};
+
+ if(instance->fp == NULL)
+ {
+ instance->fp = fopen(instance->local_output_filename, "w");
+ if(instance->fp == NULL)
+ {
+ printf("Field Stat: open %s failed.\n",instance->local_output_filename);
+ assert(0);
+ return -1;
+ }
+ }
+
+ if(!strcmp(instance->local_output_format, "default"))
+ {
+ time(&current);
+ ctime_r(&current, ctime_buff);
+ print_buf = (char*)calloc(sizeof(char), print_buf_sz);
+ print_buf_append_position = print_buf;
+ print_buf_append_position += snprintf(print_buf_append_position, print_buf_sz - (print_buf_append_position - print_buf), "%s%s", draw_boundary, ctime_buff);
+ print_buf_append_position --;//jump '\n' generate by ctime()
+ print_buf_append_position += snprintf(print_buf_append_position, print_buf_sz - (print_buf_append_position - print_buf),"%s\n",draw_boundary);
+
+ //pthread_mutex_lock(&(_handle->reg_lock)); //TODO
+ print_buf_append_position += output_file_format_default_type_gauge(instance, interval_ms, print_buf_append_position, print_buf_sz - (print_buf_append_position - print_buf));
+ print_buf_append_position += output_file_format_default_type_counter(instance, interval_ms, print_buf_append_position, print_buf_sz - (print_buf_append_position - print_buf));
+ //TODO output table,output histogram,output summary
+ //pthread_mutex_unlock(&(_handle->reg_lock));//TODO
+ }
+
+ if(!strcmp(instance->local_output_format, "json"))
+ {
+ //TODO from json output
+ }
+
+ fseek(instance->fp,0,SEEK_SET);
+ fwrite(print_buf,print_buf_append_position - print_buf,1,instance->fp);
+
+ fflush(instance->fp);
+
+ if(print_buf)
+ {
+ free(print_buf);
+ print_buf = NULL;
+ }
+ return 0;
+}
+
+
+void fieldstat_passive_output(struct fieldstat_instance *instance)
+{
+ struct timespec this_output_time;
+ long long interval_ms = 0;
+ int ret = 0;
+
+ if(instance->running == 0)
+ {
+ return;
+ }
+
+ clock_gettime(CLOCK_MONOTONIC ,&this_output_time);
+ interval_ms = (this_output_time.tv_sec - instance->last_output_time.tv_sec) * 1000 + (this_output_time.tv_nsec - instance->last_output_time.tv_nsec) / 1000000;
+ if(interval_ms < 1)
+ {
+ printf("Passive return\n");
+ return;
+ }
+
+ if(instance->local_output_enable)
+ {
+ ret = fieldstat_output_file(instance, interval_ms);
+ }
+ if(ret == -1)
+ {
+ return;
+ }
+ memcpy(&(instance->last_output_time),&this_output_time, sizeof(this_output_time));
+}
+
+void *fieldstat_thread_schema_output(void *arg)
+{
+ struct fieldstat_instance *instance=(struct fieldstat_instance *)arg;
+ while(instance->background_thread_disable == 0)
+ {
+ fieldstat_passive_output(instance);
+ sleep(instance->output_interval_s);
+ }
+ return NULL;
+}
+
+
+void fieldstat_instance_start(struct fieldstat_instance *instance)
+{
+ instance->running = 1;
+ clock_gettime(CLOCK_MONOTONIC,&(instance->last_output_time));
+ if(instance->background_thread_disable == 0)
+ {
+ pthread_create(&(instance->cfg_mon_t), NULL, fieldstat_thread_schema_output, (void*)instance);
+ }
+ //append instance to prometheus output
+}
+
+struct fieldstat_instance * fieldstat_instance_create(void)
+{
+ struct fieldstat_instance *instance = (struct fieldstat_instance *)calloc(sizeof(struct fieldstat_instance),1);
+
+ strcpy(instance->app_name, "?");
+ instance->running = 0;
+ instance->output_interval_s = 2; //default 2s
+ instance->background_thread_disable = 0;
+ instance->metric_size = NUM_INIT_METRICS;
+ instance->metric =(struct metric_t **)calloc(sizeof(struct metric *), instance->metric_size);
+ return instance;
+}