summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2023-05-29 15:22:42 +0800
committerchenzizhan <[email protected]>2023-05-29 15:22:42 +0800
commit9e9617aa26e5487fa785e9e570e60a9e0ca31d3e (patch)
tree7db60ce36c446126b7d74305a4f9b04cf8688ab4
parent16251f9b2255ad4faf77aef5b75ebc489aabcc33 (diff)
line protocol implements with gtest
-rw-r--r--inc/fieldstat.h2
-rw-r--r--src/fieldstat.cpp6
-rw-r--r--src/fieldstat_internal.h2
-rw-r--r--src/line_protocol_output.cpp94
-rw-r--r--test/src/gtest_fieldstat_topk.cpp98
-rw-r--r--test/src/test_utils.cpp49
-rw-r--r--test/src/test_utils.hpp12
7 files changed, 211 insertions, 52 deletions
diff --git a/inc/fieldstat.h b/inc/fieldstat.h
index c52a99a..5786545 100644
--- a/inc/fieldstat.h
+++ b/inc/fieldstat.h
@@ -216,7 +216,7 @@ int fieldstat_register_summary(struct fieldstat_instance *instance, const char *
* @param n_tag Size of tags
* @param K How many fields to keep
**/
-int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K, int output_window);
+int fieldstat_register_topK(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K, int output_window);
/**
* Register dynamic fieldstat instance.
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp
index cacf6b9..9b411e6 100644
--- a/src/fieldstat.cpp
+++ b/src/fieldstat.cpp
@@ -998,7 +998,7 @@ error_out:
// field name: 用来打印的名字,不是很重要
// tags: 没看懂有啥用,好像也是只有在打印的时候有用。另外有一处调用:is_output_prometheus。总之可以发现这个tag写进去就不变了。
// n_tag: sizeof(tags) / sizeof(tags[0])
-int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K, int output_window)
+int fieldstat_register_topK(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K, int output_window)
{
if(!is_valid_field_name(field_name))
{
@@ -1008,6 +1008,10 @@ int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const c
{
return -1;
}
+ if (K > 50) { // TODO: 要不要加限制,有一个往文件输出的问题,不过说实话,往文件写也不能让K太大。
+ printf("Due to the limitation of UDP packet size, K should be less than 50\n");
+ return -1;
+ }
enum field_type type = FIELD_TYPE_TOPK;
struct metric *metric = metric_new(type, field_name, tags, n_tag);
diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h
index 0f2fd48..444279e 100644
--- a/src/fieldstat_internal.h
+++ b/src/fieldstat_internal.h
@@ -272,6 +272,8 @@ int startup_udp();
void metric_value_operate(struct metric *metric, enum field_op op, long long value);
struct table_metric* table_metric_new(const char *name, const char *column_name[], enum field_type column_type[], size_t n_column);
void table_metric_free(struct table_metric *table);
+struct heavy_keeper *choose_heavy_keeper_for_output(struct metric *metric);
+void heavy_keeper_archieve_before_output(struct metric *metric);
struct table_line *table_line_new(const char *name, const struct fieldstat_tag tags[],size_t n_tag);
void table_line_free(struct table_line *table_line);
diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp
index 9646fdd..661053d 100644
--- a/src/line_protocol_output.cpp
+++ b/src/line_protocol_output.cpp
@@ -81,7 +81,7 @@ static int add_user_tag_set(struct metric *metric, char *line_buf, unsigned int
used_len += snprintf(line_buf + used_len, line_buf_size - used_len, ",%s=%s", metric->tag_key[i], metric->tag_value[i]);
}
- return used_len;
+ return used_len;
}
static long long read_single_metric_value(struct metric *metric, int output_type)
@@ -240,6 +240,90 @@ static int build_table_row_line_buf(char *instance_name, int output_type, struct
return used_len;
}
+// keylen: including '\0'
+int add_one_topk_field(char *key, size_t key_len, unsigned value, char *line_buf, unsigned int line_buf_size)
+{
+ if (key_len >= line_buf_size - 3) { // "=", "\n", and at least one char for value
+ printf("add_one_topk_field, 1\n");
+ return -1;
+ }
+ int used_len = snprintf(line_buf, line_buf_size, "%s=%u", key, value);
+ printf("add_one_topk_field, buffer: %s\n", line_buf);
+ if (used_len < 0 || used_len >= (int)line_buf_size - 1) { // -1: line buf should end with '\n' // TODO: what if timestamp or anything else are inserted
+ printf("add_one_topk_field, 2\n");
+
+ line_buf[0] = '\0';
+ }
+ return used_len;
+}
+
+int add_field_set_for_topk(struct heavy_keeper *hk, char *line_buf, unsigned int line_buf_size)
+{
+ struct heavy_keeper_result *result = heavy_keeper_query(hk);
+ printf("n_key: %zu\n", result->n_key);
+
+ int used_len = 0;
+ for (size_t i = 0; i < result->n_key; i++) {
+
+ int tmp_ret = add_one_topk_field(result->key[i], result->key_len[i], result->count[i], line_buf + used_len, line_buf_size - used_len);
+ if (tmp_ret < 0) {
+ break;
+ }
+ used_len += tmp_ret;
+
+ if (i < result->n_key - 1) { // not the last one
+ int tmp_ret = snprintf(line_buf + used_len, line_buf_size - used_len, ",");
+ if (tmp_ret < 0 || used_len >= (int)line_buf_size - 1) {
+ line_buf[0] = '\0';
+ break;
+ }
+ used_len++;
+ }
+ }
+
+ // IF the last is ',' remove it
+ if (used_len > 0 && line_buf[used_len - 1] == ',') {
+ line_buf[used_len - 1] = '\0';
+ used_len--;
+ }
+
+ heavy_keeper_result_free(result);
+ return used_len;
+}
+
+void output_line_protocol_topk(struct fieldstat_instance *instance, struct metric *metric)
+{
+ int used_len = 0;
+ char line_buf[UDP_PAYLOAD_SIZE] = ""; // FIXME: TODO:不够长,实际上支持的长度不止这么多,是不是可以扩一下。
+ // 扩有一些修改点,比如send_line_buf里就直接用了这个宏。
+ // UDP自己的限制是64k。但是如果超过了MTU会很麻烦。当前的数值是绝对安全的数值,再大一点就会有问题了。
+ // [sendto接口限制]https://stackoverflow.com/questions/22773391/what-is-the-maximum-buffer-length-allowed-by-the-sendto-function-in-c
+ // 方案2:当前先这样做,写的时候截断。平均每个flow id的长度是15的话,数字平均是100以内的话,一个的平均长度在20,那么20*50 = 1000,所以大概支持top 50,直接限制用户创建K > 50的topk
+
+ size_t line_buf_size = sizeof(line_buf);
+
+ used_len += add_measurement(metric->field_name, line_buf, line_buf_size);
+
+ used_len += add_default_tag_set(instance->name, NULL, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += add_user_tag_set(metric, line_buf + used_len, line_buf_size - used_len);
+
+ used_len += snprintf(line_buf + used_len, line_buf_size - used_len, " ");
+ printf("remaining line_buf_size: %zu\n", line_buf_size - used_len);
+
+ // TODO: <key>=value,<key>=value...
+ heavy_keeper_archieve_before_output(metric);
+ struct heavy_keeper *hk = choose_heavy_keeper_for_output(metric);
+ used_len += add_field_set_for_topk(hk, line_buf + used_len, line_buf_size - used_len);
+ if (used_len == 0) {
+ printf("ERROR: line protocol has no space in UDP buffer for writing field."); // line protocol packet has at lease one field
+ }
+ used_len += snprintf(line_buf + used_len, line_buf_size - used_len, "\n"); // TODO: 这个回车是干嘛的?
+
+ printf("the output buffer : \n%s\n---------------------\n", line_buf);
+
+ send_line_buf(&instance->line_protocol_output, line_buf, used_len);
+}
static void output_line_protocol_single_metric(struct fieldstat_instance *instance, int n_cur_metric)
{
@@ -257,6 +341,14 @@ static void output_line_protocol_single_metric(struct fieldstat_instance *instan
{
continue;
}
+
+ if (metric->field_type == FIELD_TYPE_TOPK) {
+ output_line_protocol_topk(instance, metric);
+ continue;
+ } else if (metric->field_type == FIELD_TYPE_SUMMARY || metric->field_type == FILED_TYPE_HISTOGRAM) {
+ continue; // currently not support output of summary and histogram with line protocol
+ }
+
if(metric->is_ratio == 1)
{
continue;
diff --git a/test/src/gtest_fieldstat_topk.cpp b/test/src/gtest_fieldstat_topk.cpp
index 6a7df2b..ecfb428 100644
--- a/test/src/gtest_fieldstat_topk.cpp
+++ b/test/src/gtest_fieldstat_topk.cpp
@@ -12,34 +12,6 @@
#include "test_utils.hpp"
-std::vector<std::string> add_to_heavy_keeper(struct fieldstat_instance * instance, int metric_id, std::vector<std::pair<std::string, int>> &input)
-{
- for (auto item : input) {
- fieldstat_value_incrby_topK(instance, metric_id, item.first.c_str(), item.first.size(), item.second);
- }
-
- //rank by count
- std::sort(input.begin(), input.end(), [](std::pair<std::string, int> a, std::pair<std::string, int> b) {
- return a.second > b.second;
- });
-
- std::vector<std::string> patterns;
- for (auto item : input) {
- std::string pattern = ".*" + item.first + ".*" + std::to_string(item.second) + ".*";
- patterns.push_back(pattern);
- }
- return patterns;
-}
-
-std::string convert_default_pattern_to_json(std::vector<std::string> &patterns)
-{
- std::string pattern;
- for (size_t i = 0; i < patterns.size(); i++) {
- pattern += patterns[i];
- }
- return pattern;
-}
-
Heavy_keeper_tester *Construct_default_tester(int output_window = 1)
{
Heavy_keeper_tester *tester = new Heavy_keeper_tester("heavy_keeper_test");
@@ -59,22 +31,22 @@ Heavy_keeper_tester *Construct_default_tester(int output_window = 1)
return tester;
}
-TEST(FeildStatOutput, output_topk_default)
+TEST(TopkOutput, output_topk_default)
{
Heavy_keeper_tester *tester = Construct_default_tester();
std::string pattern_expect = tester->get_expected_pattern("default");
- std::string file_content = tester->write_to_file("/tmp/czzdefault.txt", "default");
+ std::string file_content = tester->read_from_file(TEST_DEFAULT_FILE_PATH, "default");
EXPECT_TRUE(std::regex_match(file_content, std::regex(pattern_expect)));
delete tester;
}
-TEST(FeildStatOutput, output_topk_json)
+TEST(TopkOutput, output_topk_json)
{
Heavy_keeper_tester *tester = Construct_default_tester();
std::string pattern_expect = tester->get_expected_pattern("json");
- std::string file_content = tester->write_to_file("/tmp/czzjson.txt", "json");
+ std::string file_content = tester->read_from_file(TEST_JSON_FILE_PATH, "json");
EXPECT_TRUE(std::regex_match(file_content, std::regex(pattern_expect)));
delete tester;
}
@@ -95,12 +67,12 @@ std::vector<std::string> splitString(const std::string& str)
}
const std::regex CONTENT_PATTERN("^([0-9]+).*");
-TEST(FeildStatOutput, output_topk_default_flushed_up)
+TEST(TopkOutput, output_topk_default_flushed_up)
{
Heavy_keeper_tester *tester = Construct_default_tester(1);
std::string pattern_expect = tester->get_expected_pattern("default");
- std::string file_content1 = tester->write_to_file("/tmp/czzdefault.txt", "default"); // changing not flushed
- std::string file_content2 = tester->write_to_file("/tmp/czzdefault.txt", "default"); // changing deleted
+ std::string file_content1 = tester->read_from_file(TEST_DEFAULT_FILE_PATH, "default"); // changing not flushed
+ std::string file_content2 = tester->read_from_file(TEST_DEFAULT_FILE_PATH, "default"); // changing flushed
EXPECT_TRUE(std::regex_match(file_content1, std::regex(pattern_expect)));
std::vector<std::string> lines2 = splitString(file_content2);
@@ -111,17 +83,67 @@ TEST(FeildStatOutput, output_topk_default_flushed_up)
delete tester;
}
-TEST(FeildStatOutput, output_topk_default_accumulated)
+TEST(TopkOutput, output_topk_default_accumulated)
{
Heavy_keeper_tester *tester = Construct_default_tester(0);
- (void)tester->write_to_file("/tmp/czzdefault.txt", "default");
+ (void)tester->read_from_file(TEST_DEFAULT_FILE_PATH, "default");
tester->take_action("field1", "hello", 20); // hello set to 30
tester->take_action("field1", "Hello_new", 20);
std::string expected2 = tester->get_expected_pattern("default");
- std::string file_content2 = tester->write_to_file("/tmp/czzdefault.txt", "default");
+ std::string file_content2 = tester->read_from_file(TEST_DEFAULT_FILE_PATH, "default");
+
+ EXPECT_TRUE(std::regex_match(file_content2, std::regex(expected2)));
+
+ delete tester;
+}
+
+TEST(TopkOutput, output_topk_line_protocol)
+{
+ Heavy_keeper_tester *tester = Construct_default_tester();
+ clear_file(TEST_LINE_PROTOCOL_FILE_PATH);
+
+ std::string expected = tester->get_expected_pattern("json");
+
+ std::string content = tester->read_from_line_protocol(TEST_LINE_PROTOCOL_FILE_PATH);
+
+ EXPECT_TRUE(std::regex_match(content, std::regex(expected)));
+
+ delete tester;
+}
+
+TEST(TopkOutput, output_topk_line_protocol_flushed_up)
+{
+ Heavy_keeper_tester *tester = Construct_default_tester(1);
+ clear_file(TEST_LINE_PROTOCOL_FILE_PATH);
+
+ std::string pattern_expect = tester->get_expected_pattern("json");
+
+ std::string file_content1 = tester->read_from_line_protocol(TEST_LINE_PROTOCOL_FILE_PATH); // changing not flushed
+ std::string file_content2 = tester->read_from_line_protocol(TEST_LINE_PROTOCOL_FILE_PATH); // changing flushed
+
+ std::vector<std::string> lines2 = splitString(file_content2);
+ for (auto line : lines2) { // only have headers
+ EXPECT_FALSE(std::regex_match(line, std::regex(CONTENT_PATTERN)));
+ }
+
+ delete tester;
+}
+
+TEST(TopkOutput, output_topk_line_protocol_accumulated)
+{
+ Heavy_keeper_tester *tester = Construct_default_tester(0);
+ clear_file(TEST_LINE_PROTOCOL_FILE_PATH);
+
+ (void)tester->read_from_line_protocol(TEST_LINE_PROTOCOL_FILE_PATH); // wait one rount
+
+ tester->take_action("field1", "hello", 20); // hello set to 30
+ tester->take_action("field1", "Hello_new", 20);
+ std::string expected2 = tester->get_expected_pattern("json");
+
+ std::string file_content2 = tester->read_from_line_protocol(TEST_LINE_PROTOCOL_FILE_PATH);
EXPECT_TRUE(std::regex_match(file_content2, std::regex(expected2)));
diff --git a/test/src/test_utils.cpp b/test/src/test_utils.cpp
index f1fa21a..b1e27a1 100644
--- a/test/src/test_utils.cpp
+++ b/test/src/test_utils.cpp
@@ -2,6 +2,7 @@
#include <regex>
#include <iostream>
#include <fstream>
+#include <stdlib.h>
#include <gtest/gtest.h>
using namespace std;
@@ -9,7 +10,7 @@ using namespace std;
void Heavy_keeper_tester::register_metric(const char *metric_name, int K, int out_window)
{
- int metric_id = fieldstat_register_heavy_keeper(this->instance, metric_name, NULL, 0, K, out_window);
+ int metric_id = fieldstat_register_topK(this->instance, metric_name, NULL, 0, K, out_window);
struct metric *metric = get_metric(this->instance, metric_id);
struct metric_info metric_info = {metric, metric_id, out_window};
metric_map[metric_name] = metric_info;
@@ -17,16 +18,14 @@ void Heavy_keeper_tester::register_metric(const char *metric_name, int K, int ou
void Heavy_keeper_tester::take_action(const char *metric_name, const char *member, int value)
{
- // 1. 从map里找到metric
const struct metric_info &metric_i = metric_map[metric_name];
- // 2. 保存action
struct action action;
action.metric_name = metric_name;
action.member = member;
action.value = value;
-
this->actions.push_back(action);
+
if (metric_i.output_window == 0) {
bool found = false;
for (size_t i = 0; i < accu_actions.size(); i++) {
@@ -89,17 +88,27 @@ std::string Heavy_keeper_tester::get_expected_pattern(std::string method)
sort_map(metric_map);
if (string(method) == "json") {
- Reg_str pattern;
+ size_t i = 0;
+ Reg_str pattern[metric_map.size()];
for (auto &m : this->metric_map) {
- pattern.add_text().add(m.first).add_text();
+ pattern[i] += Reg_str().add_text().add(m.first).add_text().parenthsis();
+ pattern[i].add("|"); // head first or content first, both is ok, since its json format.
auto &tmp_actions = m.second.output_window == 0 ? this->accu_actions : this->actions;
vector<pair<string, int>> input = get_expected_topk(m.first, tmp_actions);
+ Reg_str content;
for (const auto &item : input) {
std::string tmp = ".*" + item.first + ".*" + std::to_string(item.second) + ".*";
- pattern += Reg_str(tmp);
+ content += Reg_str(tmp);
}
+ pattern[i] += content.parenthsis();
+ i++;
}
- return pattern.add_any().get_str();
+ Reg_str contat;
+ for (size_t i = 0; i < metric_map.size(); i++) {
+ contat += pattern[i].parenthsis().add("\\n?"); // in line protocol parser, every metric is independent, and is in different JSON object.
+ }
+ contat.add_any();
+ return contat.get_str();
} else if (string(method) == "default") {
Reg_str pattern[metric_map.size()];
size_t i = 0;
@@ -134,10 +143,26 @@ void Heavy_keeper_tester::new_window()
actions.clear();
}
-std::string Heavy_keeper_tester::write_to_file(const char *output_file_path, const char *method)
+std::string Heavy_keeper_tester::read_from_file(const char *output_file_path, const char *method)
{
if (!instance->running) {
int ret = fieldstat_set_local_output(instance, output_file_path, method);
+ EXPECT_EQ(0, ret);
+ fieldstat_instance_start(instance);
+ }
+
+ sleep(3);
+
+ std::ifstream ifs(output_file_path);
+ std::string content( (std::istreambuf_iterator<char>(ifs) ),
+ (std::istreambuf_iterator<char>() ) );
+ return content;
+}
+
+std::string Heavy_keeper_tester::read_from_line_protocol(const char *output_file_path, const char *ip, unsigned long portal)
+{
+ if (!instance->running) {
+ int ret = fieldstat_set_line_protocol_server(instance, ip, portal);
fieldstat_instance_start(instance);
EXPECT_EQ(0, ret);
}
@@ -149,3 +174,9 @@ std::string Heavy_keeper_tester::write_to_file(const char *output_file_path, con
(std::istreambuf_iterator<char>() ) );
return content;
}
+
+void clear_file(const char *file_paths)
+{
+ std::string clear_cmd = "cat /dev/null > " + std::string(file_paths);
+ system(clear_cmd.c_str());
+}
diff --git a/test/src/test_utils.hpp b/test/src/test_utils.hpp
index 23fb72e..e0c5665 100644
--- a/test/src/test_utils.hpp
+++ b/test/src/test_utils.hpp
@@ -7,6 +7,11 @@
#include "fieldstat.h"
#include "fieldstat_internal.h"
+#define TEST_PORTAL 8700
+#define TEST_LINE_PROTOCOL_FILE_PATH "/tmp/metrics.out"
+#define TEST_DEFAULT_FILE_PATH "/tmp/czzdefault.txt"
+#define TEST_JSON_FILE_PATH "/tmp/czzjson.txt"
+
class Reg_str
{
public:
@@ -64,7 +69,8 @@ class Heavy_keeper_tester
void register_metric(const char *metric_name, int K, int out_window);
void take_action(const char *metric_name, const char *member, int value);
- std::string write_to_file(const char *output_file_path, const char *method);
+ std::string read_from_file(const char *output_file_path, const char *method);
+ std::string read_from_line_protocol(const char *output_file_path, const char *ip = "127.0.0.1", unsigned long portal = TEST_PORTAL);
std::string get_expected_pattern(std::string method);
void new_window();
@@ -86,4 +92,6 @@ class Heavy_keeper_tester
struct fieldstat_instance * instance;
void sort_map(std::unordered_map<std::string, struct metric_info> &metric_map);
std::vector<std::pair<std::string, int>> get_expected_topk(const std::string &metric_name, std::vector<struct action> &tmp_actions);
-}; \ No newline at end of file
+};
+
+void clear_file(const char *file_paths); \ No newline at end of file