#include "test_utils.hpp"

// NOTE(review): the standard headers below cover the names this file uses
// directly; confirm the list against the project's original include set.
#include <unistd.h>

#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <iterator>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include <gtest/gtest.h>

using namespace std;

extern struct prometheus_endpoint_instance g_prometheus_endpoint_instance;

namespace {

// Reads the entire file at `path` into a string. A file that cannot be
// opened yields an empty string.
std::string slurp_file(const char *path)
{
    std::ifstream ifs(path);
    return std::string((std::istreambuf_iterator<char>(ifs)), std::istreambuf_iterator<char>());
}

} // namespace

// Registers a top-K metric named `metric_name` on the instance and stores its
// metric pointer, id and output window in `metric_map` under that name.
void Heavy_keeper_tester::register_metric(const char *metric_name, int K, int out_window)
{
    int metric_id = fieldstat_register_topK(this->instance, metric_name, NULL, 0, K, out_window);
    struct metric *metric = get_metric(this->instance, metric_id);
    struct metric_info metric_info = {metric, metric_id, out_window};
    metric_map[metric_name] = metric_info;
}

// Records an increment of `value` for `member` of `metric_name` and forwards
// it to fieldstat.
//
// Every call is appended to `actions`. When the metric's output window is 0,
// the increment is also merged into `accu_actions`, which keeps one summed
// entry per (metric, member) pair.
void Heavy_keeper_tester::take_action(const char *metric_name, const char *member, int value)
{
    // NOTE(review): operator[] default-inserts an entry when `metric_name` was
    // never registered; callers are presumably expected to register first.
    const struct metric_info &metric_i = metric_map[metric_name];

    struct action action;
    action.metric_name = metric_name;
    action.member = member;
    action.value = value;
    this->actions.push_back(action);

    if (metric_i.output_window == 0) {
        bool found = false;
        for (auto &accumulated : accu_actions) {
            // NOTE(review): this relies on action::metric_name / action::member
            // comparing by value (e.g. std::string). If they are `const char *`
            // this compares addresses - confirm against the declaration.
            if (accumulated.metric_name == metric_name && accumulated.member == member) {
                accumulated.value += value;
                found = true;
                break;
            }
        }
        if (!found) {
            this->accu_actions.push_back(action);
        }
    }

    fieldstat_value_incrby_topK(this->instance, metric_i.id, member, std::strlen(member), value);
}

// Creates the fieldstat instance used by this tester.
Heavy_keeper_tester::Heavy_keeper_tester(const char *instance_name)
{
    this->instance = fieldstat_instance_new(instance_name);
}

// Disables the global prometheus endpoint and frees the fieldstat instance.
Heavy_keeper_tester::~Heavy_keeper_tester()
{
    fieldstat_global_disable_prometheus_endpoint();
    fieldstat_instance_free(this->instance);
}

// Rebuilds `metric_map` by re-inserting its entries in descending metric-id
// order.
//
// NOTE(review): std::unordered_map does not promise that iteration follows
// insertion order, so the resulting iteration order is implementation-defined.
void Heavy_keeper_tester::sort_map(std::unordered_map<std::string, struct metric_info> &metric_map)
{
    std::vector<std::pair<std::string, struct metric_info>> tmp;
    tmp.reserve(metric_map.size());
    for (const auto &m : metric_map) {
        tmp.push_back(std::make_pair(m.first, m.second));
    }

    // Compare through const references so the sort does not copy each pair
    // (and its string) on every comparison.
    std::sort(tmp.begin(), tmp.end(),
              [](const std::pair<std::string, struct metric_info> &a,
                 const std::pair<std::string, struct metric_info> &b) { return a.second.id > b.second.id; });

    metric_map.clear();
    for (const auto &item : tmp) {
        metric_map[item.first] = item.second;
    }
}

// Returns the (member, value) pairs of all actions in `tmp_actions` that
// belong to `metric_name`, sorted by value in descending order.
std::vector<std::pair<std::string, int>> Heavy_keeper_tester::get_expected_topk(const std::string &metric_name,
                                                                               std::vector<struct action> &tmp_actions)
{
    std::vector<std::pair<std::string, int>> input;
    for (const auto &a : tmp_actions) {
        if (metric_name == a.metric_name) {
            input.push_back(std::make_pair(a.member, a.value));
        }
    }

    std::sort(input.begin(), input.end(),
              [](const std::pair<std::string, int> &a, const std::pair<std::string, int> &b) {
                  return a.second > b.second;
              });
    return input;
}

