diff options
| author | fumingwei <[email protected]> | 2023-09-01 22:52:43 +0800 |
|---|---|---|
| committer | fumingwei <[email protected]> | 2023-09-01 22:55:06 +0800 |
| commit | 81d127144e499c451d0bdb452adb7fd22cd080c0 (patch) | |
| tree | d1394940c6d52f2ee957dfd967faaca45a0e4198 /src/line_protocol_output.cpp | |
| parent | c6e449c69900cdaed15c48a5f6d64a74e1a0f7c4 (diff) | |
bugfix:将line protocol发送方式改为先收集metric然后集体发送v3.0.13
Diffstat (limited to 'src/line_protocol_output.cpp')
| -rw-r--r-- | src/line_protocol_output.cpp | 39 |
1 files changed, 30 insertions, 9 deletions
diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp index b1f9389..c5ef544 100644 --- a/src/line_protocol_output.cpp +++ b/src/line_protocol_output.cpp @@ -1,5 +1,8 @@ #include "fieldstat_internal.h" +#include <string> +#include <vector> +#define LINE_BUF_SIZE 1460 static void flush_send_buf(struct line_protocol_output *line_protocol_output) { @@ -21,13 +24,20 @@ static void flush_send_buf(struct line_protocol_output *line_protocol_output) } -static void send_line_buf(struct line_protocol_output *line_protocol_output, char *line_buf, unsigned int line_buf_len) +void send_line_buf(struct line_protocol_output *line_protocol_output, const char *line_buf, unsigned int line_buf_len) { if(line_protocol_output == NULL || line_buf == NULL || line_buf_len == 0) { return; } + if(line_buf_len > UDP_PAYLOAD_SIZE - 1) + { + send_udp(line_protocol_output->send_socket, line_protocol_output->server_ip, + line_protocol_output->server_port, line_buf, line_buf_len); + return; + } + if(UDP_PAYLOAD_SIZE - line_protocol_output->send_buf_offset - 1 < line_buf_len) { flush_send_buf(line_protocol_output); @@ -363,7 +373,7 @@ static void output_line_protocol_single_metric(struct fieldstat_instance *instan int used_len = 0; struct metric *metric = NULL; - char line_buf[UDP_PAYLOAD_SIZE]; + char line_buf[LINE_BUF_SIZE]; for(i = 0; i < n_cur_metric; i++) { @@ -406,7 +416,7 @@ static void output_line_protocol_table_row(struct fieldstat_instance *instance, int i = 0, j = 0, k = 0; int used_len = 0; - char line_buf[UDP_PAYLOAD_SIZE]; + char line_buf[LINE_BUF_SIZE]; struct table_metric *table = NULL; struct table_line *row = NULL; struct metric *row_metrics[TABLE_COLUMN_SIZE]; @@ -465,32 +475,43 @@ int line_protocol_dynamic_metric_output(struct fieldstat_dynamic_instance *insta struct metric **metrics = NULL; struct metric *metric = NULL; - char line_buf[UDP_PAYLOAD_SIZE]; - int used_len = 0; + //std::string line_buf_to_send; + //std::vector<std::string> line_buf_to_send; for(int i = 0; i < instance->n_thread; i++) { + std::vector<std::string> line_buf_to_send; pthread_spin_lock(instance->uthash_locks + i); head = &instance->n_thread_dynamic_metric[i]; HASH_ITER(hh, *head, dyn_metric, tmp_dyn_metric) { + char line_buf[LINE_BUF_SIZE] = "\0"; + metrics = dyn_metric->metrics; metric = metrics[0]; - used_len = 0; memset(line_buf, 0, sizeof(line_buf)); if(metric->table) { - used_len = build_table_row_line_buf(instance->name, instance->output_type, metric->table, metrics, line_buf, sizeof(line_buf)); + build_table_row_line_buf(instance->name, instance->output_type, metric->table, metrics, line_buf, sizeof(line_buf)); } else { - used_len = build_single_metric_line_buf(instance->name, instance->output_type, metric, line_buf, sizeof(line_buf)); + build_single_metric_line_buf(instance->name, instance->output_type, metric, line_buf, sizeof(line_buf)); } - send_line_buf(&instance->line_protocol_output, line_buf, used_len); + + /* copy the line_buf as str to vector line_buf_to_send */ + line_buf_to_send.push_back(std::string(line_buf)); } pthread_spin_unlock(instance->uthash_locks + i); + for (auto it = line_buf_to_send.begin(); it != line_buf_to_send.end(); ++it) + { + const std::string& str = *it; + send_line_buf(&instance->line_protocol_output, str.c_str(), str.length()); + } } + flush_send_buf(&instance->line_protocol_output); + return 0; } |
