diff options
| author | fumingwei <[email protected]> | 2023-03-24 22:05:09 +0800 |
|---|---|---|
| committer | fumingwei <[email protected]> | 2023-03-27 20:16:55 +0800 |
| commit | 45a6ce34addd9187d7123c03d3badd5fa6d30589 (patch) | |
| tree | 35401d8a6cab312eeb20075895802ebd6436a746 | |
| parent | e09e7f397145613f1776622a2219047b9066f3a5 (diff) | |
feature:重写line protocol output代码
| -rw-r--r-- | src/fieldstat.cpp | 51 | ||||
| -rw-r--r-- | src/fieldstat_dynamic.cpp | 19 | ||||
| -rw-r--r-- | src/fieldstat_internal.h | 35 | ||||
| -rw-r--r-- | src/line_protocol_output.cpp | 540 | ||||
| -rw-r--r-- | test/src/gtest_dynamic_fieldstat.cpp | 9 | ||||
| -rw-r--r-- | test/src/gtest_fieldstat.cpp | 93 |
6 files changed, 466 insertions, 281 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp index 65a0c15..582efaa 100644 --- a/src/fieldstat.cpp +++ b/src/fieldstat.cpp @@ -350,20 +350,19 @@ int fieldstat_set_local_output(struct fieldstat_instance *instance, const char * int fieldstat_set_line_protocol_server(struct fieldstat_instance *instance, const char *ip, unsigned short port) { - if(instance->running == 1) + int ret = 0; + if(instance == NULL || instance->running == 1) { return -1; } + ret = enable_line_protocol_output(&instance->line_protocol_output, ip, port); - if(1 != inet_pton(AF_INET, ip, (void *)&(instance->line_protocol_server_ip))) + if(ret == 0) { - return -1; + instance->line_protocol_output_enable = 1; } - instance->line_protocol_socket = startup_udp(); - instance->line_protocol_server_port = port; - instance->line_protocol_output_enable = 1; - return 0; + return ret; } int fieldstat_set_statsd_server(struct fieldstat_instance *instance, const char *ip, unsigned short port) @@ -578,10 +577,10 @@ void fieldstat_instance_free(struct fieldstat_instance *instance) instance->local_output_enable = 0; } - if(instance->line_protocol_socket != -1) + if(instance->line_protocol_output_enable == 1) { - close(instance->line_protocol_socket); - instance->line_protocol_socket = -1; + disable_line_protocol_output(&instance->line_protocol_output); + instance->line_protocol_output_enable = 0; } for(i = 0; i < instance->metric_cnt; i++) @@ -1017,3 +1016,35 @@ int fieldstat_register_summary(struct fieldstat_instance *instance, const char * bins, lowest_trackable_value, highest_trackable_value, significant_figures, output_window); } + +int enable_line_protocol_output(struct line_protocol_output *line_protocol_output, const char *ip, unsigned short port) +{ + if(line_protocol_output == NULL) + { + return -1; + } + + if(1 != inet_pton(AF_INET, ip, (void *)&(line_protocol_output->server_ip))) + { + return -1; + } + line_protocol_output->server_port = port; + line_protocol_output->send_socket = startup_udp(); + + return 0; +} + +void disable_line_protocol_output(struct line_protocol_output *line_protocol_output) +{ + if(line_protocol_output == NULL) + { + return; + } + + if(line_protocol_output->send_socket != -1) + { + close(line_protocol_output->send_socket); + line_protocol_output->send_socket = -1; + } + return; +} diff --git a/src/fieldstat_dynamic.cpp b/src/fieldstat_dynamic.cpp index 8495a38..162e91a 100644 --- a/src/fieldstat_dynamic.cpp +++ b/src/fieldstat_dynamic.cpp @@ -47,10 +47,10 @@ void fieldstat_dynamic_instance_free(struct fieldstat_dynamic_instance *instance instance->running = 0; } - if(instance->line_protocol_socket != -1) + if(instance->line_protocol_output_enable == 1) { - close(instance->line_protocol_socket); - instance->line_protocol_socket = -1; + disable_line_protocol_output(&instance->line_protocol_output); + instance->line_protocol_output_enable = 0; } for(i = 0; i < instance->n_thread; i++) @@ -98,23 +98,22 @@ void fieldstat_dynamic_instance_free(struct fieldstat_dynamic_instance *instance } - int fieldstat_dynamic_set_line_protocol_server(struct fieldstat_dynamic_instance *instance, const char *ip, unsigned short port) { + int ret = 0; + if(instance == NULL || instance->running == 1) { return -1; } - if(1 != inet_pton(AF_INET, ip, (void *)&(instance->line_protocol_server_ip))) + ret = enable_line_protocol_output(&instance->line_protocol_output, ip, port); + if(ret == 0) { - return -1; + instance->line_protocol_output_enable = 1; } - instance->line_protocol_socket = startup_udp(); - instance->line_protocol_server_port = port; - instance->line_protocol_output_enable = 1; - return 0; + return ret; } int fieldstat_dynamic_disable_background_thread(struct fieldstat_dynamic_instance *instance) diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h index 2cb888e..babaf4f 100644 --- a/src/fieldstat_internal.h +++ b/src/fieldstat_internal.h @@ -148,26 +148,31 @@ struct metric }; }; + +struct line_protocol_output +{ + unsigned int server_ip; + unsigned short server_port; + int send_socket; + char send_buf[UDP_PAYLOAD_SIZE]; + unsigned int send_buf_offset; +}; + struct fieldstat_instance { char name[INSTANCE_NAME_LEN]; - //char *statsd_server_ip; - //char statsd_server_str_ip[LEN_IP_MAX]; unsigned int statsd_server_ip; unsigned short statsd_server_port; int statsd_output_enable; - int line_protocol_socket; - - //char line_protocol_server_str_ip[LEN_IP_MAX]; - unsigned int line_protocol_server_ip; - unsigned short line_protocol_server_port; - int line_protocol_output_enable; char local_output_filename[LEN_PATH_MAX]; char local_output_format[LEN_FORMAT_MAX]; int local_output_enable; FILE* local_output_fp; + struct line_protocol_output line_protocol_output; + int line_protocol_output_enable; + int background_thread_disable; pthread_t background_thread; int background_thread_is_created; @@ -191,9 +196,6 @@ struct fieldstat_instance struct timespec last_output_time; - char line_protocol_send_buff[UDP_PAYLOAD_SIZE]; - size_t line_protocol_send_buff_offset; - }; struct prometheus_endpoint_instance @@ -219,8 +221,7 @@ struct dynamic_metric struct fieldstat_dynamic_instance { char name[INSTANCE_NAME_LEN]; - unsigned int line_protocol_server_ip; - unsigned short line_protocol_server_port; + struct line_protocol_output line_protocol_output; int line_protocol_output_enable; int background_thread_disable; @@ -233,9 +234,6 @@ struct fieldstat_dynamic_instance pthread_t background_thread; int background_thread_is_created; - char line_protocol_send_buff[UDP_PAYLOAD_SIZE]; - size_t line_protocol_send_buff_offset; - int line_protocol_socket; struct dynamic_metric **n_thread_dynamic_metric; int n_thread; @@ -270,4 +268,7 @@ int line_protocol_dynamic_metric_output(struct fieldstat_dynamic_instance *insta struct table_line ** read_table_line_slot(struct table_metric *table, int line_id); -void fieldstat_global_disable_prometheus_endpoint();
\ No newline at end of file +void fieldstat_global_disable_prometheus_endpoint(); + +int enable_line_protocol_output(struct line_protocol_output *line_protocol_output, const char *ip, unsigned short port); +void disable_line_protocol_output(struct line_protocol_output *line_protocol_output);
\ No newline at end of file diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp index de42873..cfea9f7 100644 --- a/src/line_protocol_output.cpp +++ b/src/line_protocol_output.cpp @@ -1,336 +1,400 @@ #include "fieldstat_internal.h" -static void flush_line_protocol_metric_send_buff(struct fieldstat_instance *instance) + +static void flush_send_buf(struct line_protocol_output *line_protocol_output) { - if(instance->line_protocol_send_buff_offset == 0) + if(line_protocol_output == NULL || line_protocol_output->send_buf_offset == 0) { return; } - if(instance->line_protocol_server_ip > 0 && instance->line_protocol_server_port > 0) + if(line_protocol_output->server_ip > 0 && line_protocol_output->server_port > 0) { - send_udp(instance->line_protocol_socket, instance->line_protocol_server_ip, - (unsigned short)instance->line_protocol_server_port, - instance->line_protocol_send_buff, - instance->line_protocol_send_buff_offset - ); + send_udp(line_protocol_output->send_socket, line_protocol_output->server_ip, + line_protocol_output->server_port, line_protocol_output->send_buf, line_protocol_output->send_buf_offset); + } - instance->line_protocol_send_buff_offset = 0; - memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff)); + line_protocol_output->send_buf_offset = 0; + memset(line_protocol_output->send_buf, 0, sizeof(line_protocol_output->send_buf)); + return; } -static void append_line_protocol_metric_to_send_buff(struct fieldstat_instance *instance, const char* measurement, char *tag_set, char *field_set) + +static void send_line_buf(struct line_protocol_output *line_protocol_output, char *line_buf, unsigned int line_buf_len) { - if(field_set==NULL) + if(line_protocol_output == NULL || line_buf == NULL) { return; } - if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2) + + if(UDP_PAYLOAD_SIZE - line_protocol_output->send_buf_offset < line_buf_len) { - flush_line_protocol_metric_send_buff(instance); + flush_send_buf(line_protocol_output); } - printf("Line_protocol metric: %s%s %s\n",measurement,tag_set,field_set); - instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset, - sizeof(instance->line_protocol_send_buff) - instance->line_protocol_send_buff_offset, - "%s%s %s\n", - measurement, tag_set, field_set - ); + + line_protocol_output->send_buf_offset += snprintf(line_protocol_output->send_buf + line_protocol_output->send_buf_offset, + sizeof(line_protocol_output->send_buf) - line_protocol_output->send_buf_offset, + "%s", line_buf); return; } -static int output_line_protocol_tag_set_buf(char *tag_key[], char *tag_value[], int n_tag, char *tag_set_buf, unsigned int size) + +static int add_measurement(const char* measurement, char *line_buf, unsigned int line_buf_size) +{ + int used_len = 0; + + used_len = snprintf(line_buf, line_buf_size, "%s", measurement); + + return used_len; +} + + +static int add_default_tag_set(const char *instance_name, const char *table_name, char *line_buf, unsigned int line_buf_size) { - int i = 0; int used_len = 0; - for(i = 0; i < n_tag; i++) + + if(instance_name == NULL) { - used_len += snprintf(tag_set_buf + used_len, size - used_len, ",%s=%s", tag_key[i], tag_value[i]); + return 0; } + + if(table_name) + { + used_len += snprintf(line_buf, line_buf_size, ",app_name=%s,table_name=%s", instance_name, table_name); + } + else + { + used_len += snprintf(line_buf, line_buf_size, ",app_name=%s", instance_name); + } + return used_len; } -static void output_line_protocol_table(struct fieldstat_instance *instance,int tables_line_cnt[], int current_table_cnt) +static int add_user_tag_set(struct metric *metric, char *line_buf, unsigned int line_buf_size) +{ + int used_len = 0; + + for(int i = 0; i < (int)metric->n_tag; i++) + { + used_len += snprintf(line_buf + used_len, line_buf_size - used_len, ",%s=%s", metric->tag_key[i], metric->tag_value[i]); + } + + return used_len; +} + +static long long read_single_metric_value(struct metric *metric) { - int i = 0, j = 0, k = 0; - struct metric *metric = NULL; long long value = 0; - //double ratio = 0.0; - char field_set_buff[UDP_PAYLOAD_SIZE]; - char tag_set_buff[UDP_PAYLOAD_SIZE]; + metric->field_type == FIELD_TYPE_GAUGE + ?value = get_metric_unit_val(metric, FS_CALC_CURRENT, 0) + :value = get_metric_unit_val(metric, FS_CALC_SPEED, 0); + return value; +} - int tags_used_len = 0; - int fields_used_len = 0; +static int read_table_row_value(struct fieldstat_instance *instance, struct table_line *row, int n_column, long long *out_row_value) +{ + int i = 0; + struct metric *metric = NULL; - struct table_metric *table = NULL; - struct table_line *line = NULL; - int line_value_is_not_zero = 0; + if(row == NULL) + { + return -1; + } - for(i = 0; i < current_table_cnt; i++) + for(i = 0; i < n_column; i++) { - table = instance->table_metrics[i]; - for(j = 0; j < tables_line_cnt[i]; j++) - { - line = read_table_line(table, j); - if(line == NULL) - { - continue; - } - line_value_is_not_zero = 0; - tags_used_len = 0; - fields_used_len = 0; - memset(field_set_buff, 0, sizeof(field_set_buff)); - memset(tag_set_buff, 0, sizeof(tag_set_buff)); - line_value_is_not_zero = 0; - for(k = 0; k < table->column_cnt; k ++) - { - metric = get_metric(instance, line->metric_id_belong_to_line[k]); - - value = metric->field_type == FIELD_TYPE_GAUGE ? - get_metric_unit_val(metric, FS_CALC_CURRENT, 1): - get_metric_unit_val(metric, FS_CALC_SPEED, 1); - if(value != 0) - { - line_value_is_not_zero = 1; - } - fields_used_len += snprintf(field_set_buff + fields_used_len, - sizeof(field_set_buff) - fields_used_len, - "%s=%lld,", - metric->table->column_name[metric->table_column_id], - value - ); + metric = get_metric(instance, row->metric_id_belong_to_line[i]); + out_row_value[i] = read_single_metric_value(metric); + } + return 0; +} - } - if(line_value_is_not_zero == 0) - { - continue; - } - tags_used_len += snprintf(tag_set_buff + tags_used_len, - sizeof(tag_set_buff) - tags_used_len, - ",app_name=%s,table_name=%s", - instance->name, - table->name - ); - - tags_used_len += output_line_protocol_tag_set_buf(line->tag_key, line->tag_value, line->n_tag, - tag_set_buff + tags_used_len, - sizeof(tag_set_buff) - tags_used_len); - - // measurement,tag_set field_set - append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff); + +static int is_send_table_row(long long *row_value, int n_column) +{ + int i = 0; + int is_send = 0; + + for(i = 0; i < n_column; i++) + { + if(row_value[i] != 0) + { + is_send = 1; + break; } + } + return is_send; +} + + +static int add_field_set(char *field_key, long long field_value, char *line_buf, int line_buf_size) +{ + int used_len = 0; + + used_len += snprintf(line_buf, line_buf_size, "%s=%lld", field_key, field_value); + + return used_len; +} + + +static int add_table_row_field_set(char *column_name[], long long *row_value, int n_column, char *line_buf, int line_buf_size) +{ + int used_len = 0; + + for(int i = 0; i < n_column; i++) + { + used_len += add_field_set(column_name[i], row_value[i], line_buf + used_len, line_buf_size - used_len); + used_len += snprintf(line_buf + used_len, line_buf_size - used_len, ","); + } + + if(used_len > 0) + { + used_len--; + line_buf[used_len] = '\0'; + } + + return used_len; +} + + +static int build_single_metric_line_buf(char *instance_name, struct metric *metric, char *line_buf, int line_buf_size) +{ + int used_len = 0; + long long value = 0; + + value = read_single_metric_value(metric); + + if(value == 0) + { + return 0; } + + used_len += add_measurement(metric->field_name, line_buf, line_buf_size); + + used_len += add_default_tag_set(instance_name, NULL, line_buf + used_len, line_buf_size - used_len); + + used_len += add_user_tag_set(metric, line_buf + used_len, line_buf_size - used_len); + + used_len += snprintf(line_buf + used_len, line_buf_size - used_len, " "); + + used_len += add_field_set(metric->field_name, value, line_buf + used_len, line_buf_size - used_len); + + used_len += snprintf(line_buf + used_len, line_buf_size - used_len, "\n"); + + return used_len; } -int line_protocol_output(struct fieldstat_instance *instance) +static int build_table_row_line_buf(struct fieldstat_instance *instance, struct table_metric *table, struct table_line *row, char *line_buf, int line_buf_size) { + int used_len = 0; struct metric *metric = NULL; - long long value = 0; + + if(table->column_cnt <= 0) + { + return 0; + } + + long long row_value[table->column_cnt] = {0}; + + + if(-1 == read_table_row_value(instance, row, table->column_cnt, row_value)) + { + return 0; + } + + if(1 != is_send_table_row(row_value, table->column_cnt)) + { + return 0; + } + + metric = get_metric(instance, row->metric_id_belong_to_line[0]); + + used_len += add_measurement(metric->field_name, line_buf, line_buf_size); + + used_len += add_default_tag_set(instance->name, table->name, line_buf + used_len, line_buf_size - used_len); + + used_len += add_user_tag_set(metric, line_buf + used_len, line_buf_size - used_len); + + used_len += snprintf(line_buf + used_len, line_buf_size - used_len, " "); + + used_len += add_table_row_field_set(table->column_name, row_value, table->column_cnt, line_buf + used_len, line_buf_size - used_len); + + used_len += snprintf(line_buf + used_len, line_buf_size - used_len, "\n"); + + return used_len; +} + + +static void output_line_protocol_single_metric(struct fieldstat_instance *instance, int n_cur_metric) +{ int i = 0; - char field_set_buff[UDP_PAYLOAD_SIZE]; - char tag_set_buff[UDP_PAYLOAD_SIZE]; - int tags_used_len = 0; + int used_len = 0; - //print current time instance start - int tables_line_cnt[TABLE_MAX_NUM]; - int current_table_cnt = instance->table_num; - get_current_table_line_cnt(instance, current_table_cnt, tables_line_cnt); - int current_metric_cnt = instance->metric_cnt; - //print current time instance end + struct metric *metric = NULL; + char line_buf[UDP_PAYLOAD_SIZE]; - for(i = 0; i < current_metric_cnt; i++) + for(i = 0; i < n_cur_metric; i++) { metric = get_metric(instance, i); - if(metric == NULL) - { - continue; - } - if(metric->is_ratio == 1) - { - continue; - } - if(metric->table) + if(metric == NULL || metric->table != NULL) { continue; } - memset(field_set_buff, 0, sizeof(field_set_buff)); - memset(tag_set_buff, 0, sizeof(tag_set_buff)); - tags_used_len = 0; - switch(metric->field_type) - { - case FIELD_TYPE_GAUGE: - value = get_metric_unit_val(metric, FS_CALC_CURRENT, 1); - if(value != 0) - { - snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value); - tags_used_len += snprintf(tag_set_buff + tags_used_len, - sizeof(tag_set_buff) - tags_used_len, - ",app_name=%s", - instance->name - ); - output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_set_buff + tags_used_len, sizeof(tag_set_buff) - tags_used_len); - append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff); - - } - break; - case FIELD_TYPE_COUNTER: - value = get_metric_unit_val(metric, FS_CALC_SPEED, 1); - if(value != 0) - { - snprintf(field_set_buff, UDP_PAYLOAD_SIZE, "%s=%lld", metric->field_name, value); - tags_used_len += snprintf(tag_set_buff + tags_used_len, - sizeof(tag_set_buff) - tags_used_len, - ",app_name=%s", - instance->name - ); - output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_set_buff + tags_used_len, sizeof(tag_set_buff) - tags_used_len); - append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff); - } - break; - default: - break; + memset(line_buf, 0, sizeof(line_buf)); - } + used_len = build_single_metric_line_buf(instance->name, metric, line_buf, sizeof(line_buf)); + + send_line_buf(&instance->line_protocol_output, line_buf, used_len); } + return; +} - output_line_protocol_table(instance, tables_line_cnt, current_table_cnt); - flush_line_protocol_metric_send_buff(instance); +static int read_dynamic_table_row(struct metric **row_metric, int n_column, long long *out_row_value) +{ + int i = 0; + struct metric *metric = NULL; + + if(row_metric == NULL || n_column < 1) + { + return -1; + } + + for(i = 0; i < n_column; i++) + { + metric = row_metric[i]; + out_row_value[i] = read_single_metric_value(metric); + } return 0; + } -static void flush_line_protocol_dynamic_metric(struct fieldstat_dynamic_instance *instance) + +static void output_line_protocol_table_row(struct fieldstat_instance *instance, int n_cur_table, int n_cur_table_row[]) { - if(instance->line_protocol_send_buff_offset == 0) - { - return; - } + int i = 0, j = 0; + int used_len = 0; + + char line_buf[UDP_PAYLOAD_SIZE]; + struct table_metric *table = NULL; + struct table_line *row = NULL; - if(instance->line_protocol_server_ip > 0 && instance->line_protocol_server_port > 0) + for(i = 0; i < n_cur_table; i++) { - send_udp(instance->line_protocol_socket, instance->line_protocol_server_ip, - (unsigned short)instance->line_protocol_server_port, - instance->line_protocol_send_buff, - instance->line_protocol_send_buff_offset - ); + table = instance->table_metrics[i]; + + for(j = 0; j < n_cur_table_row[i]; j++) + { + row = read_table_line(table, j); + if(row == NULL) + { + continue; + } + memset(line_buf, 0, sizeof(line_buf)); + used_len = build_table_row_line_buf(instance, table, row, line_buf, sizeof(line_buf)); + send_line_buf(&instance->line_protocol_output, line_buf, used_len); + } } - instance->line_protocol_send_buff_offset = 0; - memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff)); + return; } -static void append_line_protocol_dynamic_row(struct fieldstat_dynamic_instance *instance, const char* measurement, char *tag_set, char *field_set) + + +int line_protocol_output(struct fieldstat_instance *instance) { - if(field_set==NULL) + + //print current time instance start + int tables_row_cnt[TABLE_MAX_NUM]; + int n_cur_table = instance->table_num; + get_current_table_line_cnt(instance, n_cur_table, tables_row_cnt); + int n_cur_metric = instance->metric_cnt; + //print current time instance end + + output_line_protocol_single_metric(instance, n_cur_metric); + + output_line_protocol_table_row(instance, n_cur_table, tables_row_cnt); + + flush_send_buf(&instance->line_protocol_output); + return 0; +} + + +static int build_dynamic_table_row_line_buf(struct fieldstat_dynamic_instance *instance, struct table_metric *table, struct metric **row_metric, char *line_buf, int line_buf_size) +{ + int used_len = 0; + struct metric *metric = NULL; + + if(table->column_cnt <= 0) { - return; + return 0; } - if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2) + long long row_value[table->column_cnt] = {0}; + + if(-1 == read_dynamic_table_row(row_metric, table->column_cnt, row_value)) { - flush_line_protocol_dynamic_metric(instance); + return 0; } - //printf("Line_protocol metric: %s:%s %s\n",measurement,tag_set,field_set); - instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset, - sizeof(instance->line_protocol_send_buff) - instance->line_protocol_send_buff_offset, - "%s%s %s\n", - measurement, tag_set, field_set - ); - return; + + if(1 != is_send_table_row(row_value, table->column_cnt)) + { + return 0; + } + + metric = row_metric[0]; + + used_len += add_measurement(metric->field_name, line_buf, line_buf_size); + + used_len += add_default_tag_set(instance->name, table->name, line_buf + used_len, line_buf_size - used_len); + + used_len += add_user_tag_set(metric, line_buf + used_len, line_buf_size - used_len); + + used_len += snprintf(line_buf + used_len, line_buf_size - used_len, " "); + + used_len += add_table_row_field_set(table->column_name, row_value, table->column_cnt, line_buf + used_len, line_buf_size - used_len); + + used_len += snprintf(line_buf + used_len, line_buf_size - used_len, "\n"); + + return used_len; } + int line_protocol_dynamic_metric_output(struct fieldstat_dynamic_instance *instance) { struct dynamic_metric **head = NULL; struct dynamic_metric *dyn_metric, *tmp_dyn_metric; struct metric **metrics = NULL; struct metric *metric = NULL; - struct table_metric *table = NULL; - char tag_set_buff[UDP_PAYLOAD_SIZE]; - char field_set_buff[UDP_PAYLOAD_SIZE]; - - int field_used_len = 0; - int tag_used_len = 0; - int i = 0, j = 0; - long long value; - int row_value_is_not_zero = 0; + char line_buf[UDP_PAYLOAD_SIZE]; + int used_len = 0; - for(i = 0; i < instance->n_thread; i++) + for(int i = 0; i < instance->n_thread; i++) { head = &instance->n_thread_dynamic_metric[i]; HASH_ITER(hh, *head, dyn_metric, tmp_dyn_metric) { - row_value_is_not_zero = 0; metrics = dyn_metric->metrics; metric = metrics[0]; - memset(tag_set_buff, 0, sizeof(tag_set_buff)); - memset(field_set_buff, 0,sizeof(field_set_buff)); - field_used_len = 0; - tag_used_len = 0; - + used_len = 0; + memset(line_buf, 0, sizeof(line_buf)); if(metric->table) { - table = metric->table; - for(j = 0; j < table->column_cnt; j++) - { - metric = metrics[j]; - value = metric->field_type == FIELD_TYPE_GAUGE ? - get_metric_unit_val(metric, FS_CALC_CURRENT, 0): - get_metric_unit_val(metric, FS_CALC_SPEED, 0); - if(value != 0) - { - row_value_is_not_zero = 1; - } - field_used_len += snprintf(field_set_buff + field_used_len, - sizeof(field_set_buff) - field_used_len, - "%s=%lld,", - metric->table->column_name[metric->table_column_id], - value - ); - - } - if(row_value_is_not_zero != 1) - { - continue; - } - metric = metrics[0]; - field_used_len--; - field_set_buff[field_used_len] = '\0'; - tag_used_len += snprintf(tag_set_buff + tag_used_len, - sizeof(tag_set_buff) - tag_used_len, - ",app_name=%s,table_name=%s", - instance->name, - table->name - ); - + used_len = build_dynamic_table_row_line_buf(instance, metric->table, metrics, line_buf, sizeof(line_buf)); } else { - value = metric->field_type == FIELD_TYPE_GAUGE ? - get_metric_unit_val(metric, FS_CALC_CURRENT, 0): - get_metric_unit_val(metric, FS_CALC_SPEED, 0); - if(value == 0) - { - continue; - } - snprintf(field_set_buff, sizeof(field_set_buff), "%s=%lld", metric->field_name, value); - tag_used_len += snprintf(tag_set_buff + tag_used_len, - sizeof(tag_set_buff) - tag_used_len, - ",app_name=%s", - instance->name - ); - + used_len = build_single_metric_line_buf(instance->name, metric, line_buf, sizeof(line_buf)); } - tag_used_len += output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, - tag_set_buff + tag_used_len, sizeof(tag_set_buff) - tag_used_len); - append_line_protocol_dynamic_row(instance, metric->field_name, tag_set_buff, field_set_buff); - } + send_line_buf(&instance->line_protocol_output, line_buf, used_len); + } } - flush_line_protocol_dynamic_metric(instance); + flush_send_buf(&instance->line_protocol_output); return 0; } diff --git a/test/src/gtest_dynamic_fieldstat.cpp b/test/src/gtest_dynamic_fieldstat.cpp index 6bdf643..669a980 100644 --- a/test/src/gtest_dynamic_fieldstat.cpp +++ b/test/src/gtest_dynamic_fieldstat.cpp @@ -80,11 +80,11 @@ TEST(FeildStatDynamicAPI, FieldStatDynamicInstanceSetPara) ret = fieldstat_dynamic_set_line_protocol_server(instance, "127.0.0.1", 8080); EXPECT_EQ(0, ret); - EXPECT_EQ(8080, instance->line_protocol_server_port); + EXPECT_EQ(8080, instance->line_protocol_output.server_port); ret = inet_pton(AF_INET, "127.0.0.1", (void *)&(input_ip)); EXPECT_EQ(1, ret); - EXPECT_EQ(input_ip, instance->line_protocol_server_ip); - EXPECT_NE(-1, instance->line_protocol_socket); + EXPECT_EQ(input_ip, instance->line_protocol_output.server_ip); + EXPECT_NE(-1, instance->line_protocol_output.send_socket); ret = fieldstat_dynamic_disable_background_thread(instance); EXPECT_EQ(0, ret); @@ -1065,8 +1065,7 @@ void parse_telegraf_cjson_output_not_equal(const char *compare) if(cjson_metric_str) { - ret = strcmp(cjson_metric_str, compare); - EXPECT_NE(0, ret); + EXPECT_STRNE(cjson_metric_str, compare); free(cjson_metric_str); cjson_metric_str = NULL; } diff --git a/test/src/gtest_fieldstat.cpp b/test/src/gtest_fieldstat.cpp index da1e8a5..1435391 100644 --- a/test/src/gtest_fieldstat.cpp +++ b/test/src/gtest_fieldstat.cpp @@ -68,7 +68,7 @@ TEST(FeildStatAPI, FieldStatSetLineProtocolServer) EXPECT_EQ(0, ret_set_line_protocol_server); EXPECT_EQ(1, instance->line_protocol_output_enable); - EXPECT_NE(-1, instance->line_protocol_socket); + EXPECT_NE(-1, instance->line_protocol_output.send_socket); fieldstat_instance_free(instance); } @@ -250,6 +250,97 @@ TEST(FeildStatAPI, FieldStatLocalOutputFormatJson) fieldstat_instance_free(instance); } +TEST(FeildStatAPI, FieldStatLineProtocolOutputTableMetric) +{ + int metric_id = -1; + int ret = 0; + int n_loops = 1; + struct fieldstat_instance * instance = NULL; + int table_id = -1; + int output_metric_ids[54] = {0}; + const char *telegraf_output_file = "/tmp/metrics.out"; + FILE *fp; + char line[2048] = {0}; + cJSON *cjson_metric = NULL; + cJSON *cjson_tags = NULL; + const char *compare = "{\"fields\":{\"alert_bytes\":0,\"allow_conn_num\":0,\"allow_in_bytes\":0,\"allow_in_packets\":0,\"allow_out_bytes\":0,"\ + "\"allow_out_packets\":0,\"block_bytes\":0,\"close_conn_num\":0,\"default_conn_num\":0,\"default_in_bytes\":0,"\ + "\"default_in_packets\":0,\"default_out_bytes\":0,\"default_out_packets\":0,\"deny_conn_num\":0,\"deny_in_bytes\":0,"\ + "\"deny_in_packets\":0,\"deny_out_bytes\":0,\"deny_out_packets\":0,\"established_conn_num\":0,\"intercept_conn_num\":0,"\ + "\"intercept_in_bytes\":0,\"intercept_in_packets\":0,\"intercept_out_bytes\":0,\"intercept_out_packets\":0,"\ + "\"ipv4_in_bytes\":0,\"ipv4_in_packets\":0,\"ipv4_out_packetsipv4_out_bytes\":0,\"ipv6_in_bytes\":0,\"ipv6_in_packets\":0,"\ + "\"ipv6_out_bytes\":0,\"ipv6_out_packets\":0,\"maybe_pinning_num\":0,\"monitor_conn_num\":0,\"monitor_in_bytes\":0,"\ + "\"monitor_in_packets\":0,\"monitor_out_bytes\":0,\"monitor_out_packets\":0,\"new_conn_num\":1000,\"not_pinning_num\":0,"\ + "\"pinning_num\":0,\"tcp_conn_num\":0,\"tcp_in_bytes\":0,\"tcp_in_packets\":0,\"tcp_out_bytes\":0,\"tcp_out_packets\":0,"\ + "\"total_in_bytes\":0,\"total_in_packets\":0,\"total_out_bytes\":0,\"total_out_packets\":0,\"udp_conn_num\":0,"\ + "\"udp_in_bytes\":0,\"udp_in_packets\":0,\"udp_out_bytes\":0,\"udp_out_packets\":0},\"name\":\"TRAFFIC\","\ + "\"tags\":{\"app_name\":\"tsg_statistic\",\"table_name\":\"network_activity\"}}"; + char *cjson_metric_str = NULL; + + const char *table_column_name[] = {"new_conn_num", "established_conn_num", "close_conn_num", "total_in_bytes", "total_out_bytes", "total_in_packets", + "total_out_packets", "default_conn_num", "default_in_bytes", "default_out_bytes", "default_in_packets", "default_out_packets", "allow_conn_num", + "allow_in_bytes", "allow_out_bytes", "allow_in_packets", "allow_out_packets", "deny_conn_num", "deny_in_bytes", "deny_out_bytes", "deny_in_packets", + "deny_out_packets", "monitor_conn_num", "monitor_in_bytes", "monitor_out_bytes", "monitor_in_packets", "monitor_out_packets", "intercept_conn_num", + "intercept_in_bytes", "intercept_out_bytes", "intercept_in_packets", "intercept_out_packets", "ipv4_in_packets", "ipv4_in_bytes", "ipv4_out_packets" + "ipv4_out_bytes", "ipv6_in_packets", "ipv6_in_bytes", "ipv6_out_packets", "ipv6_out_bytes", "tcp_conn_num", "tcp_in_packets", "tcp_in_bytes", + "tcp_out_packets", "tcp_out_bytes", "udp_conn_num", "udp_in_packets", "udp_in_bytes", "udp_out_packets", "udp_out_bytes", "alert_bytes", "block_bytes", + "pinning_num", "maybe_pinning_num", "not_pinning_num"}; + enum field_type table_column_type[54] = {FIELD_TYPE_COUNTER}; + + instance = fieldstat_instance_new("tsg_statistic"); + EXPECT_STREQ("tsg_statistic", instance->name); + + table_id = fieldstat_register_table(instance, "network_activity", table_column_name, table_column_type, sizeof(table_column_name)/sizeof(table_column_name[0])); + EXPECT_EQ(0, table_id); + ret = fieldstat_register_table_row(instance, table_id, "TRAFFIC", NULL, 0, output_metric_ids); + EXPECT_EQ(0, ret); + + ret = fieldstat_set_line_protocol_server(instance, "127.0.0.1", 8600); + EXPECT_EQ(0, ret); + ret = fieldstat_disable_background_thread(instance); + EXPECT_EQ(0, ret); + fieldstat_instance_start(instance); + fieldstat_value_incrby(instance, output_metric_ids[0], 1000); + ret = system("cat /dev/null > /tmp/metrics.out"); + sleep(1); + fieldstat_passive_output(instance); + sleep(1); + fp = fopen(telegraf_output_file, "r"); + EXPECT_NE(nullptr, fp); + int n_line = 0; + + while(!feof(fp)) + { + if(NULL == fgets(line, sizeof(line), fp)) + { + continue; + } + cjson_metric = cJSON_Parse(line); + EXPECT_NE(nullptr, cjson_metric); + cJSON_DeleteItemFromObject(cjson_metric, "timestamp"); + + cjson_tags = cJSON_GetObjectItem(cjson_metric, "tags"); + EXPECT_NE(nullptr, cjson_tags); + cJSON_DeleteItemFromObject(cjson_tags, "host"); + cjson_metric_str = cJSON_PrintUnformatted(cjson_metric); + EXPECT_NE(nullptr, cjson_metric_str); + if(cjson_metric_str) + { + EXPECT_STREQ(compare, cjson_metric_str); + free(cjson_metric_str); + cjson_metric_str = NULL; + } + cJSON_Delete(cjson_metric); + n_line++; + } + fclose(fp); + EXPECT_EQ(n_line, n_loops); + + fieldstat_instance_free(instance); + +} + + int main(int argc, char *argv[]) { testing::InitGoogleTest(&argc, argv); |
