From c2a77c781f1bfd4752999f41b32739874db17e40 Mon Sep 17 00:00:00 2001 From: fumingwei Date: Mon, 27 Feb 2023 20:44:58 +0800 Subject: feature:实现counter类型和gauge类型的line protocol输出 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/fieldstat.cpp | 171 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 170 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp index 081126e..735b856 100644 --- a/src/fieldstat.cpp +++ b/src/fieldstat.cpp @@ -117,8 +117,68 @@ void metric_free(struct metric_t* metric) return; } +static int startup_udp() +{ + int sd_udp=-1; + int flags; + if(-1==(sd_udp = socket(AF_INET, SOCK_DGRAM, 0))) + { + printf("FS2: socket error: %d %s, restart socket.", errno, strerror(errno)); + sd_udp=-1; + } + flags=fcntl(sd_udp,F_GETFL); + flags|=O_NONBLOCK; + if(fcntl(sd_udp,F_SETFL,flags)==-1) + { + printf("FS2: socket error: %d %s, restart socket.", errno, strerror(errno)); + sd_udp=-1; + } + int opt=1; + if (setsockopt (sd_udp, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt) ) < 0) { + printf("FS2:setsockopt error: %d %s, restart socket.", errno, strerror(errno)); + close (sd_udp); + sd_udp=-1; + return sd_udp; + } + + return sd_udp; +} +static int send_udp(int sd, unsigned int dest_ip, unsigned short dest_port, const char * data, int len) +{ + int to_send_len=len; + int already_sended_len=0,this_sended_len=0; + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr =dest_ip; + addr.sin_port = htons(dest_port); + + while(to_send_len>already_sended_len) + { + this_sended_len=sendto(sd,(void*)(data+already_sended_len), + to_send_len-already_sended_len, + 0, + (struct sockaddr *)&(addr), + sizeof(addr)); + if(this_sended_len==-1) + { + if((EAGAIN == errno)||( EINTR == errno )|| (EWOULDBLOCK==errno)) + { + continue; + } + else + { + printf("FS2: at send,socket error: %d %s", errno, strerror(errno)); + return -1; + } + } + already_sended_len=+this_sended_len; + } + return 0; +} + + int fieldstat_set_app_name(struct fieldstat_instance *instance, const char *app_name) { @@ -186,7 +246,7 @@ int fieldstat_set_local_output(struct fieldstat_instance *instance, const char * { return -1; } - + instance->line_protocol_socket = startup_udp(); strncpy(instance->local_output_filename, (char *)filename, len_filename); strncpy(instance->local_output_format, (char *)format, len_format); instance->local_output_enable = 1; @@ -322,6 +382,111 @@ long long get_metric_unit_val(struct metric_t *metric,enum field_calc_algo calc_ return value; } + +void flush_line_protocol_metric(struct fieldstat_instance *instance) +{ + if(instance->line_protocol_send_buff_offset == 0) + { + return; + } + + if(instance->line_protocol_server_ip > 0 && instance->line_protocol_server_port > 0) + { + send_udp(instance->line_protocol_socket, instance->line_protocol_server_ip,(unsigned short)instance->line_protocol_server_port, instance->line_protocol_send_buff, instance->line_protocol_send_buff_offset); + } + instance->line_protocol_send_buff_offset = 0; + memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff)); + return; +} + +void append_line_protocol_line(struct fieldstat_instance *instance, const char* measurement, char *tag_set, char *field_set) +{ + if(field_set==NULL) + { + return; + } + if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2) + { + flush_line_protocol_metric(instance); + } + //printf("Line_protocol metric: %s%s %s\n",measurement,tag_set,field_set); + instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset, + sizeof(instance->line_protocol_send_buff) - instance->line_protocol_send_buff_offset, + "%s%s %s\n", + measurement, tag_set, field_set); + return; +} + + +int line_protocol_output(struct fieldstat_instance *instance) +{ + metric_t *metric = NULL; + long long value=0; + int i=0,j=0; + char field_set_buff[UDP_PAYLOAD_SIZE]; + char tag_set_buff[UDP_PAYLOAD_SIZE]; + + memset(field_set_buff, 0, sizeof(field_set_buff)); + memset(tag_set_buff, 0, sizeof(tag_set_buff)); + char *tag_set_buff_append = tag_set_buff; + + for(i = 0; i < instance->metric_cnt; i++) + { + metric = instance->metric[i]; + if(metric->is_ratio == 1) + { + continue; + } + + if(metric->belong_to_table == 1) + { + continue; + } + + switch(metric->field_type) + { + case FIELD_TYPE_GAUGE: + value = get_metric_unit_val(metric, FS_CALC_CURRENT, 1); + if(value != 0) + { + snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value); + tag_set_buff_append += snprintf(tag_set_buff_append, sizeof(tag_set_buff) - (tag_set_buff_append - tag_set_buff), ",app_name=%s", instance->app_name); + for(j = 0; j < (int)metric->n_tag; j++) + { + tag_set_buff_append += snprintf(tag_set_buff_append, sizeof(tag_set_buff) - (tag_set_buff_append - tag_set_buff), ",%s=%s", metric->tag_key[i],metric->tag_value[i]); + } + append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); + tag_set_buff_append = tag_set_buff; + + } + break; + case FIELD_TYPE_COUNTER: + value = get_metric_unit_val(metric, FS_CALC_SPEED, 1); + if(value != 0) + { + snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value); + tag_set_buff_append += snprintf(tag_set_buff_append, sizeof(tag_set_buff) - (tag_set_buff_append - tag_set_buff), ",app_name=%s", instance->app_name); + for(j = 0; j < (int)metric->n_tag; j++) + { + tag_set_buff_append += snprintf(tag_set_buff_append, sizeof(tag_set_buff) - (tag_set_buff_append - tag_set_buff), ",%s=%s", metric->tag_key[i],metric->tag_value[i]); + } + append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); + tag_set_buff_append = tag_set_buff; + } + break; + default: + break; + + } + } + flush_line_protocol_metric(instance); + return 0; +} + + + + + static int print_buf_tag_append_position(metric_t *metric, char *print_buf_tags, unsigned int size) { int i = 0; @@ -639,6 +804,10 @@ void fieldstat_passive_output(struct fieldstat_instance *instance) { ret = fieldstat_output_file(instance, interval_ms); } + if(instance->line_protocol_output_enable) + { + ret = line_protocol_output(instance); + } if(ret == -1) { return; -- cgit v1.2.3