summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorfumingwei <[email protected]>2023-02-27 20:44:58 +0800
committerfumingwei <[email protected]>2023-02-27 20:55:38 +0800
commitc2a77c781f1bfd4752999f41b32739874db17e40 (patch)
tree94d2e8796d83018e7ecfb1cf4b36d1e2746f0ad9 /src
parent9e71865523e1e8318b34b609900170b1079d5386 (diff)
feature:实现counter类型和gauge类型的line protocol输出
Diffstat (limited to 'src')
-rw-r--r--src/fieldstat.cpp171
1 files changed, 170 insertions, 1 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp
index 081126e..735b856 100644
--- a/src/fieldstat.cpp
+++ b/src/fieldstat.cpp
@@ -117,8 +117,68 @@ void metric_free(struct metric_t* metric)
return;
}
+static int startup_udp()
+{
+ int sd_udp=-1;
+ int flags;
+ if(-1==(sd_udp = socket(AF_INET, SOCK_DGRAM, 0)))
+ {
+ printf("FS2: socket error: %d %s, restart socket.", errno, strerror(errno));
+ sd_udp=-1;
+ }
+ flags=fcntl(sd_udp,F_GETFL);
+ flags|=O_NONBLOCK;
+ if(fcntl(sd_udp,F_SETFL,flags)==-1)
+ {
+ printf("FS2: socket error: %d %s, restart socket.", errno, strerror(errno));
+ sd_udp=-1;
+ }
+ int opt=1;
+ if (setsockopt (sd_udp, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt) ) < 0) {
+ printf("FS2:setsockopt error: %d %s, restart socket.", errno, strerror(errno));
+ close (sd_udp);
+ sd_udp=-1;
+ return sd_udp;
+ }
+
+ return sd_udp;
+}
+static int send_udp(int sd, unsigned int dest_ip, unsigned short dest_port, const char * data, int len)
+{
+ int to_send_len=len;
+ int already_sended_len=0,this_sended_len=0;
+ struct sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr =dest_ip;
+ addr.sin_port = htons(dest_port);
+
+ while(to_send_len>already_sended_len)
+ {
+ this_sended_len=sendto(sd,(void*)(data+already_sended_len),
+ to_send_len-already_sended_len,
+ 0,
+ (struct sockaddr *)&(addr),
+ sizeof(addr));
+ if(this_sended_len==-1)
+ {
+ if((EAGAIN == errno)||( EINTR == errno )|| (EWOULDBLOCK==errno))
+ {
+ continue;
+ }
+ else
+ {
+ printf("FS2: at send,socket error: %d %s", errno, strerror(errno));
+ return -1;
+ }
+ }
+ already_sended_len=+this_sended_len;
+ }
+ return 0;
+}
+
+
int fieldstat_set_app_name(struct fieldstat_instance *instance, const char *app_name)
{
@@ -186,7 +246,7 @@ int fieldstat_set_local_output(struct fieldstat_instance *instance, const char *
{
return -1;
}
-
+ instance->line_protocol_socket = startup_udp();
strncpy(instance->local_output_filename, (char *)filename, len_filename);
strncpy(instance->local_output_format, (char *)format, len_format);
instance->local_output_enable = 1;
@@ -322,6 +382,111 @@ long long get_metric_unit_val(struct metric_t *metric,enum field_calc_algo calc_
return value;
}
+
+void flush_line_protocol_metric(struct fieldstat_instance *instance)
+{
+ if(instance->line_protocol_send_buff_offset == 0)
+ {
+ return;
+ }
+
+ if(instance->line_protocol_server_ip > 0 && instance->line_protocol_server_port > 0)
+ {
+ send_udp(instance->line_protocol_socket, instance->line_protocol_server_ip,(unsigned short)instance->line_protocol_server_port, instance->line_protocol_send_buff, instance->line_protocol_send_buff_offset);
+ }
+ instance->line_protocol_send_buff_offset = 0;
+ memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff));
+ return;
+}
+
+void append_line_protocol_line(struct fieldstat_instance *instance, const char* measurement, char *tag_set, char *field_set)
+{
+ if(field_set==NULL)
+ {
+ return;
+ }
+ if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2)
+ {
+ flush_line_protocol_metric(instance);
+ }
+ //printf("Line_protocol metric: %s%s %s\n",measurement,tag_set,field_set);
+ instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset,
+ sizeof(instance->line_protocol_send_buff) - instance->line_protocol_send_buff_offset,
+ "%s%s %s\n",
+ measurement, tag_set, field_set);
+ return;
+}
+
+
+int line_protocol_output(struct fieldstat_instance *instance)
+{
+ metric_t *metric = NULL;
+ long long value=0;
+ int i=0,j=0;
+ char field_set_buff[UDP_PAYLOAD_SIZE];
+ char tag_set_buff[UDP_PAYLOAD_SIZE];
+
+ memset(field_set_buff, 0, sizeof(field_set_buff));
+ memset(tag_set_buff, 0, sizeof(tag_set_buff));
+ char *tag_set_buff_append = tag_set_buff;
+
+ for(i = 0; i < instance->metric_cnt; i++)
+ {
+ metric = instance->metric[i];
+ if(metric->is_ratio == 1)
+ {
+ continue;
+ }
+
+ if(metric->belong_to_table == 1)
+ {
+ continue;
+ }
+
+ switch(metric->field_type)
+ {
+ case FIELD_TYPE_GAUGE:
+ value = get_metric_unit_val(metric, FS_CALC_CURRENT, 1);
+ if(value != 0)
+ {
+ snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value);
+ tag_set_buff_append += snprintf(tag_set_buff_append, sizeof(tag_set_buff) - (tag_set_buff_append - tag_set_buff), ",app_name=%s", instance->app_name);
+ for(j = 0; j < (int)metric->n_tag; j++)
+ {
+ tag_set_buff_append += snprintf(tag_set_buff_append, sizeof(tag_set_buff) - (tag_set_buff_append - tag_set_buff), ",%s=%s", metric->tag_key[i],metric->tag_value[i]);
+ }
+ append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff);
+ tag_set_buff_append = tag_set_buff;
+
+ }
+ break;
+ case FIELD_TYPE_COUNTER:
+ value = get_metric_unit_val(metric, FS_CALC_SPEED, 1);
+ if(value != 0)
+ {
+ snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value);
+ tag_set_buff_append += snprintf(tag_set_buff_append, sizeof(tag_set_buff) - (tag_set_buff_append - tag_set_buff), ",app_name=%s", instance->app_name);
+ for(j = 0; j < (int)metric->n_tag; j++)
+ {
+ tag_set_buff_append += snprintf(tag_set_buff_append, sizeof(tag_set_buff) - (tag_set_buff_append - tag_set_buff), ",%s=%s", metric->tag_key[i],metric->tag_value[i]);
+ }
+ append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff);
+ tag_set_buff_append = tag_set_buff;
+ }
+ break;
+ default:
+ break;
+
+ }
+ }
+ flush_line_protocol_metric(instance);
+ return 0;
+}
+
+
+
+
+
static int print_buf_tag_append_position(metric_t *metric, char *print_buf_tags, unsigned int size)
{
int i = 0;
@@ -639,6 +804,10 @@ void fieldstat_passive_output(struct fieldstat_instance *instance)
{
ret = fieldstat_output_file(instance, interval_ms);
}
+ if(instance->line_protocol_output_enable)
+ {
+ ret = line_protocol_output(instance);
+ }
if(ret == -1)
{
return;