diff options
| author | chenzizhan <[email protected]> | 2023-05-29 15:22:42 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2023-05-29 15:22:42 +0800 |
| commit | 9e9617aa26e5487fa785e9e570e60a9e0ca31d3e (patch) | |
| tree | 7db60ce36c446126b7d74305a4f9b04cf8688ab4 | |
| parent | 16251f9b2255ad4faf77aef5b75ebc489aabcc33 (diff) | |
Implement line protocol output, with gtest tests
| -rw-r--r-- | inc/fieldstat.h | 2 | ||||
| -rw-r--r-- | src/fieldstat.cpp | 6 | ||||
| -rw-r--r-- | src/fieldstat_internal.h | 2 | ||||
| -rw-r--r-- | src/line_protocol_output.cpp | 94 | ||||
| -rw-r--r-- | test/src/gtest_fieldstat_topk.cpp | 98 | ||||
| -rw-r--r-- | test/src/test_utils.cpp | 49 | ||||
| -rw-r--r-- | test/src/test_utils.hpp | 12 |
7 files changed, 211 insertions, 52 deletions
diff --git a/inc/fieldstat.h b/inc/fieldstat.h index c52a99a..5786545 100644 --- a/inc/fieldstat.h +++ b/inc/fieldstat.h @@ -216,7 +216,7 @@ int fieldstat_register_summary(struct fieldstat_instance *instance, const char * * @param n_tag Size of tags * @param K How many fields to keep **/ -int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K, int output_window); +int fieldstat_register_topK(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K, int output_window); /** * Register dynamic fieldstat instance. diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp index cacf6b9..9b411e6 100644 --- a/src/fieldstat.cpp +++ b/src/fieldstat.cpp @@ -998,7 +998,7 @@ error_out: // field name: 用来打印的名字,不是很重要 // tags: 没看懂有啥用,好像也是只有在打印的时候有用。另外有一处调用:is_output_prometheus。总之可以发现这个tag写进去就不变了。 // n_tag: sizeof(tags) / sizeof(tags[0]) -int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K, int output_window) +int fieldstat_register_topK(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K, int output_window) { if(!is_valid_field_name(field_name)) { @@ -1008,6 +1008,10 @@ int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const c { return -1; } + if (K > 50) { // TODO: 要不要加限制,有一个往文件输出的问题,不过说实话,往文件写也不能让K太大。 + printf("Due to the limitation of UDP packet size, K should be less than 50\n"); + return -1; + } enum field_type type = FIELD_TYPE_TOPK; struct metric *metric = metric_new(type, field_name, tags, n_tag); diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h index 0f2fd48..444279e 100644 --- a/src/fieldstat_internal.h +++ b/src/fieldstat_internal.h @@ -272,6 +272,8 @@ int startup_udp(); void metric_value_operate(struct metric *metric, enum 
field_op op, long long value); struct table_metric* table_metric_new(const char *name, const char *column_name[], enum field_type column_type[], size_t n_column); void table_metric_free(struct table_metric *table); +struct heavy_keeper *choose_heavy_keeper_for_output(struct metric *metric); +void heavy_keeper_archieve_before_output(struct metric *metric); struct table_line *table_line_new(const char *name, const struct fieldstat_tag tags[],size_t n_tag); void table_line_free(struct table_line *table_line); diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp index 9646fdd..661053d 100644 --- a/src/line_protocol_output.cpp +++ b/src/line_protocol_output.cpp @@ -81,7 +81,7 @@ static int add_user_tag_set(struct metric *metric, char *line_buf, unsigned int used_len += snprintf(line_buf + used_len, line_buf_size - used_len, ",%s=%s", metric->tag_key[i], metric->tag_value[i]); } - return used_len; + return used_len; } static long long read_single_metric_value(struct metric *metric, int output_type) @@ -240,6 +240,90 @@ static int build_table_row_line_buf(char *instance_name, int output_type, struct return used_len; } +// keylen: including '\0' +int add_one_topk_field(char *key, size_t key_len, unsigned value, char *line_buf, unsigned int line_buf_size) +{ + if (key_len >= line_buf_size - 3) { // "=", "\n", and at least one char for value + printf("add_one_topk_field, 1\n"); + return -1; + } + int used_len = snprintf(line_buf, line_buf_size, "%s=%u", key, value); + printf("add_one_topk_field, buffer: %s\n", line_buf); + if (used_len < 0 || used_len >= (int)line_buf_size - 1) { // -1: line buf should end with '\n' // TODO: what if timestamp or anything else are inserted + printf("add_one_topk_field, 2\n"); + + line_buf[0] = '\0'; + } + return used_len; +} + +int add_field_set_for_topk(struct heavy_keeper *hk, char *line_buf, unsigned int line_buf_size) +{ + struct heavy_keeper_result *result = heavy_keeper_query(hk); + printf("n_key: %zu\n", 
result->n_key); + + int used_len = 0; + for (size_t i = 0; i < result->n_key; i++) { + + int tmp_ret = add_one_topk_field(result->key[i], result->key_len[i], result->count[i], line_buf + used_len, line_buf_size - used_len); + if (tmp_ret < 0) { + break; + } + used_len += tmp_ret; + + if (i < result->n_key - 1) { // not the last one + int tmp_ret = snprintf(line_buf + used_len, line_buf_size - used_len, ","); + if (tmp_ret < 0 || used_len >= (int)line_buf_size - 1) { + line_buf[0] = '\0'; + break; + } + used_len++; + } + } + + // IF the last is ',' remove it + if (used_len > 0 && line_buf[used_len - 1] == ',') { + line_buf[used_len - 1] = '\0'; + used_len--; + } + + heavy_keeper_result_free(result); + return used_len; +} + +void output_line_protocol_topk(struct fieldstat_instance *instance, struct metric *metric) +{ + int used_len = 0; + char line_buf[UDP_PAYLOAD_SIZE] = ""; // FIXME: TODO:不够长,实际上支持的长度不止这么多,是不是可以扩一下。 + // 扩有一些修改点,比如send_line_buf里就直接用了这个宏。 + // UDP自己的限制是64k。但是如果超过了MTU会很麻烦。当前的数值是绝对安全的数值,再大一点就会有问题了。 + // [sendto接口限制]https://stackoverflow.com/questions/22773391/what-is-the-maximum-buffer-length-allowed-by-the-sendto-function-in-c + // 方案2:当前先这样做,写的时候截断。平均每个flow id的长度是15的话,数字平均是100以内的话,一个的平均长度在20,那么20*50 = 1000,所以大概支持top 50,直接限制用户创建K > 50的topk + + size_t line_buf_size = sizeof(line_buf); + + used_len += add_measurement(metric->field_name, line_buf, line_buf_size); + + used_len += add_default_tag_set(instance->name, NULL, line_buf + used_len, line_buf_size - used_len); + + used_len += add_user_tag_set(metric, line_buf + used_len, line_buf_size - used_len); + + used_len += snprintf(line_buf + used_len, line_buf_size - used_len, " "); + printf("remaining line_buf_size: %zu\n", line_buf_size - used_len); + + // TODO: <key>=value,<key>=value... 
+ heavy_keeper_archieve_before_output(metric); + struct heavy_keeper *hk = choose_heavy_keeper_for_output(metric); + used_len += add_field_set_for_topk(hk, line_buf + used_len, line_buf_size - used_len); + if (used_len == 0) { + printf("ERROR: line protocol has no space in UDP buffer for writing field."); // line protocol packet has at lease one field + } + used_len += snprintf(line_buf + used_len, line_buf_size - used_len, "\n"); // TODO: 这个回车是干嘛的? + + printf("the output buffer : \n%s\n---------------------\n", line_buf); + + send_line_buf(&instance->line_protocol_output, line_buf, used_len); +} static void output_line_protocol_single_metric(struct fieldstat_instance *instance, int n_cur_metric) { @@ -257,6 +341,14 @@ static void output_line_protocol_single_metric(struct fieldstat_instance *instan { continue; } + + if (metric->field_type == FIELD_TYPE_TOPK) { + output_line_protocol_topk(instance, metric); + continue; + } else if (metric->field_type == FIELD_TYPE_SUMMARY || metric->field_type == FILED_TYPE_HISTOGRAM) { + continue; // currently not support output of summary and histogram with line protocol + } + if(metric->is_ratio == 1) { continue; diff --git a/test/src/gtest_fieldstat_topk.cpp b/test/src/gtest_fieldstat_topk.cpp index 6a7df2b..ecfb428 100644 --- a/test/src/gtest_fieldstat_topk.cpp +++ b/test/src/gtest_fieldstat_topk.cpp @@ -12,34 +12,6 @@ #include "test_utils.hpp" -std::vector<std::string> add_to_heavy_keeper(struct fieldstat_instance * instance, int metric_id, std::vector<std::pair<std::string, int>> &input) -{ - for (auto item : input) { - fieldstat_value_incrby_topK(instance, metric_id, item.first.c_str(), item.first.size(), item.second); - } - - //rank by count - std::sort(input.begin(), input.end(), [](std::pair<std::string, int> a, std::pair<std::string, int> b) { - return a.second > b.second; - }); - - std::vector<std::string> patterns; - for (auto item : input) { - std::string pattern = ".*" + item.first + ".*" + 
std::to_string(item.second) + ".*"; - patterns.push_back(pattern); - } - return patterns; -} - -std::string convert_default_pattern_to_json(std::vector<std::string> &patterns) -{ - std::string pattern; - for (size_t i = 0; i < patterns.size(); i++) { - pattern += patterns[i]; - } - return pattern; -} - Heavy_keeper_tester *Construct_default_tester(int output_window = 1) { Heavy_keeper_tester *tester = new Heavy_keeper_tester("heavy_keeper_test"); @@ -59,22 +31,22 @@ Heavy_keeper_tester *Construct_default_tester(int output_window = 1) return tester; } -TEST(FeildStatOutput, output_topk_default) +TEST(TopkOutput, output_topk_default) { Heavy_keeper_tester *tester = Construct_default_tester(); std::string pattern_expect = tester->get_expected_pattern("default"); - std::string file_content = tester->write_to_file("/tmp/czzdefault.txt", "default"); + std::string file_content = tester->read_from_file(TEST_DEFAULT_FILE_PATH, "default"); EXPECT_TRUE(std::regex_match(file_content, std::regex(pattern_expect))); delete tester; } -TEST(FeildStatOutput, output_topk_json) +TEST(TopkOutput, output_topk_json) { Heavy_keeper_tester *tester = Construct_default_tester(); std::string pattern_expect = tester->get_expected_pattern("json"); - std::string file_content = tester->write_to_file("/tmp/czzjson.txt", "json"); + std::string file_content = tester->read_from_file(TEST_JSON_FILE_PATH, "json"); EXPECT_TRUE(std::regex_match(file_content, std::regex(pattern_expect))); delete tester; } @@ -95,12 +67,12 @@ std::vector<std::string> splitString(const std::string& str) } const std::regex CONTENT_PATTERN("^([0-9]+).*"); -TEST(FeildStatOutput, output_topk_default_flushed_up) +TEST(TopkOutput, output_topk_default_flushed_up) { Heavy_keeper_tester *tester = Construct_default_tester(1); std::string pattern_expect = tester->get_expected_pattern("default"); - std::string file_content1 = tester->write_to_file("/tmp/czzdefault.txt", "default"); // changing not flushed - std::string file_content2 = 
tester->write_to_file("/tmp/czzdefault.txt", "default"); // changing deleted + std::string file_content1 = tester->read_from_file(TEST_DEFAULT_FILE_PATH, "default"); // changing not flushed + std::string file_content2 = tester->read_from_file(TEST_DEFAULT_FILE_PATH, "default"); // changing flushed EXPECT_TRUE(std::regex_match(file_content1, std::regex(pattern_expect))); std::vector<std::string> lines2 = splitString(file_content2); @@ -111,17 +83,67 @@ TEST(FeildStatOutput, output_topk_default_flushed_up) delete tester; } -TEST(FeildStatOutput, output_topk_default_accumulated) +TEST(TopkOutput, output_topk_default_accumulated) { Heavy_keeper_tester *tester = Construct_default_tester(0); - (void)tester->write_to_file("/tmp/czzdefault.txt", "default"); + (void)tester->read_from_file(TEST_DEFAULT_FILE_PATH, "default"); tester->take_action("field1", "hello", 20); // hello set to 30 tester->take_action("field1", "Hello_new", 20); std::string expected2 = tester->get_expected_pattern("default"); - std::string file_content2 = tester->write_to_file("/tmp/czzdefault.txt", "default"); + std::string file_content2 = tester->read_from_file(TEST_DEFAULT_FILE_PATH, "default"); + + EXPECT_TRUE(std::regex_match(file_content2, std::regex(expected2))); + + delete tester; +} + +TEST(TopkOutput, output_topk_line_protocol) +{ + Heavy_keeper_tester *tester = Construct_default_tester(); + clear_file(TEST_LINE_PROTOCOL_FILE_PATH); + + std::string expected = tester->get_expected_pattern("json"); + + std::string content = tester->read_from_line_protocol(TEST_LINE_PROTOCOL_FILE_PATH); + + EXPECT_TRUE(std::regex_match(content, std::regex(expected))); + + delete tester; +} + +TEST(TopkOutput, output_topk_line_protocol_flushed_up) +{ + Heavy_keeper_tester *tester = Construct_default_tester(1); + clear_file(TEST_LINE_PROTOCOL_FILE_PATH); + + std::string pattern_expect = tester->get_expected_pattern("json"); + + std::string file_content1 = 
tester->read_from_line_protocol(TEST_LINE_PROTOCOL_FILE_PATH); // changing not flushed + std::string file_content2 = tester->read_from_line_protocol(TEST_LINE_PROTOCOL_FILE_PATH); // changing flushed + + std::vector<std::string> lines2 = splitString(file_content2); + for (auto line : lines2) { // only have headers + EXPECT_FALSE(std::regex_match(line, std::regex(CONTENT_PATTERN))); + } + + delete tester; +} + +TEST(TopkOutput, output_topk_line_protocol_accumulated) +{ + Heavy_keeper_tester *tester = Construct_default_tester(0); + clear_file(TEST_LINE_PROTOCOL_FILE_PATH); + + (void)tester->read_from_line_protocol(TEST_LINE_PROTOCOL_FILE_PATH); // wait one rount + + tester->take_action("field1", "hello", 20); // hello set to 30 + tester->take_action("field1", "Hello_new", 20); + std::string expected2 = tester->get_expected_pattern("json"); + + std::string file_content2 = tester->read_from_line_protocol(TEST_LINE_PROTOCOL_FILE_PATH); EXPECT_TRUE(std::regex_match(file_content2, std::regex(expected2))); diff --git a/test/src/test_utils.cpp b/test/src/test_utils.cpp index f1fa21a..b1e27a1 100644 --- a/test/src/test_utils.cpp +++ b/test/src/test_utils.cpp @@ -2,6 +2,7 @@ #include <regex> #include <iostream> #include <fstream> +#include <stdlib.h> #include <gtest/gtest.h> using namespace std; @@ -9,7 +10,7 @@ using namespace std; void Heavy_keeper_tester::register_metric(const char *metric_name, int K, int out_window) { - int metric_id = fieldstat_register_heavy_keeper(this->instance, metric_name, NULL, 0, K, out_window); + int metric_id = fieldstat_register_topK(this->instance, metric_name, NULL, 0, K, out_window); struct metric *metric = get_metric(this->instance, metric_id); struct metric_info metric_info = {metric, metric_id, out_window}; metric_map[metric_name] = metric_info; @@ -17,16 +18,14 @@ void Heavy_keeper_tester::register_metric(const char *metric_name, int K, int ou void Heavy_keeper_tester::take_action(const char *metric_name, const char *member, int value) 
{ - // 1. 从map里找到metric const struct metric_info &metric_i = metric_map[metric_name]; - // 2. 保存action struct action action; action.metric_name = metric_name; action.member = member; action.value = value; - this->actions.push_back(action); + if (metric_i.output_window == 0) { bool found = false; for (size_t i = 0; i < accu_actions.size(); i++) { @@ -89,17 +88,27 @@ std::string Heavy_keeper_tester::get_expected_pattern(std::string method) sort_map(metric_map); if (string(method) == "json") { - Reg_str pattern; + size_t i = 0; + Reg_str pattern[metric_map.size()]; for (auto &m : this->metric_map) { - pattern.add_text().add(m.first).add_text(); + pattern[i] += Reg_str().add_text().add(m.first).add_text().parenthsis(); + pattern[i].add("|"); // head first or content first, both is ok, since its json format. auto &tmp_actions = m.second.output_window == 0 ? this->accu_actions : this->actions; vector<pair<string, int>> input = get_expected_topk(m.first, tmp_actions); + Reg_str content; for (const auto &item : input) { std::string tmp = ".*" + item.first + ".*" + std::to_string(item.second) + ".*"; - pattern += Reg_str(tmp); + content += Reg_str(tmp); } + pattern[i] += content.parenthsis(); + i++; } - return pattern.add_any().get_str(); + Reg_str contat; + for (size_t i = 0; i < metric_map.size(); i++) { + contat += pattern[i].parenthsis().add("\\n?"); // in line protocol parser, every metric is independent, and is in different JSON object. 
+ } + contat.add_any(); + return contat.get_str(); } else if (string(method) == "default") { Reg_str pattern[metric_map.size()]; size_t i = 0; @@ -134,10 +143,26 @@ void Heavy_keeper_tester::new_window() actions.clear(); } -std::string Heavy_keeper_tester::write_to_file(const char *output_file_path, const char *method) +std::string Heavy_keeper_tester::read_from_file(const char *output_file_path, const char *method) { if (!instance->running) { int ret = fieldstat_set_local_output(instance, output_file_path, method); + EXPECT_EQ(0, ret); + fieldstat_instance_start(instance); + } + + sleep(3); + + std::ifstream ifs(output_file_path); + std::string content( (std::istreambuf_iterator<char>(ifs) ), + (std::istreambuf_iterator<char>() ) ); + return content; +} + +std::string Heavy_keeper_tester::read_from_line_protocol(const char *output_file_path, const char *ip, unsigned long portal) +{ + if (!instance->running) { + int ret = fieldstat_set_line_protocol_server(instance, ip, portal); fieldstat_instance_start(instance); EXPECT_EQ(0, ret); } @@ -149,3 +174,9 @@ std::string Heavy_keeper_tester::write_to_file(const char *output_file_path, con (std::istreambuf_iterator<char>() ) ); return content; } + +void clear_file(const char *file_paths) +{ + std::string clear_cmd = "cat /dev/null > " + std::string(file_paths); + system(clear_cmd.c_str()); +} diff --git a/test/src/test_utils.hpp b/test/src/test_utils.hpp index 23fb72e..e0c5665 100644 --- a/test/src/test_utils.hpp +++ b/test/src/test_utils.hpp @@ -7,6 +7,11 @@ #include "fieldstat.h" #include "fieldstat_internal.h" +#define TEST_PORTAL 8700 +#define TEST_LINE_PROTOCOL_FILE_PATH "/tmp/metrics.out" +#define TEST_DEFAULT_FILE_PATH "/tmp/czzdefault.txt" +#define TEST_JSON_FILE_PATH "/tmp/czzjson.txt" + class Reg_str { public: @@ -64,7 +69,8 @@ class Heavy_keeper_tester void register_metric(const char *metric_name, int K, int out_window); void take_action(const char *metric_name, const char *member, int value); - std::string 
write_to_file(const char *output_file_path, const char *method); + std::string read_from_file(const char *output_file_path, const char *method); + std::string read_from_line_protocol(const char *output_file_path, const char *ip = "127.0.0.1", unsigned long portal = TEST_PORTAL); std::string get_expected_pattern(std::string method); void new_window(); @@ -86,4 +92,6 @@ class Heavy_keeper_tester struct fieldstat_instance * instance; void sort_map(std::unordered_map<std::string, struct metric_info> &metric_map); std::vector<std::pair<std::string, int>> get_expected_topk(const std::string &metric_name, std::vector<struct action> &tmp_actions); -};
\ No newline at end of file +}; + +void clear_file(const char *file_paths);
\ No newline at end of file |
