summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfumingwei <[email protected]>2023-03-24 22:05:09 +0800
committerfumingwei <[email protected]>2023-03-27 20:16:55 +0800
commit45a6ce34addd9187d7123c03d3badd5fa6d30589 (patch)
tree35401d8a6cab312eeb20075895802ebd6436a746
parente09e7f397145613f1776622a2219047b9066f3a5 (diff)
feature:重写line protocol output代码
-rw-r--r--src/fieldstat.cpp51
-rw-r--r--src/fieldstat_dynamic.cpp19
-rw-r--r--src/fieldstat_internal.h35
-rw-r--r--src/line_protocol_output.cpp540
-rw-r--r--test/src/gtest_dynamic_fieldstat.cpp9
-rw-r--r--test/src/gtest_fieldstat.cpp93
6 files changed, 466 insertions, 281 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp
index 65a0c15..582efaa 100644
--- a/src/fieldstat.cpp
+++ b/src/fieldstat.cpp
@@ -350,20 +350,19 @@ int fieldstat_set_local_output(struct fieldstat_instance *instance, const char *
int fieldstat_set_line_protocol_server(struct fieldstat_instance *instance, const char *ip, unsigned short port)
{
- if(instance->running == 1)
+ int ret = 0;
+ if(instance == NULL || instance->running == 1)
{
return -1;
}
+ ret = enable_line_protocol_output(&instance->line_protocol_output, ip, port);
- if(1 != inet_pton(AF_INET, ip, (void *)&(instance->line_protocol_server_ip)))
+ if(ret == 0)
{
- return -1;
+ instance->line_protocol_output_enable = 1;
}
- instance->line_protocol_socket = startup_udp();
- instance->line_protocol_server_port = port;
- instance->line_protocol_output_enable = 1;
- return 0;
+ return ret;
}
int fieldstat_set_statsd_server(struct fieldstat_instance *instance, const char *ip, unsigned short port)
@@ -578,10 +577,10 @@ void fieldstat_instance_free(struct fieldstat_instance *instance)
instance->local_output_enable = 0;
}
- if(instance->line_protocol_socket != -1)
+ if(instance->line_protocol_output_enable == 1)
{
- close(instance->line_protocol_socket);
- instance->line_protocol_socket = -1;
+ disable_line_protocol_output(&instance->line_protocol_output);
+ instance->line_protocol_output_enable = 0;
}
for(i = 0; i < instance->metric_cnt; i++)
@@ -1017,3 +1016,35 @@ int fieldstat_register_summary(struct fieldstat_instance *instance, const char *
bins, lowest_trackable_value, highest_trackable_value, significant_figures, output_window);
}
+
+int enable_line_protocol_output(struct line_protocol_output *line_protocol_output, const char *ip, unsigned short port)
+{
+ if(line_protocol_output == NULL)
+ {
+ return -1;
+ }
+
+ if(1 != inet_pton(AF_INET, ip, (void *)&(line_protocol_output->server_ip)))
+ {
+ return -1;
+ }
+ line_protocol_output->server_port = port;
+ line_protocol_output->send_socket = startup_udp();
+
+ return 0;
+}
+
+void disable_line_protocol_output(struct line_protocol_output *line_protocol_output)
+{
+ if(line_protocol_output == NULL)
+ {
+ return;
+ }
+
+ if(line_protocol_output->send_socket != -1)
+ {
+ close(line_protocol_output->send_socket);
+ line_protocol_output->send_socket = -1;
+ }
+ return;
+}
diff --git a/src/fieldstat_dynamic.cpp b/src/fieldstat_dynamic.cpp
index 8495a38..162e91a 100644
--- a/src/fieldstat_dynamic.cpp
+++ b/src/fieldstat_dynamic.cpp
@@ -47,10 +47,10 @@ void fieldstat_dynamic_instance_free(struct fieldstat_dynamic_instance *instance
instance->running = 0;
}
- if(instance->line_protocol_socket != -1)
+ if(instance->line_protocol_output_enable == 1)
{
- close(instance->line_protocol_socket);
- instance->line_protocol_socket = -1;
+ disable_line_protocol_output(&instance->line_protocol_output);
+ instance->line_protocol_output_enable = 0;
}
for(i = 0; i < instance->n_thread; i++)
@@ -98,23 +98,22 @@ void fieldstat_dynamic_instance_free(struct fieldstat_dynamic_instance *instance
}
-
int fieldstat_dynamic_set_line_protocol_server(struct fieldstat_dynamic_instance *instance, const char *ip, unsigned short port)
{
+ int ret = 0;
+
if(instance == NULL || instance->running == 1)
{
return -1;
}
- if(1 != inet_pton(AF_INET, ip, (void *)&(instance->line_protocol_server_ip)))
+ ret = enable_line_protocol_output(&instance->line_protocol_output, ip, port);
+ if(ret == 0)
{
- return -1;
+ instance->line_protocol_output_enable = 1;
}
- instance->line_protocol_socket = startup_udp();
- instance->line_protocol_server_port = port;
- instance->line_protocol_output_enable = 1;
- return 0;
+ return ret;
}
int fieldstat_dynamic_disable_background_thread(struct fieldstat_dynamic_instance *instance)
diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h
index 2cb888e..babaf4f 100644
--- a/src/fieldstat_internal.h
+++ b/src/fieldstat_internal.h
@@ -148,26 +148,31 @@ struct metric
};
};
+
+struct line_protocol_output
+{
+ unsigned int server_ip;
+ unsigned short server_port;
+ int send_socket;
+ char send_buf[UDP_PAYLOAD_SIZE];
+ unsigned int send_buf_offset;
+};
+
struct fieldstat_instance
{
char name[INSTANCE_NAME_LEN];
- //char *statsd_server_ip;
- //char statsd_server_str_ip[LEN_IP_MAX];
unsigned int statsd_server_ip;
unsigned short statsd_server_port;
int statsd_output_enable;
- int line_protocol_socket;
-
- //char line_protocol_server_str_ip[LEN_IP_MAX];
- unsigned int line_protocol_server_ip;
- unsigned short line_protocol_server_port;
- int line_protocol_output_enable;
char local_output_filename[LEN_PATH_MAX];
char local_output_format[LEN_FORMAT_MAX];
int local_output_enable;
FILE* local_output_fp;
+ struct line_protocol_output line_protocol_output;
+ int line_protocol_output_enable;
+
int background_thread_disable;
pthread_t background_thread;
int background_thread_is_created;
@@ -191,9 +196,6 @@ struct fieldstat_instance
struct timespec last_output_time;
- char line_protocol_send_buff[UDP_PAYLOAD_SIZE];
- size_t line_protocol_send_buff_offset;
-
};
struct prometheus_endpoint_instance
@@ -219,8 +221,7 @@ struct dynamic_metric
struct fieldstat_dynamic_instance
{
char name[INSTANCE_NAME_LEN];
- unsigned int line_protocol_server_ip;
- unsigned short line_protocol_server_port;
+ struct line_protocol_output line_protocol_output;
int line_protocol_output_enable;
int background_thread_disable;
@@ -233,9 +234,6 @@ struct fieldstat_dynamic_instance
pthread_t background_thread;
int background_thread_is_created;
- char line_protocol_send_buff[UDP_PAYLOAD_SIZE];
- size_t line_protocol_send_buff_offset;
- int line_protocol_socket;
struct dynamic_metric **n_thread_dynamic_metric;
int n_thread;
@@ -270,4 +268,7 @@ int line_protocol_dynamic_metric_output(struct fieldstat_dynamic_instance *insta
struct table_line ** read_table_line_slot(struct table_metric *table, int line_id);
-void fieldstat_global_disable_prometheus_endpoint(); \ No newline at end of file
+void fieldstat_global_disable_prometheus_endpoint();
+
+int enable_line_protocol_output(struct line_protocol_output *line_protocol_output, const char *ip, unsigned short port);
+void disable_line_protocol_output(struct line_protocol_output *line_protocol_output); \ No newline at end of file
diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp
index de42873..cfea9f7 100644
--- a/src/line_protocol_output.cpp
+++ b/src/line_protocol_output.cpp
@@ -1,336 +1,400 @@
#include "fieldstat_internal.h"
-static void flush_line_protocol_metric_send_buff(struct fieldstat_instance *instance)
+
+static void flush_send_buf(struct line_protocol_output *line_protocol_output)
{
- if(instance->line_protocol_send_buff_offset == 0)
+ if(line_protocol_output == NULL || line_protocol_output->send_buf_offset == 0)
{
return;
}
- if(instance->line_protocol_server_ip > 0 && instance->line_protocol_server_port > 0)
+ if(line_protocol_output->server_ip > 0 && line_protocol_output->server_port > 0)
{
- send_udp(instance->line_protocol_socket, instance->line_protocol_server_ip,
- (unsigned short)instance->line_protocol_server_port,
- instance->line_protocol_send_buff,
- instance->line_protocol_send_buff_offset
- );
+ send_udp(line_protocol_output->send_socket, line_protocol_output->server_ip,
+ line_protocol_output->server_port, line_protocol_output->send_buf, line_protocol_output->send_buf_offset);
+
}
- instance->line_protocol_send_buff_offset = 0;
- memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff));
+ line_protocol_output->send_buf_offset = 0;
+ memset(line_protocol_output->send_buf, 0, sizeof(line_protocol_output->send_buf));
+
return;
}
-static void append_line_protocol_metric_to_send_buff(struct fieldstat_instance *instance, const char* measurement, char *tag_set, char *field_set)
+
+static void send_line_buf(struct line_protocol_output *line_protocol_output, char *line_buf, unsigned int line_buf_len)
{
- if(field_set==NULL)
+ if(line_protocol_output == NULL || line_buf == NULL)
{
return;
}
- if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2)
+
+ if(UDP_PAYLOAD_SIZE - line_protocol_output->send_buf_offset < line_buf_len)
{
- flush_line_protocol_metric_send_buff(instance);
+ flush_send_buf(line_protocol_output);
}
- printf("Line_protocol metric: %s%s %s\n",measurement,tag_set,field_set);
- instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset,
- sizeof(instance->line_protocol_send_buff) - instance->line_protocol_send_buff_offset,
- "%s%s %s\n",
- measurement, tag_set, field_set
- );
+
+ line_protocol_output->send_buf_offset += snprintf(line_protocol_output->send_buf + line_protocol_output->send_buf_offset,
+ sizeof(line_protocol_output->send_buf) - line_protocol_output->send_buf_offset,
+ "%s", line_buf);
return;
}
-static int output_line_protocol_tag_set_buf(char *tag_key[], char *tag_value[], int n_tag, char *tag_set_buf, unsigned int size)
+
+static int add_measurement(const char* measurement, char *line_buf, unsigned int line_buf_size)
+{
+ int used_len = 0;
+
+ used_len = snprintf(line_buf, line_buf_size, "%s", measurement);
+
+ return used_len;
+}
+
+
+static int add_default_tag_set(const char *instance_name, const char *table_name, char *line_buf, unsigned int line_buf_size)
{
- int i = 0;
int used_len = 0;
- for(i = 0; i < n_tag; i++)
+
+ if(instance_name == NULL)
{
- used_len += snprintf(tag_set_buf + used_len, size - used_len, ",%s=%s", tag_key[i], tag_value[i]);
+ return 0;
}
+
+ if(table_name)
+ {
+ used_len += snprintf(line_buf, line_buf_size, ",app_name=%s,table_name=%s", instance_name, table_name);
+ }
+ else
+ {
+ used_len += snprintf(line_buf, line_buf_size, ",app_name=%s", instance_name);
+ }
+
return used_len;
}
-static void output_line_protocol_table(struct fieldstat_instance *instance,int tables_line_cnt[], int current_table_cnt)
+static int add_user_tag_set(struct metric *metric, char *line_buf, unsigned int line_buf_size)
+{
+ int used_len = 0;
+
+ for(int i = 0; i < (int)metric->n_tag; i++)
+ {
+ used_len += snprintf(line_buf + used_len, line_buf_size - used_len, ",%s=%s", metric->tag_key[i], metric->tag_value[i]);
+ }
+
+ return used_len;
+}
+
+static long long read_single_metric_value(struct metric *metric)
{
- int i = 0, j = 0, k = 0;
- struct metric *metric = NULL;
long long value = 0;
- //double ratio = 0.0;
- char field_set_buff[UDP_PAYLOAD_SIZE];
- char tag_set_buff[UDP_PAYLOAD_SIZE];
+ metric->field_type == FIELD_TYPE_GAUGE
+ ?value = get_metric_unit_val(metric, FS_CALC_CURRENT, 0)
+ :value = get_metric_unit_val(metric, FS_CALC_SPEED, 0);
+ return value;
+}
- int tags_used_len = 0;
- int fields_used_len = 0;
+static int read_table_row_value(struct fieldstat_instance *instance, struct table_line *row, int n_column, long long *out_row_value)
+{
+ int i = 0;
+ struct metric *metric = NULL;
- struct table_metric *table = NULL;
- struct table_line *line = NULL;
- int line_value_is_not_zero = 0;
+ if(row == NULL)
+ {
+ return -1;
+ }
- for(i = 0; i < current_table_cnt; i++)
+ for(i = 0; i < n_column; i++)
{
- table = instance->table_metrics[i];
- for(j = 0; j < tables_line_cnt[i]; j++)
- {
- line = read_table_line(table, j);
- if(line == NULL)
- {
- continue;
- }
- line_value_is_not_zero = 0;
- tags_used_len = 0;
- fields_used_len = 0;
- memset(field_set_buff, 0, sizeof(field_set_buff));
- memset(tag_set_buff, 0, sizeof(tag_set_buff));
- line_value_is_not_zero = 0;
- for(k = 0; k < table->column_cnt; k ++)
- {
- metric = get_metric(instance, line->metric_id_belong_to_line[k]);
-
- value = metric->field_type == FIELD_TYPE_GAUGE ?
- get_metric_unit_val(metric, FS_CALC_CURRENT, 1):
- get_metric_unit_val(metric, FS_CALC_SPEED, 1);
- if(value != 0)
- {
- line_value_is_not_zero = 1;
- }
- fields_used_len += snprintf(field_set_buff + fields_used_len,
- sizeof(field_set_buff) - fields_used_len,
- "%s=%lld,",
- metric->table->column_name[metric->table_column_id],
- value
- );
+ metric = get_metric(instance, row->metric_id_belong_to_line[i]);
+ out_row_value[i] = read_single_metric_value(metric);
+ }
+ return 0;
+}
- }
- if(line_value_is_not_zero == 0)
- {
- continue;
- }
- tags_used_len += snprintf(tag_set_buff + tags_used_len,
- sizeof(tag_set_buff) - tags_used_len,
- ",app_name=%s,table_name=%s",
- instance->name,
- table->name
- );
-
- tags_used_len += output_line_protocol_tag_set_buf(line->tag_key, line->tag_value, line->n_tag,
- tag_set_buff + tags_used_len,
- sizeof(tag_set_buff) - tags_used_len);
-
- // measurement,tag_set field_set
- append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff);
+
+static int is_send_table_row(long long *row_value, int n_column)
+{
+ int i = 0;
+ int is_send = 0;
+
+ for(i = 0; i < n_column; i++)
+ {
+ if(row_value[i] != 0)
+ {
+ is_send = 1;
+ break;
}
+ }
+ return is_send;
+}
+
+
+static int add_field_set(char *field_key, long long field_value, char *line_buf, int line_buf_size)
+{
+ int used_len = 0;
+
+ used_len += snprintf(line_buf, line_buf_size, "%s=%lld", field_key, field_value);
+
+ return used_len;
+}
+
+
+static int add_table_row_field_set(char *column_name[], long long *row_value, int n_column, char *line_buf, int line_buf_size)
+{
+ int used_len = 0;
+
+ for(int i = 0; i < n_column; i++)
+ {
+ used_len += add_field_set(column_name[i], row_value[i], line_buf + used_len, line_buf_size - used_len);
+ used_len += snprintf(line_buf + used_len, line_buf_size - used_len, ",");
+ }
+
+ if(used_len > 0)
+ {
+ used_len--;
+ line_buf[used_len] = '\0';
+ }
+
+ return used_len;
+}
+
+
+static int build_single_metric_line_buf(char *instance_name, struct metric *metric, char *line_buf, int line_buf_size)
+{
+ int used_len = 0;
+ long long value = 0;
+
+ value = read_single_metric_value(metric);
+
+ if(value == 0)
+ {
+ return 0;
}
+
+ used_len += add_measurement(metric->field_name, line_buf, line_buf_size);
+
+ used_len += add_default_tag_set(instance_name, NULL, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += add_user_tag_set(metric, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += snprintf(line_buf + used_len, line_buf_size - used_len, " ");
+
+ used_len += add_field_set(metric->field_name, value, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += snprintf(line_buf + used_len, line_buf_size - used_len, "\n");
+
+ return used_len;
}
-int line_protocol_output(struct fieldstat_instance *instance)
+static int build_table_row_line_buf(struct fieldstat_instance *instance, struct table_metric *table, struct table_line *row, char *line_buf, int line_buf_size)
{
+ int used_len = 0;
struct metric *metric = NULL;
- long long value = 0;
+
+ if(table->column_cnt <= 0)
+ {
+ return 0;
+ }
+
+ long long row_value[table->column_cnt] = {0};
+
+
+ if(-1 == read_table_row_value(instance, row, table->column_cnt, row_value))
+ {
+ return 0;
+ }
+
+ if(1 != is_send_table_row(row_value, table->column_cnt))
+ {
+ return 0;
+ }
+
+ metric = get_metric(instance, row->metric_id_belong_to_line[0]);
+
+ used_len += add_measurement(metric->field_name, line_buf, line_buf_size);
+
+ used_len += add_default_tag_set(instance->name, table->name, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += add_user_tag_set(metric, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += snprintf(line_buf + used_len, line_buf_size - used_len, " ");
+
+ used_len += add_table_row_field_set(table->column_name, row_value, table->column_cnt, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += snprintf(line_buf + used_len, line_buf_size - used_len, "\n");
+
+ return used_len;
+}
+
+
+static void output_line_protocol_single_metric(struct fieldstat_instance *instance, int n_cur_metric)
+{
int i = 0;
- char field_set_buff[UDP_PAYLOAD_SIZE];
- char tag_set_buff[UDP_PAYLOAD_SIZE];
- int tags_used_len = 0;
+ int used_len = 0;
- //print current time instance start
- int tables_line_cnt[TABLE_MAX_NUM];
- int current_table_cnt = instance->table_num;
- get_current_table_line_cnt(instance, current_table_cnt, tables_line_cnt);
- int current_metric_cnt = instance->metric_cnt;
- //print current time instance end
+ struct metric *metric = NULL;
+ char line_buf[UDP_PAYLOAD_SIZE];
- for(i = 0; i < current_metric_cnt; i++)
+ for(i = 0; i < n_cur_metric; i++)
{
metric = get_metric(instance, i);
- if(metric == NULL)
- {
- continue;
- }
- if(metric->is_ratio == 1)
- {
- continue;
- }
- if(metric->table)
+ if(metric == NULL || metric->table != NULL)
{
continue;
}
- memset(field_set_buff, 0, sizeof(field_set_buff));
- memset(tag_set_buff, 0, sizeof(tag_set_buff));
- tags_used_len = 0;
- switch(metric->field_type)
- {
- case FIELD_TYPE_GAUGE:
- value = get_metric_unit_val(metric, FS_CALC_CURRENT, 1);
- if(value != 0)
- {
- snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value);
- tags_used_len += snprintf(tag_set_buff + tags_used_len,
- sizeof(tag_set_buff) - tags_used_len,
- ",app_name=%s",
- instance->name
- );
- output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_set_buff + tags_used_len, sizeof(tag_set_buff) - tags_used_len);
- append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff);
-
- }
- break;
- case FIELD_TYPE_COUNTER:
- value = get_metric_unit_val(metric, FS_CALC_SPEED, 1);
- if(value != 0)
- {
- snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value);
- tags_used_len += snprintf(tag_set_buff + tags_used_len,
- sizeof(tag_set_buff) - tags_used_len,
- ",app_name=%s",
- instance->name
- );
- output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_set_buff + tags_used_len, sizeof(tag_set_buff) - tags_used_len);
- append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff);
- }
- break;
- default:
- break;
+ memset(line_buf, 0, sizeof(line_buf));
- }
+ used_len = build_single_metric_line_buf(instance->name, metric, line_buf, sizeof(line_buf));
+
+ send_line_buf(&instance->line_protocol_output, line_buf, used_len);
}
+ return;
+}
- output_line_protocol_table(instance, tables_line_cnt, current_table_cnt);
- flush_line_protocol_metric_send_buff(instance);
+static int read_dynamic_table_row(struct metric **row_metric, int n_column, long long *out_row_value)
+{
+ int i = 0;
+ struct metric *metric = NULL;
+
+ if(row_metric == NULL || n_column < 1)
+ {
+ return -1;
+ }
+
+ for(i = 0; i < n_column; i++)
+ {
+ metric = row_metric[i];
+ out_row_value[i] = read_single_metric_value(metric);
+ }
return 0;
+
}
-static void flush_line_protocol_dynamic_metric(struct fieldstat_dynamic_instance *instance)
+
+static void output_line_protocol_table_row(struct fieldstat_instance *instance, int n_cur_table, int n_cur_table_row[])
{
- if(instance->line_protocol_send_buff_offset == 0)
- {
- return;
- }
+ int i = 0, j = 0;
+ int used_len = 0;
+
+ char line_buf[UDP_PAYLOAD_SIZE];
+ struct table_metric *table = NULL;
+ struct table_line *row = NULL;
- if(instance->line_protocol_server_ip > 0 && instance->line_protocol_server_port > 0)
+ for(i = 0; i < n_cur_table; i++)
{
- send_udp(instance->line_protocol_socket, instance->line_protocol_server_ip,
- (unsigned short)instance->line_protocol_server_port,
- instance->line_protocol_send_buff,
- instance->line_protocol_send_buff_offset
- );
+ table = instance->table_metrics[i];
+
+ for(j = 0; j < n_cur_table_row[i]; j++)
+ {
+ row = read_table_line(table, j);
+ if(row == NULL)
+ {
+ continue;
+ }
+ memset(line_buf, 0, sizeof(line_buf));
+ used_len = build_table_row_line_buf(instance, table, row, line_buf, sizeof(line_buf));
+ send_line_buf(&instance->line_protocol_output, line_buf, used_len);
+ }
}
- instance->line_protocol_send_buff_offset = 0;
- memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff));
+
return;
}
-static void append_line_protocol_dynamic_row(struct fieldstat_dynamic_instance *instance, const char* measurement, char *tag_set, char *field_set)
+
+
+int line_protocol_output(struct fieldstat_instance *instance)
{
- if(field_set==NULL)
+
+ //print current time instance start
+ int tables_row_cnt[TABLE_MAX_NUM];
+ int n_cur_table = instance->table_num;
+ get_current_table_line_cnt(instance, n_cur_table, tables_row_cnt);
+ int n_cur_metric = instance->metric_cnt;
+ //print current time instance end
+
+ output_line_protocol_single_metric(instance, n_cur_metric);
+
+ output_line_protocol_table_row(instance, n_cur_table, tables_row_cnt);
+
+ flush_send_buf(&instance->line_protocol_output);
+ return 0;
+}
+
+
+static int build_dynamic_table_row_line_buf(struct fieldstat_dynamic_instance *instance, struct table_metric *table, struct metric **row_metric, char *line_buf, int line_buf_size)
+{
+ int used_len = 0;
+ struct metric *metric = NULL;
+
+ if(table->column_cnt <= 0)
{
- return;
+ return 0;
}
- if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2)
+ long long row_value[table->column_cnt] = {0};
+
+ if(-1 == read_dynamic_table_row(row_metric, table->column_cnt, row_value))
{
- flush_line_protocol_dynamic_metric(instance);
+ return 0;
}
- //printf("Line_protocol metric: %s:%s %s\n",measurement,tag_set,field_set);
- instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset,
- sizeof(instance->line_protocol_send_buff) - instance->line_protocol_send_buff_offset,
- "%s%s %s\n",
- measurement, tag_set, field_set
- );
- return;
+
+ if(1 != is_send_table_row(row_value, table->column_cnt))
+ {
+ return 0;
+ }
+
+ metric = row_metric[0];
+
+ used_len += add_measurement(metric->field_name, line_buf, line_buf_size);
+
+ used_len += add_default_tag_set(instance->name, table->name, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += add_user_tag_set(metric, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += snprintf(line_buf + used_len, line_buf_size - used_len, " ");
+
+ used_len += add_table_row_field_set(table->column_name, row_value, table->column_cnt, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += snprintf(line_buf + used_len, line_buf_size - used_len, "\n");
+
+ return used_len;
}
+
int line_protocol_dynamic_metric_output(struct fieldstat_dynamic_instance *instance)
{
struct dynamic_metric **head = NULL;
struct dynamic_metric *dyn_metric, *tmp_dyn_metric;
struct metric **metrics = NULL;
struct metric *metric = NULL;
- struct table_metric *table = NULL;
- char tag_set_buff[UDP_PAYLOAD_SIZE];
- char field_set_buff[UDP_PAYLOAD_SIZE];
-
- int field_used_len = 0;
- int tag_used_len = 0;
- int i = 0, j = 0;
- long long value;
- int row_value_is_not_zero = 0;
+ char line_buf[UDP_PAYLOAD_SIZE];
+ int used_len = 0;
- for(i = 0; i < instance->n_thread; i++)
+ for(int i = 0; i < instance->n_thread; i++)
{
head = &instance->n_thread_dynamic_metric[i];
HASH_ITER(hh, *head, dyn_metric, tmp_dyn_metric)
{
- row_value_is_not_zero = 0;
metrics = dyn_metric->metrics;
metric = metrics[0];
- memset(tag_set_buff, 0, sizeof(tag_set_buff));
- memset(field_set_buff, 0,sizeof(field_set_buff));
- field_used_len = 0;
- tag_used_len = 0;
-
+ used_len = 0;
+ memset(line_buf, 0, sizeof(line_buf));
if(metric->table)
{
- table = metric->table;
- for(j = 0; j < table->column_cnt; j++)
- {
- metric = metrics[j];
- value = metric->field_type == FIELD_TYPE_GAUGE ?
- get_metric_unit_val(metric, FS_CALC_CURRENT, 0):
- get_metric_unit_val(metric, FS_CALC_SPEED, 0);
- if(value != 0)
- {
- row_value_is_not_zero = 1;
- }
- field_used_len += snprintf(field_set_buff + field_used_len,
- sizeof(field_set_buff) - field_used_len,
- "%s=%lld,",
- metric->table->column_name[metric->table_column_id],
- value
- );
-
- }
- if(row_value_is_not_zero != 1)
- {
- continue;
- }
- metric = metrics[0];
- field_used_len--;
- field_set_buff[field_used_len] = '\0';
- tag_used_len += snprintf(tag_set_buff + tag_used_len,
- sizeof(tag_set_buff) - tag_used_len,
- ",app_name=%s,table_name=%s",
- instance->name,
- table->name
- );
-
+ used_len = build_dynamic_table_row_line_buf(instance, metric->table, metrics, line_buf, sizeof(line_buf));
}
else
{
- value = metric->field_type == FIELD_TYPE_GAUGE ?
- get_metric_unit_val(metric, FS_CALC_CURRENT, 0):
- get_metric_unit_val(metric, FS_CALC_SPEED, 0);
- if(value == 0)
- {
- continue;
- }
- snprintf(field_set_buff, sizeof(field_set_buff), "%s=%lld", metric->field_name, value);
- tag_used_len += snprintf(tag_set_buff + tag_used_len,
- sizeof(tag_set_buff) - tag_used_len,
- ",app_name=%s",
- instance->name
- );
-
+ used_len = build_single_metric_line_buf(instance->name, metric, line_buf, sizeof(line_buf));
}
- tag_used_len += output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag,
- tag_set_buff + tag_used_len, sizeof(tag_set_buff) - tag_used_len);
- append_line_protocol_dynamic_row(instance, metric->field_name, tag_set_buff, field_set_buff);
- }
+ send_line_buf(&instance->line_protocol_output, line_buf, used_len);
+ }
}
- flush_line_protocol_dynamic_metric(instance);
+ flush_send_buf(&instance->line_protocol_output);
return 0;
}
diff --git a/test/src/gtest_dynamic_fieldstat.cpp b/test/src/gtest_dynamic_fieldstat.cpp
index 6bdf643..669a980 100644
--- a/test/src/gtest_dynamic_fieldstat.cpp
+++ b/test/src/gtest_dynamic_fieldstat.cpp
@@ -80,11 +80,11 @@ TEST(FeildStatDynamicAPI, FieldStatDynamicInstanceSetPara)
ret = fieldstat_dynamic_set_line_protocol_server(instance, "127.0.0.1", 8080);
EXPECT_EQ(0, ret);
- EXPECT_EQ(8080, instance->line_protocol_server_port);
+ EXPECT_EQ(8080, instance->line_protocol_output.server_port);
ret = inet_pton(AF_INET, "127.0.0.1", (void *)&(input_ip));
EXPECT_EQ(1, ret);
- EXPECT_EQ(input_ip, instance->line_protocol_server_ip);
- EXPECT_NE(-1, instance->line_protocol_socket);
+ EXPECT_EQ(input_ip, instance->line_protocol_output.server_ip);
+ EXPECT_NE(-1, instance->line_protocol_output.send_socket);
ret = fieldstat_dynamic_disable_background_thread(instance);
EXPECT_EQ(0, ret);
@@ -1065,8 +1065,7 @@ void parse_telegraf_cjson_output_not_equal(const char *compare)
if(cjson_metric_str)
{
- ret = strcmp(cjson_metric_str, compare);
- EXPECT_NE(0, ret);
+ EXPECT_STRNE(cjson_metric_str, compare);
free(cjson_metric_str);
cjson_metric_str = NULL;
}
diff --git a/test/src/gtest_fieldstat.cpp b/test/src/gtest_fieldstat.cpp
index da1e8a5..1435391 100644
--- a/test/src/gtest_fieldstat.cpp
+++ b/test/src/gtest_fieldstat.cpp
@@ -68,7 +68,7 @@ TEST(FeildStatAPI, FieldStatSetLineProtocolServer)
EXPECT_EQ(0, ret_set_line_protocol_server);
EXPECT_EQ(1, instance->line_protocol_output_enable);
- EXPECT_NE(-1, instance->line_protocol_socket);
+ EXPECT_NE(-1, instance->line_protocol_output.send_socket);
fieldstat_instance_free(instance);
}
@@ -250,6 +250,97 @@ TEST(FeildStatAPI, FieldStatLocalOutputFormatJson)
fieldstat_instance_free(instance);
}
+TEST(FeildStatAPI, FieldStatLineProtocolOutputTableMetric)
+{
+ int metric_id = -1;
+ int ret = 0;
+ int n_loops = 1;
+ struct fieldstat_instance * instance = NULL;
+ int table_id = -1;
+ int output_metric_ids[54] = {0};
+ const char *telegraf_output_file = "/tmp/metrics.out";
+ FILE *fp;
+ char line[2048] = {0};
+ cJSON *cjson_metric = NULL;
+ cJSON *cjson_tags = NULL;
+ const char *compare = "{\"fields\":{\"alert_bytes\":0,\"allow_conn_num\":0,\"allow_in_bytes\":0,\"allow_in_packets\":0,\"allow_out_bytes\":0,"\
+ "\"allow_out_packets\":0,\"block_bytes\":0,\"close_conn_num\":0,\"default_conn_num\":0,\"default_in_bytes\":0,"\
+ "\"default_in_packets\":0,\"default_out_bytes\":0,\"default_out_packets\":0,\"deny_conn_num\":0,\"deny_in_bytes\":0,"\
+ "\"deny_in_packets\":0,\"deny_out_bytes\":0,\"deny_out_packets\":0,\"established_conn_num\":0,\"intercept_conn_num\":0,"\
+ "\"intercept_in_bytes\":0,\"intercept_in_packets\":0,\"intercept_out_bytes\":0,\"intercept_out_packets\":0,"\
+ "\"ipv4_in_bytes\":0,\"ipv4_in_packets\":0,\"ipv4_out_packetsipv4_out_bytes\":0,\"ipv6_in_bytes\":0,\"ipv6_in_packets\":0,"\
+ "\"ipv6_out_bytes\":0,\"ipv6_out_packets\":0,\"maybe_pinning_num\":0,\"monitor_conn_num\":0,\"monitor_in_bytes\":0,"\
+ "\"monitor_in_packets\":0,\"monitor_out_bytes\":0,\"monitor_out_packets\":0,\"new_conn_num\":1000,\"not_pinning_num\":0,"\
+ "\"pinning_num\":0,\"tcp_conn_num\":0,\"tcp_in_bytes\":0,\"tcp_in_packets\":0,\"tcp_out_bytes\":0,\"tcp_out_packets\":0,"\
+ "\"total_in_bytes\":0,\"total_in_packets\":0,\"total_out_bytes\":0,\"total_out_packets\":0,\"udp_conn_num\":0,"\
+ "\"udp_in_bytes\":0,\"udp_in_packets\":0,\"udp_out_bytes\":0,\"udp_out_packets\":0},\"name\":\"TRAFFIC\","\
+ "\"tags\":{\"app_name\":\"tsg_statistic\",\"table_name\":\"network_activity\"}}";
+ char *cjson_metric_str = NULL;
+
+ const char *table_column_name[] = {"new_conn_num", "established_conn_num", "close_conn_num", "total_in_bytes", "total_out_bytes", "total_in_packets",
+ "total_out_packets", "default_conn_num", "default_in_bytes", "default_out_bytes", "default_in_packets", "default_out_packets", "allow_conn_num",
+ "allow_in_bytes", "allow_out_bytes", "allow_in_packets", "allow_out_packets", "deny_conn_num", "deny_in_bytes", "deny_out_bytes", "deny_in_packets",
+ "deny_out_packets", "monitor_conn_num", "monitor_in_bytes", "monitor_out_bytes", "monitor_in_packets", "monitor_out_packets", "intercept_conn_num",
+ "intercept_in_bytes", "intercept_out_bytes", "intercept_in_packets", "intercept_out_packets", "ipv4_in_packets", "ipv4_in_bytes", "ipv4_out_packets"
+ "ipv4_out_bytes", "ipv6_in_packets", "ipv6_in_bytes", "ipv6_out_packets", "ipv6_out_bytes", "tcp_conn_num", "tcp_in_packets", "tcp_in_bytes",
+ "tcp_out_packets", "tcp_out_bytes", "udp_conn_num", "udp_in_packets", "udp_in_bytes", "udp_out_packets", "udp_out_bytes", "alert_bytes", "block_bytes",
+ "pinning_num", "maybe_pinning_num", "not_pinning_num"};
+ enum field_type table_column_type[54] = {FIELD_TYPE_COUNTER};
+
+ instance = fieldstat_instance_new("tsg_statistic");
+ EXPECT_STREQ("tsg_statistic", instance->name);
+
+ table_id = fieldstat_register_table(instance, "network_activity", table_column_name, table_column_type, sizeof(table_column_name)/sizeof(table_column_name[0]));
+ EXPECT_EQ(0, table_id);
+ ret = fieldstat_register_table_row(instance, table_id, "TRAFFIC", NULL, 0, output_metric_ids);
+ EXPECT_EQ(0, ret);
+
+ ret = fieldstat_set_line_protocol_server(instance, "127.0.0.1", 8600);
+ EXPECT_EQ(0, ret);
+ ret = fieldstat_disable_background_thread(instance);
+ EXPECT_EQ(0, ret);
+ fieldstat_instance_start(instance);
+ fieldstat_value_incrby(instance, output_metric_ids[0], 1000);
+ ret = system("cat /dev/null > /tmp/metrics.out");
+ sleep(1);
+ fieldstat_passive_output(instance);
+ sleep(1);
+ fp = fopen(telegraf_output_file, "r");
+ EXPECT_NE(nullptr, fp);
+ int n_line = 0;
+
+ while(!feof(fp))
+ {
+ if(NULL == fgets(line, sizeof(line), fp))
+ {
+ continue;
+ }
+ cjson_metric = cJSON_Parse(line);
+ EXPECT_NE(nullptr, cjson_metric);
+ cJSON_DeleteItemFromObject(cjson_metric, "timestamp");
+
+ cjson_tags = cJSON_GetObjectItem(cjson_metric, "tags");
+ EXPECT_NE(nullptr, cjson_tags);
+ cJSON_DeleteItemFromObject(cjson_tags, "host");
+ cjson_metric_str = cJSON_PrintUnformatted(cjson_metric);
+ EXPECT_NE(nullptr, cjson_metric_str);
+ if(cjson_metric_str)
+ {
+ EXPECT_STREQ(compare, cjson_metric_str);
+ free(cjson_metric_str);
+ cjson_metric_str = NULL;
+ }
+ cJSON_Delete(cjson_metric);
+ n_line++;
+ }
+ fclose(fp);
+ EXPECT_EQ(n_line, n_loops);
+
+ fieldstat_instance_free(instance);
+
+}
+
+
int main(int argc, char *argv[])
{
testing::InitGoogleTest(&argc, argv);