diff options
| -rw-r--r-- | src/fieldstat.cpp | 18 | ||||
| -rw-r--r-- | src/fieldstat_internal.h | 3 | ||||
| -rw-r--r-- | src/line_protocol_output.cpp | 39 | ||||
| -rw-r--r-- | test/bin/telegraf_unit_test.conf | 3 | ||||
| -rw-r--r-- | test/src/gtest_dynamic_fieldstat.cpp | 96 |
5 files changed, 147 insertions, 12 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp index 5696345..2b4a04e 100644 --- a/src/fieldstat.cpp +++ b/src/fieldstat.cpp @@ -341,6 +341,24 @@ int startup_udp() return sd_udp; } + // uint64_t rcvbuf_size = 32 * 1024 * 1024; + // if (setsockopt (sd_udp, SOL_SOCKET, SO_RCVBUF, &rcvbuf_size, sizeof(rcvbuf_size) ) < 0) + // { + // printf("FS2:setsockopt error: %d %s, restart socket.", errno, strerror(errno)); + // close (sd_udp); + // sd_udp=-1; + // return sd_udp; + // } + + // uint64_t sndbuf_size = 32 * 1024 * 1024; + // if (setsockopt (sd_udp, SOL_SOCKET, SO_RCVBUF, &sndbuf_size, sizeof(sndbuf_size) ) < 0) + // { + // printf("FS2:setsockopt error: %d %s, restart socket.", errno, strerror(errno)); + // close (sd_udp); + // sd_udp=-1; + // return sd_udp; + // } + return sd_udp; } diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h index 47da22d..e471116 100644 --- a/src/fieldstat_internal.h +++ b/src/fieldstat_internal.h @@ -27,7 +27,6 @@ #define INIT_STAT_FIELD_NUM 1024 #define MAX_PATH_LEN 256 -#define UDP_PAYLOAD_SIZE 1460 #define STATUS_PER_LINE 6 #define FIELD_PER_LINE 8 @@ -52,7 +51,7 @@ #define LEN_PATH_MAX 256 #define TABLE_MAX_NUM 64 #define TABLE_COLUMN_SIZE 64 -#define UDP_PAYLOAD_SIZE 1460 +#define UDP_PAYLOAD_SIZE 512 #define LEFT_MIN_BUFF_LEN 1024 #define REALLOC_SCALE_SIZE 2048 #define STR_LEN_32 32 diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp index b1f9389..c5ef544 100644 --- a/src/line_protocol_output.cpp +++ b/src/line_protocol_output.cpp @@ -1,5 +1,8 @@ #include "fieldstat_internal.h" +#include <string> +#include <vector> +#define LINE_BUF_SIZE 1460 static void flush_send_buf(struct line_protocol_output *line_protocol_output) { @@ -21,13 +24,20 @@ static void flush_send_buf(struct line_protocol_output *line_protocol_output) } -static void send_line_buf(struct line_protocol_output *line_protocol_output, char *line_buf, unsigned int line_buf_len) +void send_line_buf(struct line_protocol_output *line_protocol_output, const char *line_buf, unsigned int line_buf_len) { if(line_protocol_output == NULL || line_buf == NULL || line_buf_len == 0) { return; } + if(line_buf_len > UDP_PAYLOAD_SIZE - 1) + { + send_udp(line_protocol_output->send_socket, line_protocol_output->server_ip, + line_protocol_output->server_port, line_buf, line_buf_len); + return; + } + if(UDP_PAYLOAD_SIZE - line_protocol_output->send_buf_offset - 1 < line_buf_len) { flush_send_buf(line_protocol_output); @@ -363,7 +373,7 @@ static void output_line_protocol_single_metric(struct fieldstat_instance *instan int used_len = 0; struct metric *metric = NULL; - char line_buf[UDP_PAYLOAD_SIZE]; + char line_buf[LINE_BUF_SIZE]; for(i = 0; i < n_cur_metric; i++) { @@ -406,7 +416,7 @@ static void output_line_protocol_table_row(struct fieldstat_instance *instance, int i = 0, j = 0, k = 0; int used_len = 0; - char line_buf[UDP_PAYLOAD_SIZE]; + char line_buf[LINE_BUF_SIZE]; struct table_metric *table = NULL; struct table_line *row = NULL; struct metric *row_metrics[TABLE_COLUMN_SIZE]; @@ -465,32 +475,43 @@ int line_protocol_dynamic_metric_output(struct fieldstat_dynamic_instance *insta struct metric **metrics = NULL; struct metric *metric = NULL; - char line_buf[UDP_PAYLOAD_SIZE]; - int used_len = 0; + //std::string line_buf_to_send; + //std::vector<std::string> line_buf_to_send; for(int i = 0; i < instance->n_thread; i++) { + std::vector<std::string> line_buf_to_send; pthread_spin_lock(instance->uthash_locks + i); head = &instance->n_thread_dynamic_metric[i]; HASH_ITER(hh, *head, dyn_metric, tmp_dyn_metric) { + char line_buf[LINE_BUF_SIZE] = "\0"; + metrics = dyn_metric->metrics; metric = metrics[0]; - used_len = 0; memset(line_buf, 0, sizeof(line_buf)); if(metric->table) { - used_len = build_table_row_line_buf(instance->name, instance->output_type, metric->table, metrics, line_buf, sizeof(line_buf)); + build_table_row_line_buf(instance->name, instance->output_type, metric->table, metrics, line_buf, sizeof(line_buf)); } else { - used_len = build_single_metric_line_buf(instance->name, instance->output_type, metric, line_buf, sizeof(line_buf)); + build_single_metric_line_buf(instance->name, instance->output_type, metric, line_buf, sizeof(line_buf)); } - send_line_buf(&instance->line_protocol_output, line_buf, used_len); + + /* copy the line_buf as str to vector line_buf_to_send */ + line_buf_to_send.push_back(std::string(line_buf)); } pthread_spin_unlock(instance->uthash_locks + i); + for (auto it = line_buf_to_send.begin(); it != line_buf_to_send.end(); ++it) + { + const std::string& str = *it; + send_line_buf(&instance->line_protocol_output, str.c_str(), str.length()); + } } + flush_send_buf(&instance->line_protocol_output); + return 0; } diff --git a/test/bin/telegraf_unit_test.conf b/test/bin/telegraf_unit_test.conf index 748060e..2de1dc7 100644 --- a/test/bin/telegraf_unit_test.conf +++ b/test/bin/telegraf_unit_test.conf @@ -2,7 +2,7 @@ interval = "5s" round_interval = true metric_batch_size = 1000 - metric_buffer_limit = 10000 + metric_buffer_limit = 1000000 collection_jitter = "0s" flush_interval = "1s" flush_jitter = "0s" @@ -23,3 +23,4 @@ [[inputs.socket_listener]] service_address = "udp://:8700" data_format = "influx" + read_buffer_size = "32MiB" diff --git a/test/src/gtest_dynamic_fieldstat.cpp b/test/src/gtest_dynamic_fieldstat.cpp index dd509e8..6fc9d43 100644 --- a/test/src/gtest_dynamic_fieldstat.cpp +++ b/test/src/gtest_dynamic_fieldstat.cpp @@ -2821,6 +2821,102 @@ TEST(FeildStatDynamicAPI, SendLenEqualUDPPayload) } + +// TEST(FeildStatDynamicAPI, SendTo) +// { +// unsigned int server_ip = 0; +// int send_socket = startup_udp(); +// struct line_protocol_output output; + +// inet_pton(AF_INET, "127.0.0.1", (void *)&(server_ip)); + +// memset(&output, 0, sizeof(struct line_protocol_output)); +// output.send_socket = send_socket; +// output.server_ip = server_ip; +// output.server_port = 8700; + + +// std::vector<std::string> send_buf; + +// char line_buf[1460]; +// int count = 0; + +// for(int i = 0; i < 90000; i++) +// { +// memset(line_buf, 0, sizeof(line_buf)); +// // snprintf(line_buf, sizeof(line_buf), +// // "Active_session_%05d,app_name=firewall Active_session_01=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_02=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_03=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_04=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_05=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_06=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_07=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_08=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_09=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_10=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_11=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_12=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_13=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_14=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_15=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_16=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_17=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_18=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_19=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_20=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_21=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_22=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_23=12\n" +// // "Active_session_%05d,app_name=firewall Active_session_24=12\n", +// // i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i); + +// snprintf(line_buf, sizeof(line_buf), +// "Active_sessi01_%05d,app_name=firewall,Active_session_01=12," +// "Active_sessi02=%05d,app_name=firewa01,Active_session_02=12," +// "Active_sessi03=%05d,app_name=firewa02,Active_session_03=12," +// "Active_sessi04=%05d,app_name=firewa03,Active_session_04=12," +// "Active_sessi05=%05d,app_name=firewa04,Active_session_05=12," +// "Active_sessi06=%05d,app_name=firewa05,Active_session_06=12," +// "Active_sessi07=%05d,app_name=firewa06,Active_session_07=12," +// "Active_sessi08=%05d,app_name=firewa07,Active_session_08=12," +// "Active_sessi09=%05d,app_name=firewa08,Active_session_09=12," +// "Active_sessi10=%05d,app_name=firewa09,Active_session_10=12," +// "Active_sessi11=%05d,app_name=firewa10,Active_session_11=12," +// "Active_sessi12=%05d,app_name=firewa11,Active_session_12=12," +// "Active_sessi13=%05d,app_name=firewa12,Active_session_13=12," +// "Active_sessi14=%05d,app_name=firewa13,Active_session_14=12," +// "Active_sessi15=%05d,app_name=firewa14,Active_session_15=12," +// "Active_sessi16=%05d,app_name=firewa15,Active_session_16=12," +// "Active_sessi17=%05d,app_name=firewa16,Active_session_17=12," +// "Active_sessi18=%05d,app_name=firewa17,Active_session_18=12," +// "Active_sessi19=%05d,app_name=firewa18,Active_session_19=12," +// "Active_sessi20=%05d,app_name=firewa19,Active_session_20=12," +// "Active_sessi21=%05d,app_name=firewa20,Active_session_21=12," +// "Active_sessi22=%05d,app_name=firewa21,Active_session_22=12," +// "Active_sessi23=%05d,app_name=firewa22,Active_session_23=12," +// "Active_sessi24=%05d,app_name=firewa23 Active_session_24=12\n", +// i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i); +// send_buf.push_back(std::string(line_buf)); + +// } + +// system("cat /dev/null > /tmp/metrics.out"); +// for (auto it = send_buf.begin(); it != send_buf.end(); ++it) { +// const std::string& str = *it; +// //std::cout << str << " - Length: " << str.length() << std::endl; +// //printf("%s\n", str.c_str()); +// send_udp(send_socket, server_ip, 8700, str.c_str(), str.length()); +// //printf("%5d,%s\n", (int)str.length(), str.c_str()); +// //send_line_buf(&output, str.c_str(), str.length()); + +// count++; +// } +// printf("count:%d\n",count); + +// } + + int main(int argc, char *argv[]) { testing::InitGoogleTest(&argc, argv); |
