summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfumingwei <[email protected]>2023-09-01 22:52:43 +0800
committerfumingwei <[email protected]>2023-09-01 22:55:06 +0800
commit81d127144e499c451d0bdb452adb7fd22cd080c0 (patch)
treed1394940c6d52f2ee957dfd967faaca45a0e4198
parentc6e449c69900cdaed15c48a5f6d64a74e1a0f7c4 (diff)
bugfix:将line protocol发送方式改为先收集metric然后集体发送v3.0.13
-rw-r--r--src/fieldstat.cpp18
-rw-r--r--src/fieldstat_internal.h3
-rw-r--r--src/line_protocol_output.cpp39
-rw-r--r--test/bin/telegraf_unit_test.conf3
-rw-r--r--test/src/gtest_dynamic_fieldstat.cpp96
5 files changed, 147 insertions, 12 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp
index 5696345..2b4a04e 100644
--- a/src/fieldstat.cpp
+++ b/src/fieldstat.cpp
@@ -341,6 +341,24 @@ int startup_udp()
return sd_udp;
}
+ // uint64_t rcvbuf_size = 32 * 1024 * 1024;
+ // if (setsockopt (sd_udp, SOL_SOCKET, SO_RCVBUF, &rcvbuf_size, sizeof(rcvbuf_size) ) < 0)
+ // {
+ // printf("FS2:setsockopt error: %d %s, restart socket.", errno, strerror(errno));
+ // close (sd_udp);
+ // sd_udp=-1;
+ // return sd_udp;
+ // }
+
+ // uint64_t sndbuf_size = 32 * 1024 * 1024;
+ // if (setsockopt (sd_udp, SOL_SOCKET, SO_RCVBUF, &sndbuf_size, sizeof(sndbuf_size) ) < 0)
+ // {
+ // printf("FS2:setsockopt error: %d %s, restart socket.", errno, strerror(errno));
+ // close (sd_udp);
+ // sd_udp=-1;
+ // return sd_udp;
+ // }
+
return sd_udp;
}
diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h
index 47da22d..e471116 100644
--- a/src/fieldstat_internal.h
+++ b/src/fieldstat_internal.h
@@ -27,7 +27,6 @@
#define INIT_STAT_FIELD_NUM 1024
#define MAX_PATH_LEN 256
-#define UDP_PAYLOAD_SIZE 1460
#define STATUS_PER_LINE 6
#define FIELD_PER_LINE 8
@@ -52,7 +51,7 @@
#define LEN_PATH_MAX 256
#define TABLE_MAX_NUM 64
#define TABLE_COLUMN_SIZE 64
-#define UDP_PAYLOAD_SIZE 1460
+#define UDP_PAYLOAD_SIZE 512
#define LEFT_MIN_BUFF_LEN 1024
#define REALLOC_SCALE_SIZE 2048
#define STR_LEN_32 32
diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp
index b1f9389..c5ef544 100644
--- a/src/line_protocol_output.cpp
+++ b/src/line_protocol_output.cpp
@@ -1,5 +1,8 @@
#include "fieldstat_internal.h"
+#include <string>
+#include <vector>
+#define LINE_BUF_SIZE 1460
static void flush_send_buf(struct line_protocol_output *line_protocol_output)
{
@@ -21,13 +24,20 @@ static void flush_send_buf(struct line_protocol_output *line_protocol_output)
}
-static void send_line_buf(struct line_protocol_output *line_protocol_output, char *line_buf, unsigned int line_buf_len)
+void send_line_buf(struct line_protocol_output *line_protocol_output, const char *line_buf, unsigned int line_buf_len)
{
if(line_protocol_output == NULL || line_buf == NULL || line_buf_len == 0)
{
return;
}
+ if(line_buf_len > UDP_PAYLOAD_SIZE - 1)
+ {
+ send_udp(line_protocol_output->send_socket, line_protocol_output->server_ip,
+ line_protocol_output->server_port, line_buf, line_buf_len);
+ return;
+ }
+
if(UDP_PAYLOAD_SIZE - line_protocol_output->send_buf_offset - 1 < line_buf_len)
{
flush_send_buf(line_protocol_output);
@@ -363,7 +373,7 @@ static void output_line_protocol_single_metric(struct fieldstat_instance *instan
int used_len = 0;
struct metric *metric = NULL;
- char line_buf[UDP_PAYLOAD_SIZE];
+ char line_buf[LINE_BUF_SIZE];
for(i = 0; i < n_cur_metric; i++)
{
@@ -406,7 +416,7 @@ static void output_line_protocol_table_row(struct fieldstat_instance *instance,
int i = 0, j = 0, k = 0;
int used_len = 0;
- char line_buf[UDP_PAYLOAD_SIZE];
+ char line_buf[LINE_BUF_SIZE];
struct table_metric *table = NULL;
struct table_line *row = NULL;
struct metric *row_metrics[TABLE_COLUMN_SIZE];
@@ -465,32 +475,43 @@ int line_protocol_dynamic_metric_output(struct fieldstat_dynamic_instance *insta
struct metric **metrics = NULL;
struct metric *metric = NULL;
- char line_buf[UDP_PAYLOAD_SIZE];
- int used_len = 0;
+ //std::string line_buf_to_send;
+ //std::vector<std::string> line_buf_to_send;
for(int i = 0; i < instance->n_thread; i++)
{
+ std::vector<std::string> line_buf_to_send;
pthread_spin_lock(instance->uthash_locks + i);
head = &instance->n_thread_dynamic_metric[i];
HASH_ITER(hh, *head, dyn_metric, tmp_dyn_metric)
{
+ char line_buf[LINE_BUF_SIZE] = "\0";
+
metrics = dyn_metric->metrics;
metric = metrics[0];
- used_len = 0;
memset(line_buf, 0, sizeof(line_buf));
if(metric->table)
{
- used_len = build_table_row_line_buf(instance->name, instance->output_type, metric->table, metrics, line_buf, sizeof(line_buf));
+ build_table_row_line_buf(instance->name, instance->output_type, metric->table, metrics, line_buf, sizeof(line_buf));
}
else
{
- used_len = build_single_metric_line_buf(instance->name, instance->output_type, metric, line_buf, sizeof(line_buf));
+ build_single_metric_line_buf(instance->name, instance->output_type, metric, line_buf, sizeof(line_buf));
}
- send_line_buf(&instance->line_protocol_output, line_buf, used_len);
+
+ /* copy the line_buf as str to vector line_buf_to_send */
+ line_buf_to_send.push_back(std::string(line_buf));
}
pthread_spin_unlock(instance->uthash_locks + i);
+ for (auto it = line_buf_to_send.begin(); it != line_buf_to_send.end(); ++it)
+ {
+ const std::string& str = *it;
+ send_line_buf(&instance->line_protocol_output, str.c_str(), str.length());
+ }
}
+
flush_send_buf(&instance->line_protocol_output);
+
return 0;
}
diff --git a/test/bin/telegraf_unit_test.conf b/test/bin/telegraf_unit_test.conf
index 748060e..2de1dc7 100644
--- a/test/bin/telegraf_unit_test.conf
+++ b/test/bin/telegraf_unit_test.conf
@@ -2,7 +2,7 @@
interval = "5s"
round_interval = true
metric_batch_size = 1000
- metric_buffer_limit = 10000
+ metric_buffer_limit = 1000000
collection_jitter = "0s"
flush_interval = "1s"
flush_jitter = "0s"
@@ -23,3 +23,4 @@
[[inputs.socket_listener]]
service_address = "udp://:8700"
data_format = "influx"
+ read_buffer_size = "32MiB"
diff --git a/test/src/gtest_dynamic_fieldstat.cpp b/test/src/gtest_dynamic_fieldstat.cpp
index dd509e8..6fc9d43 100644
--- a/test/src/gtest_dynamic_fieldstat.cpp
+++ b/test/src/gtest_dynamic_fieldstat.cpp
@@ -2821,6 +2821,102 @@ TEST(FeildStatDynamicAPI, SendLenEqualUDPPayload)
}
+
+// TEST(FeildStatDynamicAPI, SendTo)
+// {
+// unsigned int server_ip = 0;
+// int send_socket = startup_udp();
+// struct line_protocol_output output;
+
+// inet_pton(AF_INET, "127.0.0.1", (void *)&(server_ip));
+
+// memset(&output, 0, sizeof(struct line_protocol_output));
+// output.send_socket = send_socket;
+// output.server_ip = server_ip;
+// output.server_port = 8700;
+
+
+// std::vector<std::string> send_buf;
+
+// char line_buf[1460];
+// int count = 0;
+
+// for(int i = 0; i < 90000; i++)
+// {
+// memset(line_buf, 0, sizeof(line_buf));
+// // snprintf(line_buf, sizeof(line_buf),
+// // "Active_session_%05d,app_name=firewall Active_session_01=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_02=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_03=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_04=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_05=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_06=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_07=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_08=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_09=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_10=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_11=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_12=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_13=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_14=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_15=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_16=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_17=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_18=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_19=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_20=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_21=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_22=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_23=12\n"
+// // "Active_session_%05d,app_name=firewall Active_session_24=12\n",
+// // i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i);
+
+// snprintf(line_buf, sizeof(line_buf),
+// "Active_sessi01_%05d,app_name=firewall,Active_session_01=12,"
+// "Active_sessi02=%05d,app_name=firewa01,Active_session_02=12,"
+// "Active_sessi03=%05d,app_name=firewa02,Active_session_03=12,"
+// "Active_sessi04=%05d,app_name=firewa03,Active_session_04=12,"
+// "Active_sessi05=%05d,app_name=firewa04,Active_session_05=12,"
+// "Active_sessi06=%05d,app_name=firewa05,Active_session_06=12,"
+// "Active_sessi07=%05d,app_name=firewa06,Active_session_07=12,"
+// "Active_sessi08=%05d,app_name=firewa07,Active_session_08=12,"
+// "Active_sessi09=%05d,app_name=firewa08,Active_session_09=12,"
+// "Active_sessi10=%05d,app_name=firewa09,Active_session_10=12,"
+// "Active_sessi11=%05d,app_name=firewa10,Active_session_11=12,"
+// "Active_sessi12=%05d,app_name=firewa11,Active_session_12=12,"
+// "Active_sessi13=%05d,app_name=firewa12,Active_session_13=12,"
+// "Active_sessi14=%05d,app_name=firewa13,Active_session_14=12,"
+// "Active_sessi15=%05d,app_name=firewa14,Active_session_15=12,"
+// "Active_sessi16=%05d,app_name=firewa15,Active_session_16=12,"
+// "Active_sessi17=%05d,app_name=firewa16,Active_session_17=12,"
+// "Active_sessi18=%05d,app_name=firewa17,Active_session_18=12,"
+// "Active_sessi19=%05d,app_name=firewa18,Active_session_19=12,"
+// "Active_sessi20=%05d,app_name=firewa19,Active_session_20=12,"
+// "Active_sessi21=%05d,app_name=firewa20,Active_session_21=12,"
+// "Active_sessi22=%05d,app_name=firewa21,Active_session_22=12,"
+// "Active_sessi23=%05d,app_name=firewa22,Active_session_23=12,"
+// "Active_sessi24=%05d,app_name=firewa23 Active_session_24=12\n",
+// i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i);
+// send_buf.push_back(std::string(line_buf));
+
+// }
+
+// system("cat /dev/null > /tmp/metrics.out");
+// for (auto it = send_buf.begin(); it != send_buf.end(); ++it) {
+// const std::string& str = *it;
+// //std::cout << str << " - Length: " << str.length() << std::endl;
+// //printf("%s\n", str.c_str());
+// send_udp(send_socket, server_ip, 8700, str.c_str(), str.length());
+// //printf("%5d,%s\n", (int)str.length(), str.c_str());
+// //send_line_buf(&output, str.c_str(), str.length());
+
+// count++;
+// }
+// printf("count:%d\n",count);
+
+// }
+
+
int main(int argc, char *argv[])
{
testing::InitGoogleTest(&argc, argv);