summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2023-05-26 20:07:03 +0800
committerchenzizhan <[email protected]>2023-05-26 20:07:03 +0800
commita1f417a75fe3361b7645cd49389a6e4a98d12308 (patch)
tree852f158b6a1cc505fa4a1be6dc3e5b20645bc6b8
parent5f520ba18cfc92f3802fc3e6077504a6a4ec804e (diff)
output window implemented. Overhaul gtest with cpp. 2 test cases on window output
-rwxr-xr-xCMakeLists.txt1
-rw-r--r--inc/fieldstat.h6
-rwxr-xr-xlib/libheavykeeper.sobin200936 -> 200992 bytes
-rw-r--r--src/fieldstat.cpp30
-rw-r--r--src/fieldstat_internal.h11
-rw-r--r--src/file_output.cpp34
-rw-r--r--src/heavy_keeper.h280
-rw-r--r--src/line_protocol_output.cpp10
-rw-r--r--test/src/CMakeLists.txt22
-rw-r--r--test/src/gtest_fieldstat_output_file_instance.cpp186
-rw-r--r--test/src/test_utils.cpp149
-rw-r--r--test/src/test_utils.hpp92
12 files changed, 558 insertions, 263 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 64b089f..5a731d9 100755
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -39,6 +39,7 @@ if (CMAKE_CXX_CPPCHECK)
"--suppress=nullPointerRedundantCheck"
"--suppress=ctunullpointer"
"--suppress=shadowFunction"
+ "--suppress=useStlAlgorithm" # only affect test because source code is writen by C
)
set(CMAKE_C_CPPCHECK ${CMAKE_CXX_CPPCHECK})
else()
diff --git a/inc/fieldstat.h b/inc/fieldstat.h
index 19667e5..c52a99a 100644
--- a/inc/fieldstat.h
+++ b/inc/fieldstat.h
@@ -216,7 +216,7 @@ int fieldstat_register_summary(struct fieldstat_instance *instance, const char *
* @param n_tag Size of tags
* @param K How many fields to keep
**/
-int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K);
+int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K, int output_window);
/**
* Register dynamic fieldstat instance.
@@ -294,7 +294,7 @@ int fieldstat_dynamic_metric_value_incrby(struct fieldstat_dynamic_instance *ins
* @param thread_id The thread id of the call.
* @return -1 is failed. 0 is success.
*/
-// TODO:heavy keeper根本不支持这种操作
+// TODO:heavy keeper根本不支持这种操作, 错误提示信息?
int fieldstat_dynamic_metric_value_set(struct fieldstat_dynamic_instance *instance, enum field_type type, const char *field_name, long long value, const struct fieldstat_tag tags[], size_t n_tags, int thread_id);
/**
* fieldstat dynamic instance metric value decrment operate. metirc type in[gauge, counter]
@@ -307,7 +307,7 @@ int fieldstat_dynamic_metric_value_set(struct fieldstat_dynamic_instance *instan
* @param thread_id The thread id of the call.
* @return -1 is failed. 0 is success.
*/
-// TODO: heavy keeper 根本不支持这种操作
+// TODO: heavy keeper 根本不支持这种操作,错误提示信息?
int fieldstat_dynamic_metric_value_decrby(struct fieldstat_dynamic_instance *instance, enum field_type type, const char *field_name, long long value, const struct fieldstat_tag tags[], size_t n_tags, int thread_id);
/**
* fieldstat dynamic instance table metric value incrment operate.
diff --git a/lib/libheavykeeper.so b/lib/libheavykeeper.so
index 19f7dc8..1852a4d 100755
--- a/lib/libheavykeeper.so
+++ b/lib/libheavykeeper.so
Binary files differ
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp
index ec42c2b..ee07ac6 100644
--- a/src/fieldstat.cpp
+++ b/src/fieldstat.cpp
@@ -167,6 +167,19 @@ struct metric * metric_new(enum field_type type, const char *field_name, const s
return metric;
}
+void free_heavy_keeper(struct heavy_keeper_t *hk)
+{
+ if (hk->chaning != NULL) {
+ heavy_keeper_free(hk->chaning);
+ }
+ if (hk->accumulated != NULL) {
+ heavy_keeper_free(hk->accumulated);
+ }
+ if (hk->previous_changed != NULL) {
+ heavy_keeper_free(hk->previous_changed);
+ }
+}
+
void metric_free(struct metric *metric)
{
int i = 0;
@@ -188,9 +201,7 @@ void metric_free(struct metric *metric)
{
metric->table = NULL;
}
- if (metric->heavy_keeper != NULL) {
- free(metric->heavy_keeper);
- }
+ free_heavy_keeper(&metric->heavy_keeper);
free(metric);
@@ -923,7 +934,7 @@ int fieldstat_value_incrby_topK(struct fieldstat_instance *instance, int metric_
{
return -1;
}
- heavy_keeper_add(metric->heavy_keeper, key, key_len, (unsigned int)increment);
+ heavy_keeper_add(metric->heavy_keeper.chaning, key, key_len, (unsigned int)increment);
return 1;
}
@@ -986,7 +997,7 @@ error_out:
// field name: 用来打印的名字,不是很重要
// tags: 没看懂有啥用,好像也是只有在打印的时候有用。另外有一处调用:is_output_prometheus。总之可以发现这个tag写进去就不变了。
// n_tag: sizeof(tags) / sizeof(tags[0])
-int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K)
+int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const char *field_name, const struct fieldstat_tag tags[], size_t n_tag, int K, int output_window)
{
if(!is_valid_field_name(field_name))
{
@@ -999,12 +1010,17 @@ int fieldstat_register_heavy_keeper(struct fieldstat_instance *instance, const c
enum field_type type = FIELD_TYPE_TOPK;
struct metric *metric = metric_new(type, field_name, tags, n_tag);
- metric->heavy_keeper = heavy_keeper_new(K, NULL);
+ metric->heavy_keeper.chaning = heavy_keeper_new(K, NULL);
+ metric->heavy_keeper.accumulated = heavy_keeper_new(K, NULL);
+ metric->heavy_keeper.previous_changed = NULL;
+ metric->heavy_keeper.K = K;
+ metric->output_window = output_window;
+
+ atomic_inc(&instance->topk_cnt);
int metric_id = atomic_inc(&instance->metric_cnt) - 1;
struct metric **metric_slot = read_metric_slot(instance, metric_id);
- atomic_inc(&instance->topk_cnt);
*metric_slot = metric;
return metric_id;
}
diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h
index e72ddf9..934a99d 100644
--- a/src/fieldstat_internal.h
+++ b/src/fieldstat_internal.h
@@ -100,6 +100,14 @@ struct histogram_t
double *bins;
};
+struct heavy_keeper_t
+{
+ struct heavy_keeper *chaning;
+ struct heavy_keeper *accumulated;
+ struct heavy_keeper *previous_changed;
+ int K;
+};
+
struct table_line
{
char *name;
@@ -145,8 +153,7 @@ struct metric
struct stat_unit_t counter;
struct stat_unit_t gauge;
struct histogram_t histogram;
- struct heavy_keeper *heavy_keeper; // notice that heavy_keeper is an pointer
- // TODO: actually there are 3: topK in total, top k from last record to this one, topK tmp(copy this hk and hk to be another one)
+ struct heavy_keeper_t heavy_keeper;
};
};
diff --git a/src/file_output.cpp b/src/file_output.cpp
index 90a3b1a..6e98a66 100644
--- a/src/file_output.cpp
+++ b/src/file_output.cpp
@@ -530,6 +530,15 @@ static int output_file_format_default_type_histogram_and_summary(struct fieldsta
return used_len;
}
+struct heavy_keeper *choose_heavy_keeper_for_output(struct metric *metric)
+{
+ if (metric->output_window == 0) {
+ return metric->heavy_keeper.accumulated;
+ } else {
+ return metric->heavy_keeper.previous_changed;
+ }
+}
+
cJSON *output_file_type_topk_json(struct metric *metric) {
cJSON *out_obj = cJSON_CreateObject();
cJSON_AddStringToObject(out_obj, "name", metric->field_name);
@@ -537,7 +546,7 @@ cJSON *output_file_type_topk_json(struct metric *metric) {
cJSON *topk_array = cJSON_CreateArray();
- struct heavy_keeper *hk = metric->heavy_keeper;
+ struct heavy_keeper *hk = choose_heavy_keeper_for_output(metric);
struct heavy_keeper_result *result = heavy_keeper_query(hk);
cJSON *tmp_obj = NULL;
@@ -553,6 +562,22 @@ cJSON *output_file_type_topk_json(struct metric *metric) {
return out_obj;
}
+void heavy_keeper_archieve_before_output(struct metric *metric)
+{
+ if (metric->heavy_keeper.accumulated == NULL) {
+ metric->heavy_keeper.accumulated = heavy_keeper_new(metric->heavy_keeper.K, NULL);
+ }
+ heavy_keeper_merge(metric->heavy_keeper.accumulated, metric->heavy_keeper.chaning);
+ heavy_keeper_replicas_all_join_to_mine(metric->heavy_keeper.accumulated); // Let heavy_keeper.accumulated always have only one replica, so that memory usage will remain constant.
+
+ if (metric->heavy_keeper.previous_changed != NULL) {
+ heavy_keeper_free(metric->heavy_keeper.previous_changed);
+ (void)atomic_set(&(metric->heavy_keeper.previous_changed), NULL);
+ }
+ metric->heavy_keeper.previous_changed = atomic_read(&(metric->heavy_keeper.chaning));
+ (void)atomic_set(&(metric->heavy_keeper.chaning), heavy_keeper_new(metric->heavy_keeper.K, NULL));
+}
+
int output_file_format_json(struct fieldstat_instance *instance, int n_cur_metric, char **print_buf)
{
int i = 0;
@@ -669,6 +694,7 @@ int output_file_format_json(struct fieldstat_instance *instance, int n_cur_metri
}
}
case FIELD_TYPE_TOPK:
+ heavy_keeper_archieve_before_output(metric);
cJSON_AddItemToArray(metrics_array_obj, output_file_type_topk_json(metric));
break;
default:
@@ -750,8 +776,10 @@ void output_file_type_topk_default(struct fieldstat_instance *instance, int curr
if (metric->field_type != FIELD_TYPE_TOPK) {
continue;
}
+
+ heavy_keeper_archieve_before_output(metric);
names[metric_num] = metric->field_name;
- hks[metric_num] = metric->heavy_keeper;
+ hks[metric_num] = choose_heavy_keeper_for_output(metric);
metric_num++;
}
@@ -843,6 +871,7 @@ int file_output(struct fieldstat_instance *instance,long long interval_ms)
used_len = output_file_format_json(instance, current_metric_cnt, &print_buf);
}
+ ftruncate(fileno(instance->local_output_fp), 0); // clear file
fseek(instance->local_output_fp, 0, SEEK_SET);
if(print_buf)
{
@@ -860,6 +889,7 @@ int file_output(struct fieldstat_instance *instance,long long interval_ms)
output_file_type_topk_default(instance, current_metric_cnt); // directly output to file. topk is long, so use another buffer to hold it.
}
+ printf("file_output done\n");
return 0;
}
diff --git a/src/heavy_keeper.h b/src/heavy_keeper.h
index 5e1b7bf..e971b10 100644
--- a/src/heavy_keeper.h
+++ b/src/heavy_keeper.h
@@ -1,133 +1,149 @@
-#pragma once
-
-#include <stddef.h>
-
-/* ---------------------------------------------------------------------------------------
-TOP K using heavykeeper algorithm.
-Author: Chen Zizhan
-Time: 2023/4/26
-
-# usage example
-struct heavy_keeper *hk = heavy_keeper_new(K, NULL);
-
-// or: run heavykeeper with user defined params:
-// heavy_keeper_options params = {5, 1000, 1.5};
-// struct heavy_keeper *hk = heavy_keeper_new(K, &params);
-
-const char flow_id[] = "any string representing url, ip, etc";
-unsigned count = 1; // if count the number of session/packet/...
-// Or: if sum up the total traffic volume in bytes
-// unsigned count = 700;
-
-heavy_keeper_add(hk, flow_id, strlen(flow_id), count);
-// repeat update for every flow ... ... ... ...
-// ...........................
-struct heavy_keeper_result* stats = heavy_keeper_result_new(K);
-heavy_keeper_query(hk, stats);
-
-heavy_keeper_free(hk);
-
-use the stats variable
-For example :
- for (int i = 0; i < stats->n_key; i++) {
- printf("top %d flow id is: %s, its count: #llu", i, stats->key[i], stats->count[i]);
- }
-
-heavy_keeper_result_free(stats);
-
----------------------------------------------------------------------------------------------*/
-
-#ifdef __cplusplus
-extern "C"{
-#endif
-
-// The handler of heavykeeper
-struct heavy_keeper;
-
-// Query result
-struct heavy_keeper_result {
- size_t n_key; // number of result
- char **key; // key[i] is the flow id, i < n_key
- size_t *key_len; // key_len[i] is the length flow id string, i < n_key
- unsigned *count; // count[i] is the count of the corresponding flow id
-};
-
-// Parameters used in algorithm
-struct heavy_keeper_options{
- int array_num; // the size of the array. Default value: 4
- // Set it by how accurate you want. Value of 4 garentees an accuray more than 90% and around 97% in all tests.
- // Not too big because it have an impact on both time and memory efficiency.
- int max_bucket_num; // M2, the maximum number of buckets every array keeps. Default value: k*log(k) and no less than 100.
- // Basically, as long as big enough, it won't affect the accuracy significantly.
- double decay_exp_rate; // b, bigger variance of flow size is(elephant flows take more ratio), smaller it should be.
- // Must bigger than 1. Better not bigger than 1.3, otherwise some elephant flow will be missed.
-};
-
-/**
- * @brief Create a heavy keeper.
- * @param max_query_num you want to get a summery of top-max_query_num flows.
- * @param params parameters used in algorithm. If NULL, default params will be used.
- * @returns a pointer to the heavy keeper.
- */
-struct heavy_keeper *heavy_keeper_new(int max_query_num, struct heavy_keeper_options *params);
-
-/**
- * @brief free a heavy keeper.
- * @param hk the pointer to the heavy keeper.
- */
-void heavy_keeper_free(struct heavy_keeper *hk);
-
-/**
- * @brief Add a flow to the heavy keeper.
- * @param hk the pointer to the heavy keeper.
- * @param key the flow id string.
- * @param key_len the length of the flow id string.
- * @param count the count of the flow or its traffic volume in bytes.
- */
-void heavy_keeper_add(struct heavy_keeper *hk, const char *key, size_t key_len, unsigned int count);
-
-/**
- * @brief Query the top-K flows.The flow id with bigger count ranks at smaller index.
- * @param hk the pointer to the heavy keeper.
- * @return a pointer to the heavy keeper query result. User should free it after use by calling heavy_keeper_result_free.
- */
-struct heavy_keeper_result *heavy_keeper_query(struct heavy_keeper *hk);
-
-/**
- * @brief free a heavy keeper query result.
- * @param stats_hd the pointer to the heavy keeper query result.
- */
-void heavy_keeper_result_free(struct heavy_keeper_result *stats_hd);
-
-/**
- * @brief Merge two heavy keeper.
- * @param hk the pointer to the heavy keeper.
- * @param hk_merged the pointer to the heavy keeper to be merged. Must have the same params with hk.
- * @returns 0 if success, -1 if failed.
- */
-int heavy_keeper_merge(struct heavy_keeper *hk, const struct heavy_keeper *src);
-
-/**
- * @brief Serialize a heavy keeper.
- * @param hk the pointer to the heavy keeper.
- * @param blob output of the serialized data. Must be freed by caller. *blob can be NULL, the function will allocate memory for it.
- * @param blob_sz Output, the size of the serialized data.
-*/
-void heavy_keeper_serialization(const struct heavy_keeper *hk, char **blob, size_t *blob_sz);
-
-/**
- * @brief Deserialize a heavy keeper.
- * @param blob the serialized data. From heavy_keeper_serialization.
- * @param size the size of the serialized data.
- * @returns a pointer to the heavy keeper.
-*/
-struct heavy_keeper *heavy_keeper_deserialization(const char *blob, size_t size);
-
-/*
- *@returns the total memory usage of the heavy keeper.
-*/
-size_t heavy_keeper_cal_total_memory_usage(const struct heavy_keeper *hk);
-
-#ifdef __cplusplus
-}
+#pragma once
+
+#include <stddef.h>
+#include <uuid/uuid.h>
+
+/* ---------------------------------------------------------------------------------------
+TOP K using heavykeeper algorithm.
+Author: Chen Zizhan
+Time: 2023/4/26
+
+# usage example
+struct heavy_keeper *hk = heavy_keeper_new(K, NULL);
+
+// or: run heavykeeper with user defined params:
+// heavy_keeper_options params = {5, 1000, 1.5};
+// struct heavy_keeper *hk = heavy_keeper_new(K, &params);
+
+const char flow_id[] = "any string representing url, ip, etc";
+unsigned count = 1; // if count the number of session/packet/...
+// Or: if sum up the total traffic volume in bytes
+// unsigned count = 700;
+
+heavy_keeper_add(hk, flow_id, strlen(flow_id), count);
+// repeat update for every flow ... ... ... ...
+// ...........................
+struct heavy_keeper_result* stats = heavy_keeper_result_new(K);
+heavy_keeper_query(hk, stats);
+
+heavy_keeper_free(hk);
+
+use the stats variable
+For example :
+ for (int i = 0; i < stats->n_key; i++) {
+ printf("top %d flow id is: %s, its count: #llu", i, stats->key[i], stats->count[i]);
+ }
+
+heavy_keeper_result_free(stats);
+
+---------------------------------------------------------------------------------------------*/
+
+#ifdef __cplusplus
+extern "C"{
+#endif
+
+// The handler of heavykeeper
+struct heavy_keeper;
+
+// Query result
+struct heavy_keeper_result {
+ size_t n_key; // number of result
+ char **key; // key[i] is the flow id, i < n_key
+ size_t *key_len; // key_len[i] is the length flow id string, i < n_key
+ unsigned *count; // count[i] is the count of the corresponding flow id
+};
+
+// Parameters used in algorithm
+struct heavy_keeper_options{
+ int array_num; // the size of the array. Default value: 4
+ // Set it by how accurate you want. Value of 4 garentees an accuray more than 90% and around 97% in all tests.
+ // Not too big because it have an impact on both time and memory efficiency.
+ int max_bucket_num; // M2, the maximum number of buckets every array keeps. Default value: k*log(k) and no less than 100.
+ // Basically, as long as big enough, it won't affect the accuracy significantly.
+ double decay_exp_rate; // b, bigger variance of flow size is(elephant flows take more ratio), smaller it should be.
+ // Must bigger than 1. Better not bigger than 1.3, otherwise some elephant flow will be missed.
+};
+
+/**
+ * @brief Create a heavy keeper.
+ * @param max_query_num you want to get a summery of top-max_query_num flows.
+ * @param params parameters used in algorithm. If NULL, default params will be used.
+ * @returns a pointer to the heavy keeper.
+ */
+struct heavy_keeper *heavy_keeper_new(int max_query_num, struct heavy_keeper_options *params);
+
+/**
+ * @brief Create a heavy keeper.
+ * @param max_query_num you want to get a summery of top-max_query_num flows.
+ * @param params parameters used in algorithm. If NULL, default params will be used.
+ * @param uuid any uuid_t typed unsigned char[16] you want to use to identify a heavykeeper uniquely.
+ * @returns a pointer to the heavy keeper.
+ */
+struct heavy_keeper *heavy_keeper_new_with_uuid(int max_query_num, struct heavy_keeper_options *params, uuid_t uuid);
+
+/**
+ * @brief free a heavy keeper.
+ * @param hk the pointer to the heavy keeper.
+ */
+void heavy_keeper_free(struct heavy_keeper *hk);
+
+/**
+ * @brief Add a flow to the heavy keeper.
+ * @param hk the pointer to the heavy keeper.
+ * @param key the flow id string.
+ * @param key_len the length of the flow id string.
+ * @param count the count of the flow or its traffic volume in bytes.
+ */
+void heavy_keeper_add(struct heavy_keeper *hk, const char *key, size_t key_len, unsigned int count);
+
+/**
+ * @brief Query the top-K flows.The flow id with bigger count ranks at smaller index.
+ * @param hk the pointer to the heavy keeper.
+ * @return a pointer to the heavy keeper query result. User should free it after use by calling heavy_keeper_result_free.
+ */
+struct heavy_keeper_result *heavy_keeper_query(struct heavy_keeper *hk);
+
+/**
+ * @brief free a heavy keeper query result.
+ * @param stats_hd the pointer to the heavy keeper query result.
+ */
+void heavy_keeper_result_free(struct heavy_keeper_result *stats_hd);
+
+/**
+ * @brief Merge two heavy keeper.
+ * @param hk the pointer to the heavy keeper.
+ * @param hk_merged the pointer to the heavy keeper to be merged. Must have the same params with hk.
+ * @returns 0 if success, -1 if failed.
+ */
+int heavy_keeper_merge(struct heavy_keeper *hk, const struct heavy_keeper *src);
+
+/**
+ * @brief Merge all replicas of a heavy keeper to my replica, and delete all the others, so as to save memory. This function is only safe to use when the hk is not merged to the others.
+ * @param hk the pointer to the heavy keeper.
+ */
+void heavy_keeper_replicas_all_join_to_mine(struct heavy_keeper *hk);
+
+/**
+ * @brief Serialize a heavy keeper.
+ * @param hk the pointer to the heavy keeper.
+ * @param blob output of the serialized data. Must be freed by caller. *blob can be NULL, the function will allocate memory for it.
+ * @param blob_sz Output, the size of the serialized data.
+*/
+void heavy_keeper_serialization(const struct heavy_keeper *hk, char **blob, size_t *blob_sz);
+
+/**
+ * @brief Deserialize a heavy keeper.
+ * @param blob the serialized data. From heavy_keeper_serialization.
+ * @param size the size of the serialized data.
+ * @returns a pointer to the heavy keeper.
+*/
+struct heavy_keeper *heavy_keeper_deserialization(const char *blob, size_t size);
+
+/*
+ *@returns the total memory usage of the heavy keeper.
+*/
+size_t heavy_keeper_cal_total_memory_usage(const struct heavy_keeper *hk);
+
+#ifdef __cplusplus
+}
#endif \ No newline at end of file
diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp
index 3b8ef00..a753ea9 100644
--- a/src/line_protocol_output.cpp
+++ b/src/line_protocol_output.cpp
@@ -307,11 +307,8 @@ static void output_line_protocol_table_row(struct fieldstat_instance *instance,
return;
}
-
-
int line_protocol_output(struct fieldstat_instance *instance)
{
-
//print current time instance start
int tables_row_cnt[TABLE_MAX_NUM];
int n_cur_table = instance->table_num;
@@ -324,6 +321,13 @@ int line_protocol_output(struct fieldstat_instance *instance)
output_line_protocol_table_row(instance, n_cur_table, tables_row_cnt);
flush_send_buf(&instance->line_protocol_output);
+ // TODO: heavykeeper 的line protocol 实现
+ // Measurement : field name
+ // field: member:score pair
+ // tag: 参考别的
+
+ // TODO: 我怀疑当前已经有一点输出了,首先得解耦。
+
return 0;
}
diff --git a/test/src/CMakeLists.txt b/test/src/CMakeLists.txt
index 57a01d5..1fae028 100644
--- a/test/src/CMakeLists.txt
+++ b/test/src/CMakeLists.txt
@@ -3,20 +3,20 @@ cmake_minimum_required(VERSION 2.8)
include_directories(${PROJECT_SOURCE_DIR}/inc/)
include_directories(${PROJECT_SOURCE_DIR}/src/)
add_definitions(-std=c++11)
-add_executable(gtest_rule ${PROJECT_SOURCE_DIR}/src/tsg_rule.cpp ${PROJECT_SOURCE_DIR}/src/tsg_bridge.cpp ${PROJECT_SOURCE_DIR}/src/tsg_leaky_bucket.cpp gtest_common.cpp gtest_rule.cpp)
-target_link_libraries(gtest_rule gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger MESA_jump_layer MESA_field_stat2 maatframe)
+#add_executable(gtest_rule ${PROJECT_SOURCE_DIR}/src/tsg_rule.cpp ${PROJECT_SOURCE_DIR}/src/tsg_bridge.cpp ${PROJECT_SOURCE_DIR}/src/tsg_leaky_bucket.cpp gtest_common.cpp gtest_rule.cpp)
+#target_link_libraries(gtest_rule gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger MESA_jump_layer MESA_field_stat2 maatframe)
-add_executable(gtest_fieldstat_interface ${SRC} gtest_fieldstat_interface.cpp)
-target_link_libraries(gtest_fieldstat_interface gtest-static heavykeeper)
+# add_executable(gtest_fieldstat_interface ${SRC} gtest_fieldstat_interface.cpp)
+# target_link_libraries(gtest_fieldstat_interface gtest-static heavykeeper)
-add_executable(gtest_fieldstat_output ${SRC} gtest_fieldstat_output.cpp)
-target_link_libraries(gtest_fieldstat_output gtest-static heavykeeper)
+# add_executable(gtest_fieldstat_output ${SRC} gtest_fieldstat_output.cpp)
+# target_link_libraries(gtest_fieldstat_output gtest-static heavykeeper)
-add_executable(gtest_dynamic_fieldstat ${SRC} gtest_dynamic_fieldstat.cpp)
-target_link_libraries(gtest_dynamic_fieldstat gtest-static heavykeeper)
+# add_executable(gtest_dynamic_fieldstat ${SRC} gtest_dynamic_fieldstat.cpp)
+# target_link_libraries(gtest_dynamic_fieldstat gtest-static heavykeeper)
-add_executable(gtest_dynamic_fieldstat_output ${SRC} gtest_dynamic_fieldstat_output.cpp)
-target_link_libraries(gtest_dynamic_fieldstat_output gtest-static heavykeeper)
+# add_executable(gtest_dynamic_fieldstat_output ${SRC} gtest_dynamic_fieldstat_output.cpp)
+# target_link_libraries(gtest_dynamic_fieldstat_output gtest-static heavykeeper)
-add_executable(gtest_fieldstat_output_file_instance ${SRC} gtest_fieldstat_output_file_instance.cpp)
+add_executable(gtest_fieldstat_output_file_instance ${SRC} gtest_fieldstat_output_file_instance.cpp test_utils.cpp)
target_link_libraries(gtest_fieldstat_output_file_instance gtest-static heavykeeper)
diff --git a/test/src/gtest_fieldstat_output_file_instance.cpp b/test/src/gtest_fieldstat_output_file_instance.cpp
index 256c983..59b8c21 100644
--- a/test/src/gtest_fieldstat_output_file_instance.cpp
+++ b/test/src/gtest_fieldstat_output_file_instance.cpp
@@ -1,10 +1,13 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
+#include <vector>
#include <gtest/gtest.h>
#include "fieldstat.h"
#include "fieldstat_internal.h"
+#include "test_utils.hpp"
+
extern struct prometheus_endpoint_instance g_prometheus_endpoint_instance;
// TEST(FeildStatOutput, OutputMaatTable)
@@ -235,8 +238,6 @@ extern struct prometheus_endpoint_instance g_prometheus_endpoint_instance;
#include <regex>
-const std::regex CONTENT_PATTERN("^([0-9]+).*");
-
std::vector<std::string> add_to_heavy_keeper(struct fieldstat_instance * instance, int metric_id, std::vector<std::pair<std::string, int>> &input)
{
for (auto item : input) {
@@ -265,113 +266,92 @@ std::string convert_default_pattern_to_json(std::vector<std::string> &patterns)
return pattern;
}
-TEST(FeildStatOutput, output_topk_default)
+Heavy_keeper_tester *Construct_default_tester(int output_window = 1)
{
- struct fieldstat_instance * instance = fieldstat_instance_new("heavy_keeper_test");
- const char m1_name[] = "field1";
- const char m2_name[] = "field2";
- int m_id1 = fieldstat_register_heavy_keeper(instance, m1_name, NULL, 0, 10);
- int m_id2 = fieldstat_register_heavy_keeper(instance, m2_name, NULL, 0, 10);
-
- std::vector<std::pair<std::string, int>> input;
- input.push_back(std::make_pair("hello", 10));
- input.push_back(std::make_pair("HELLO", 5));
- input.push_back(std::make_pair("123123123123123123jjkasdlfjasldjkfkjjk", 9999));
- std::vector<std::string> patterns1 = add_to_heavy_keeper(instance, m_id1, input);
- input.clear();
- input.push_back(std::make_pair("world", 10));
- input.push_back(std::make_pair("WORLD", 42));
- std::vector<std::string> patterns2 = add_to_heavy_keeper(instance, m_id2, input);
-
- const char *output_file_path = "/tmp/czzdefault.txt";
- char line[2048] = {0};
- int ret = fieldstat_set_local_output(instance, output_file_path, "default");
- EXPECT_EQ(0, ret);
- fieldstat_instance_start(instance);
- sleep(3);
-
- FILE *fp = fopen(output_file_path, "r");
- EXPECT_NE(nullptr, fp);
- size_t rank = 0;
- std::vector<std::string> *using_pattern = &patterns1;
- int content_count = 0;
- while(!feof(fp))
- {
- if(NULL == fgets(line, sizeof(line), fp)) {
- continue;
- }
-
- // remove \n
- line[strlen(line) - 1] = '\0';
-
- if (std::regex_match(line, CONTENT_PATTERN)) {
- content_count++;
- printf("line: %s\n", line);
- EXPECT_TRUE(std::regex_match(line, std::regex((*using_pattern)[rank++])));
- } else {
- if (std::regex_match(line, std::regex(".*" + std::string(m2_name) + ".*"))) {
- using_pattern = &patterns2;
- rank = 0;
- } else if (std::regex_match(line, std::regex(".*" + std::string(m1_name) + ".*"))) {
- using_pattern = &patterns1;
- rank = 0;
- }
- }
- }
- int expected_count = patterns1.size() + patterns2.size();
- EXPECT_EQ(content_count, expected_count);
-
- fclose(fp);
- fieldstat_instance_free(instance);
+ Heavy_keeper_tester *tester = new Heavy_keeper_tester("heavy_keeper_test");
+
+ tester->register_metric("field1", 10, output_window);
+ tester->register_metric("field2", 10, output_window);
+
+ tester->take_action("field1", "hello", 10);
+ tester->take_action("field1", "HELLO", 5);
+ tester->take_action("field1", "123123123123123123jjkasdlfjasldjkfkjjk", 9999);
+
+ tester->take_action("field2", "world", 10);
+ tester->take_action("field2", "WORLD", 42);
+
+ printf("Construct_default_tester done\n");
+
+ return tester;
}
+TEST(FeildStatOutput, output_topk_default)
+{
+ Heavy_keeper_tester *tester = Construct_default_tester();
+ std::string pattern_expect = tester->get_expected_pattern("default");
+ std::string file_content = tester->write_to_file("/tmp/czzdefault.txt", "default");
+
+ EXPECT_TRUE(std::regex_match(file_content, std::regex(pattern_expect)));
+
+ delete tester;
+}
TEST(FeildStatOutput, output_topk_json)
{
- struct fieldstat_instance * instance = fieldstat_instance_new("heavy_keeper_test");
- const char m1_name[] = "field1";
- const char m2_name[] = "field2";
- int m_id1 = fieldstat_register_heavy_keeper(instance, m1_name, NULL, 0, 10);
- int m_id2 = fieldstat_register_heavy_keeper(instance, m2_name, NULL, 0, 10);
-
- std::vector<std::pair<std::string, int>> input;
- input.push_back(std::make_pair("hello", 10));
- input.push_back(std::make_pair("HELLO", 5));
- input.push_back(std::make_pair("123123123123123123jjkasdlfjasldjkfkjjk", 9999));
- std::vector<std::string> patterns = add_to_heavy_keeper(instance, m_id1, input);
- std::string pattern1 = convert_default_pattern_to_json(patterns);
- input.clear();
- input.push_back(std::make_pair("world", 10));
- input.push_back(std::make_pair("WORLD", 42));
- patterns = add_to_heavy_keeper(instance, m_id2, input);
- std::string pattern2 = convert_default_pattern_to_json(patterns);
-
- const char *output_file_path = "/tmp/czzjson.txt";
- char line[2048] = {0};
- int ret = fieldstat_set_local_output(instance, output_file_path, "json");
- EXPECT_EQ(0, ret);
- fieldstat_instance_start(instance);
- sleep(3);
-
- FILE *fp = fopen(output_file_path, "r");
- EXPECT_NE(nullptr, fp);
-
- while(!feof(fp))
- {
- if(NULL == fgets(line, sizeof(line), fp)) {
- continue;
- }
-
- // remove \n
- // line[strlen(line) - 1] = '\0';
- printf("line of json: %s\n", line);
- EXPECT_TRUE(std::regex_match(line, std::regex(pattern1)));
- EXPECT_TRUE(std::regex_match(line, std::regex(pattern2)));
- break; // only one line
- }
-
- fclose(fp);
- fieldstat_instance_free(instance);
+ Heavy_keeper_tester *tester = Construct_default_tester();
+ std::string pattern_expect = tester->get_expected_pattern("json");
+ std::string file_content = tester->write_to_file("/tmp/czzjson.txt", "json");
+ EXPECT_TRUE(std::regex_match(file_content, std::regex(pattern_expect)));
+ delete tester;
+}
+
+std::vector<std::string> splitString(const std::string& str)
+{
+ std::vector<std::string> tokens;
+
+ std::string::size_type pos = 0;
+ std::string::size_type prev = 0;
+ while ((pos = str.find('\n', prev)) != std::string::npos) {
+ tokens.push_back(str.substr(prev, pos - prev));
+ prev = pos + 1;
+ }
+ tokens.push_back(str.substr(prev));
+
+ return tokens;
+}
+
+const std::regex CONTENT_PATTERN("^([0-9]+).*");
+TEST(FeildStatOutput, output_topk_default_flushed_up)
+{
+ Heavy_keeper_tester *tester = Construct_default_tester(1);
+ std::string pattern_expect = tester->get_expected_pattern("default");
+ std::string file_content1 = tester->write_to_file("/tmp/czzdefault.txt", "default");
+ std::string file_content2 = tester->write_to_file("/tmp/czzdefault.txt", "default");
+
+ EXPECT_TRUE(std::regex_match(file_content1, std::regex(pattern_expect)));
+ std::vector<std::string> lines2 = splitString(file_content2);
+ for (auto line : lines2) { // only have headers
+ EXPECT_FALSE(std::regex_match(line, std::regex(CONTENT_PATTERN)));
+ }
+
+ delete tester;
+}
+
+TEST(FeildStatOutput, output_topk_default_accumulated)
+{
+ Heavy_keeper_tester *tester = Construct_default_tester(0);
+
+ (void)tester->write_to_file("/tmp/czzdefault.txt", "default");
+
+ tester->take_action("field1", "hello", 20); // hello set to 30
+ tester->take_action("field1", "Hello_new", 20);
+ std::string expected2 = tester->get_expected_pattern("default");
+
+ std::string file_content2 = tester->write_to_file("/tmp/czzdefault.txt", "default");
+
+ EXPECT_TRUE(std::regex_match(file_content2, std::regex(expected2)));
+
+ delete tester;
}
int main(int argc, char *argv[])
diff --git a/test/src/test_utils.cpp b/test/src/test_utils.cpp
new file mode 100644
index 0000000..05490d6
--- /dev/null
+++ b/test/src/test_utils.cpp
@@ -0,0 +1,149 @@
+#include "test_utils.hpp"
+#include <regex>
+#include <iostream>
+#include <fstream>
+#include <gtest/gtest.h>
+
+using namespace std;
+
+
+void Heavy_keeper_tester::register_metric(const char *metric_name, int K, int out_window)
+{
+ int metric_id = fieldstat_register_heavy_keeper(this->instance, metric_name, NULL, 0, K, out_window);
+ struct metric *metric = get_metric(this->instance, metric_id);
+ struct metric_info metric_info = {metric, metric_id, out_window};
+ metric_map[metric_name] = metric_info;
+}
+
+void Heavy_keeper_tester::take_action(const char *metric_name, const char *member, int value)
+{
+ // 1. 从map里找到metric
+ const struct metric_info &metric_i = metric_map[metric_name];
+
+ // 2. 保存action
+ struct action action;
+ action.metric_name = metric_name;
+ action.member = member;
+ action.value = value;
+
+ this->actions.push_back(action);
+ if (metric_i.output_window == 0) {
+ bool found = false;
+ for (size_t i = 0; i < accu_actions.size(); i++) {
+ if (accu_actions[i].metric_name == metric_name && accu_actions[i].member == member) {
+ accu_actions[i].value += value;
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ this->accu_actions.push_back(action);
+ }
+ }
+
+ fieldstat_value_incrby_topK(this->instance,
+ metric_i.id, member, strlen(member), value);
+}
+
+Heavy_keeper_tester::Heavy_keeper_tester(const char *instance_name)
+{
+ this->instance = fieldstat_instance_new(instance_name);
+}
+
+Heavy_keeper_tester::~Heavy_keeper_tester()
+{
+ fieldstat_instance_free(this->instance);
+}
+
+void Heavy_keeper_tester::sort_map(std::unordered_map<std::string, struct metric_info> &metric_map)
+{
+ vector<pair<string, struct metric_info>> tmp;
+ for (const auto &m : metric_map) {
+ tmp.push_back(make_pair(m.first, m.second));
+ }
+ sort(tmp.begin(), tmp.end(),
+ [](std::pair<std::string, struct metric_info> a, std::pair<std::string, struct metric_info> b) {return a.second.id > b.second.id;}
+ );
+ metric_map.clear();
+ for (const auto &item : tmp) {
+ metric_map[item.first] = item.second;
+ }
+}
+
+vector<pair<string, int>> Heavy_keeper_tester::get_expected_topk(const std::string &metric_name, std::vector<struct action> &tmp_actions)
+{
+ vector<pair<string, int>> input;
+ for (auto &a : tmp_actions) {
+ if (metric_name == a.metric_name) {
+ input.push_back(make_pair(a.member, a.value));
+ }
+ }
+ std::sort(input.begin(), input.end(),
+ [](std::pair<std::string, int> a, std::pair<std::string, int> b) {return a.second > b.second;}
+ );
+ return input;
+}
+
+std::string Heavy_keeper_tester::get_expected_pattern(std::string method)
+{
+ // get a pattern
+ sort_map(metric_map);
+
+ if (string(method) == "json") {
+ Reg_str pattern;
+ for (auto &m : this->metric_map) {
+ pattern.add_text().add(m.first).add_text();
+ auto &tmp_actions = m.second.output_window == 0 ? this->accu_actions : this->actions;
+ vector<pair<string, int>> input = get_expected_topk(m.first, tmp_actions);
+ for (const auto &item : input) {
+ std::string tmp = ".*" + item.first + ".*" + std::to_string(item.second) + ".*";
+ pattern += Reg_str(tmp);
+ }
+ }
+ return pattern.add_any().get_str();
+ } else if (string(method) == "default") {
+ Reg_str pattern[metric_map.size()];
+ size_t i = 0;
+ for (auto &m : this->metric_map) {
+ Reg_str head;
+ head.add_any().add(m.first).add_space().add_endl();
+ pattern[i] += head.add_endl();
+
+ auto &tmp_actions = m.second.output_window == 0 ? this->accu_actions : this->actions;
+ vector<pair<string, int>> input = get_expected_topk(m.first, tmp_actions);
+ for (const auto &item : input) {
+ std::string tmp = ".*" + item.first + ".*" + std::to_string(item.second) + ".*";
+ pattern[i] += Reg_str(tmp).add_endl();
+ }
+ i++;
+ }
+
+ Reg_str contat;
+ for (size_t i = 0; i < metric_map.size(); i++) {
+ contat += pattern[i];
+ }
+ contat.add_any();
+ return contat.get_str();
+ } else {
+ cout << "method not supported" << endl;
+ return "";
+ }
+
+ actions.clear();
+}
+
+std::string Heavy_keeper_tester::write_to_file(const char *output_file_path, const char *method)
+{
+ if (!instance->running) {
+ int ret = fieldstat_set_local_output(instance, output_file_path, method);
+ fieldstat_instance_start(instance);
+ EXPECT_EQ(0, ret);
+ }
+
+ sleep(3);
+
+ std::ifstream ifs(output_file_path);
+ std::string content( (std::istreambuf_iterator<char>(ifs) ),
+ (std::istreambuf_iterator<char>() ) );
+ return content;
+}
diff --git a/test/src/test_utils.hpp b/test/src/test_utils.hpp
new file mode 100644
index 0000000..cc678bb
--- /dev/null
+++ b/test/src/test_utils.hpp
@@ -0,0 +1,92 @@
+
+#include <string>
+#include <unordered_map>
+#include <vector>
+#include <type_traits>
+
+#include "fieldstat.h"
+#include "fieldstat_internal.h"
+
+
+// TODO: 1 写创建Tester,写一个instance, 对应的析构
+
+
+class Reg_str
+{
+ public:
+ Reg_str() : str_("") {}
+ explicit Reg_str(const std::string &p) : str_(p) {}
+ Reg_str operator+=(const Reg_str &p2) {
+ str_ += p2.str_;
+ return *this;
+ }
+ Reg_str operator+(const std::string &p2) {
+ return Reg_str(str_ + p2);
+ }
+
+ Reg_str &add_any() {
+ str_ += std::string("[\\s\\S]*");
+ return *this;
+ }
+ Reg_str &add_text() {
+ str_ += std::string(".*");
+ return *this;
+ }
+ Reg_str &add_space() {
+ str_ += std::string(" *");
+ return *this;
+ }
+ Reg_str &add_endl() {
+ str_ += std::string("\\n");
+ return *this;
+ }
+ Reg_str &add(const std::string &t) {
+ str_ += t;
+ return *this;
+ }
+ template <typename T>
+ Reg_str &add_num(T t) {
+ str_ += std::to_string(t);
+ return *this;
+ }
+ Reg_str &parenthsis() {
+ str_ = "(" + str_ + ")";
+ return *this;
+ }
+ std::string get_str() {
+ return str_;
+ }
+ private:
+ std::string str_;
+};
+
+class Heavy_keeper_tester
+{
+ public:
+ explicit Heavy_keeper_tester(const char *instance_name);
+ ~Heavy_keeper_tester();
+ void register_metric(const char *metric_name, int K, int out_window);
+
+ void take_action(const char *metric_name, const char *member, int value);
+ std::string write_to_file(const char *output_file_path, const char *method);
+ std::string get_expected_pattern(std::string method);
+
+ private:
+ struct action {
+ std::string metric_name;
+ std::string member;
+ int value;
+ };
+
+ struct metric_info {
+ struct metric *m;
+ int id;
+ int output_window;
+ };
+ std::unordered_map<std::string, struct metric_info> metric_map;
+ std::vector<struct action> actions;
+ std::vector<struct action> accu_actions;
+ struct fieldstat_instance * instance;
+ void sort_map(std::unordered_map<std::string, struct metric_info> &metric_map);
+ std::vector<std::pair<std::string, int>> get_expected_topk(const std::string &metric_name, std::vector<struct action> &tmp_actions);
+}; \ No newline at end of file