// Builds the regular expression the output is expected to match for the given
// output `method` ("json" or "default").
//
// For each metric, the expected members come from `accu_actions` when the
// metric's output window is 0 and from `actions` otherwise. Any other `method`
// prints "method not supported" and returns an empty string.
std::string Heavy_keeper_tester::get_expected_pattern(std::string method)
{
    sort_map(metric_map);

    if (method == "json") {
        // A std::vector replaces the original variable-length array of
        // Reg_str, which is a compiler extension rather than standard C++.
        std::vector<Reg_str> pattern(metric_map.size());
        size_t i = 0;
        for (auto &m : this->metric_map) {
            pattern[i] += Reg_str().add_text().add(m.first).add_text().parenthsis();
            // Either the head or the content may come first; both are
            // acceptable because the output is JSON.
            pattern[i].add("|");

            auto &tmp_actions = m.second.output_window == 0 ? this->accu_actions : this->actions;
            const auto input = get_expected_topk(m.first, tmp_actions);

            Reg_str content;
            for (const auto &item : input) {
                std::string tmp = ".*" + item.first + ".*" + std::to_string(item.second) + ".*";
                content += Reg_str(tmp);
            }
            pattern[i] += content.parenthsis();
            i++;
        }

        Reg_str contat;
        for (size_t k = 0; k < pattern.size(); k++) {
            // In the line protocol parser every metric is independent and
            // sits in its own JSON object, hence the optional newline.
            contat += pattern[k].parenthsis().add("\\n?");
        }
        contat.add_any();
        return contat.get_str();
    } else if (method == "default") {
        std::vector<Reg_str> pattern(metric_map.size());
        size_t i = 0;
        for (auto &m : this->metric_map) {
            Reg_str head;
            head.add_any().add(m.first).add_space().add_endl();
            pattern[i] += head.add_endl();

            auto &tmp_actions = m.second.output_window == 0 ? this->accu_actions : this->actions;
            const auto input = get_expected_topk(m.first, tmp_actions);
            for (const auto &item : input) {
                std::string tmp = ".*" + item.first + ".*" + std::to_string(item.second) + ".*";
                pattern[i] += Reg_str(tmp).add_endl();
            }
            i++;
        }

        Reg_str contat;
        for (size_t k = 0; k < pattern.size(); k++) {
            contat += pattern[k];
        }
        contat.add_any();
        return contat.get_str();
    } else {
        std::cout << "method not supported" << std::endl;
        return "";
    }
}

// Starts a new window by discarding the per-window actions. `accu_actions`
// is left untouched.
void Heavy_keeper_tester::new_window()
{
    actions.clear();
}

// Configures local file output (only if the instance is not running yet),
// waits three seconds, then returns the contents of `output_file_path`.
std::string Heavy_keeper_tester::read_from_file(const char *output_file_path, const char *method)
{
    if (!instance->running) {
        int ret = fieldstat_set_local_output(instance, output_file_path, method);
        EXPECT_EQ(0, ret);
        fieldstat_instance_start(instance);
    }

    sleep(3); // presumably gives the instance time to produce output
    return slurp_file(output_file_path);
}

// Configures the line protocol server (only if the instance is not running
// yet), clears TEST_LINE_PROTOCOL_FILE_PATH, waits three seconds, then returns
// the contents of `output_file_path`.
std::string Heavy_keeper_tester::read_from_line_protocol(const char *output_file_path, const char *ip,
                                                         unsigned long portal)
{
    if (!instance->running) {
        int ret = fieldstat_set_line_protocol_server(instance, ip, portal);
        EXPECT_EQ(0, ret);
        fieldstat_instance_start(instance);
    }

    // NOTE(review): the cleared file is the fixed macro path while the file
    // read below is `output_file_path`; callers presumably pass the same path.
    clear_file(TEST_LINE_PROTOCOL_FILE_PATH);
    sleep(3);
    return slurp_file(output_file_path);
}

// Enables the prometheus endpoint on port `portal` (only if it is not running
// yet), starts the instance if needed, clears TEST_PROMETHEUS_FILE_PATH, waits
// three seconds, fetches /metrics with curl into `output_file_path` and
// returns that file's contents.
std::string Heavy_keeper_tester::read_from_prometheus(const char *output_file_path, unsigned long portal)
{
    if (g_prometheus_endpoint_instance.running != 1) {
        int ret = fieldstat_global_enable_prometheus_endpoint(portal, NULL);
        EXPECT_EQ(0, ret);
        ret = fieldstat_enable_prometheus_output(instance);
        EXPECT_EQ(0, ret);
    }
    if (!instance->running) {
        fieldstat_instance_start(instance);
    }

    // NOTE(review): as in read_from_line_protocol, the cleared path is the
    // macro, not `output_file_path`.
    clear_file(TEST_PROMETHEUS_FILE_PATH);
    sleep(3);

    // Build the command in a std::string: a fixed-size buffer filled with
    // sprintf would overflow for a long `output_file_path`.
    const std::string cmd =
        "curl -s http://127.0.0.1:" + std::to_string(portal) + "/metrics -o " + std::string(output_file_path);
    system(cmd.c_str());

    return slurp_file(output_file_path);
}

// Truncates the file at `file_paths` to zero length, creating it if it does
// not exist.
void clear_file(const char *file_paths)
{
    // Opening with trunc empties the file; no shell needs to be spawned.
    std::ofstream truncated(file_paths, std::ios::out | std::ios::trunc);
}

// Registers a counter metric named `metric_name` and returns the value
// returned by fieldstat_register. `out_window` is not used.
int Heavy_keeper_extension_test::add_metric_of_counter(const char *metric_name, int out_window)
{
    (void)out_window;
    return fieldstat_register(this->instance, FIELD_TYPE_COUNTER, metric_name, NULL, 0);
}

// Returns the fieldstat instance owned by this test object.
struct fieldstat_instance *Heavy_keeper_extension_test::get_instance()
{
    return this->instance;
}