summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author刘畅 <[email protected]>2024-07-26 06:34:16 +0000
committer刘畅 <[email protected]>2024-07-26 06:34:16 +0000
commit2c2e3ac4c8ea3d04ca942330d7501bb28c48f5b9 (patch)
tree65ed259e7ddd34f339e31e39b6cfea81549a1f02
parent50c554ee4ec2ae89cba124605775995f8afeb34f (diff)
parent3a7497b7fff96d37108dc997b95ecee129dee428 (diff)
Merge branch 'update_libfieldstat4' into 'rel'
TSG-21834: update libfieldstat4 See merge request tango/shaping-engine!100
-rw-r--r--ci/travis.sh3
-rw-r--r--conf/shaping.conf2
-rw-r--r--shaping/CMakeLists.txt3
-rw-r--r--shaping/include/shaper.h1
-rw-r--r--shaping/include/shaper_global_stat.h11
-rw-r--r--shaping/include/shaper_stat.h17
-rw-r--r--shaping/src/main.cpp22
-rw-r--r--shaping/src/shaper.cpp7
-rw-r--r--shaping/src/shaper_global_stat.cpp293
-rw-r--r--shaping/src/shaper_stat.cpp215
-rw-r--r--shaping/test/CMakeLists.txt6
-rw-r--r--shaping/test/dummy_rdkafka.cpp41
-rw-r--r--shaping/test/gtest_shaper.cpp369
-rw-r--r--shaping/test/stub.h5
-rw-r--r--shaping/test/telegraf/self_test_shaping.conf102
-rw-r--r--shaping/test/test_conf/shaping.conf10
16 files changed, 505 insertions, 602 deletions
diff --git a/ci/travis.sh b/ci/travis.sh
index a8b7553..2df9b40 100644
--- a/ci/travis.sh
+++ b/ci/travis.sh
@@ -38,7 +38,7 @@ yum install -y systemd-devel
yum install -y tsg_master-devel
yum install -y framework_env
yum install -y mrzcpd-corei7
-yum install -y libfieldstat3-devel
+yum install -y libfieldstat4-devel
yum install -y libmaatframe-devel
yum install -y libswarmkv-devel
yum install -y libMESA_handle_logger-devel
@@ -48,6 +48,7 @@ yum install -y numactl-libs # required by mrzcpd
yum install -y libibverbs # required by mrzcpd
yum install -y libbreakpad_mini-devel
yum install -y msgpack-devel
+yum install -y librdkafka-devel
source /etc/profile.d/framework.sh
source /etc/profile.d/mrzcpd.sh
diff --git a/conf/shaping.conf b/conf/shaping.conf
index 70a1811..6c67f8e 100644
--- a/conf/shaping.conf
+++ b/conf/shaping.conf
@@ -30,7 +30,7 @@ SWARMKV_HEALTH_CHECK_PORT=0
SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111
[METRIC]
-FIELDSTAT_OUTPUT_INTERVAL_MS=1000
+FIELDSTAT_OUTPUT_INTERVAL_S=1
GLOBAL_STAT_OUTPUT_INTERVAL_S=1
LINE_PROTOCOL_SERVER_IP="127.0.0.1"
LINE_PROTOCOL_SERVER_PORT=6667
diff --git a/shaping/CMakeLists.txt b/shaping/CMakeLists.txt
index 19862eb..30ddf4c 100644
--- a/shaping/CMakeLists.txt
+++ b/shaping/CMakeLists.txt
@@ -4,7 +4,7 @@ target_link_libraries(shaper PUBLIC avl_tree)
target_link_libraries(shaper PUBLIC cjson)
target_link_libraries(shaper PUBLIC MESA_handle_logger)
target_link_libraries(shaper PUBLIC MESA_prof_load)
-target_link_libraries(shaper PUBLIC fieldstat3)
+target_link_libraries(shaper PUBLIC fieldstat4)
target_link_libraries(shaper PUBLIC pthread)
target_include_directories(shaper PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
target_include_directories(shaper PUBLIC ${PROJECT_SOURCE_DIR}/deps/timeout)
@@ -14,6 +14,7 @@ target_link_libraries(shaping_engine PUBLIC maatframe)
target_link_libraries(shaping_engine PUBLIC mrzcpd)
target_link_libraries(shaping_engine PUBLIC libjemalloc-static dl)
target_link_libraries(shaping_engine PUBLIC swarmkv)
+target_link_libraries(shaping_engine PUBLIC rdkafka)
install(TARGETS shaping_engine RUNTIME DESTINATION bin COMPONENT Program)
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 5e2f7c0..a438242 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -56,6 +56,7 @@ struct shaping_thread_ctx {
struct shaping_ctx *ref_ctx;
struct shaper *sp;
struct shaping_stat *stat;
+ struct shaping_global_stat *global_stat;
struct shaping_marsio_info *marsio_info;
struct swarmkv *swarmkv_db;//handle of swarmkv
struct shaping_maat_info *maat_info;
diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h
index 4c100a7..ec68453 100644
--- a/shaping/include/shaper_global_stat.h
+++ b/shaping/include/shaper_global_stat.h
@@ -1,7 +1,5 @@
#pragma once
-#include <fieldstat.h>
-
enum shaping_global_stat_column_index {
CURR_SESSION_NUM_IDX = 0,
QUEUEING_PKTS_IDX,
@@ -82,23 +80,22 @@ struct shaping_global_stat_data {
};
struct shaping_global_stat {
- struct fieldstat_instance *instance;
+ struct fieldstat_easy *instance;
int column_ids[GLOBAL_STAT_COLUNM_IDX_MAX];
int swarmkv_latency_summary_id;
- struct shaping_global_stat_data *stat_data;
int output_interval_s;
};
struct shaping_global_stat* shaper_global_stat_init(int work_thread_num);
void shaper_global_stat_destroy(struct shaping_global_stat *stat);
-void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us);
+void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us, int thread_idx);
void shaper_global_stat_curr_session_inc(struct shaping_global_stat_data *thread_global_stat);
void shaper_global_stat_curr_session_dec(struct shaping_global_stat_data *thread_global_stat);
void shaper_global_stat_queueing_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
void shaper_global_stat_queueing_dec(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
-long long shaper_global_stat_queueing_pkts_get(struct shaping_global_stat_data *thread_global_stat);
+long long shaper_global_stat_queueing_pkts_get();
void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat_data *thread_global_stat);
void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat_data *thread_global_stat);
@@ -132,4 +129,4 @@ void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_
void shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(struct shaping_global_stat_data *thread_global_stat);
void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
-void shaper_global_stat_refresh(struct shaping_ctx *ctx); \ No newline at end of file
+void shaper_thread_global_stat_refresh(struct shaping_thread_ctx *ctx); \ No newline at end of file
diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h
index e6feeb3..22ca620 100644
--- a/shaping/include/shaper_stat.h
+++ b/shaping/include/shaper_stat.h
@@ -1,8 +1,8 @@
#pragma once
#include <netinet/in.h>
+#include <librdkafka/rdkafka.h>
#include "uthash.h"
-#include <fieldstat.h>
enum shaping_packet_dir {
SHAPING_DIR_IN = 0,
@@ -20,9 +20,7 @@ enum shaping_stat_tags_index {
};
enum shaping_stat_column_index {
- IN_MAX_LATENCY_IDX = 0,
- IN_QUEUE_LEN_IDX,
- OUT_MAX_LATENCY_IDX,
+ IN_QUEUE_LEN_IDX = 0,
OUT_QUEUE_LEN_IDX,
IN_PKTS_IDX,
IN_BYTES_IDX,
@@ -47,8 +45,11 @@ struct shaping_stat_for_profile {
};
struct shaping_stat {
- struct fieldstat_dynamic_instance *instance;
- int table_id;
+ struct fieldstat_easy *instance;
+ rd_kafka_t *kafka_handle;
+ rd_kafka_topic_t *topic_rkt;
+ int output_interval_s;
+ int latency_histogram_id;
unsigned int column_ids[STAT_COLUNM_IDX_MAX];
};
@@ -65,4 +66,6 @@ void shaper_stat_drop_inc(struct shaping_stat_for_profile *profile_stat, unsigne
void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_stat, unsigned char direction, unsigned long long latency, int thread_id);
void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int force);
-void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node); \ No newline at end of file
+void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node);
+
+void shaper_stat_output(struct shaping_stat *stat); \ No newline at end of file
diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp
index 597230b..6d210ea 100644
--- a/shaping/src/main.cpp
+++ b/shaping/src/main.cpp
@@ -23,6 +23,7 @@ static void *shaper_thread_loop(void *data)
char thread_name[16] = {0};
struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data;
int output_interval_s = ctx->ref_ctx->global_stat->output_interval_s;
+ time_t last_refresh_stat_time = time(NULL);
snprintf(thread_name, sizeof(thread_name), "shape-work-%d", ctx->thread_index);
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
@@ -41,7 +42,14 @@ static void *shaper_thread_loop(void *data)
session_table_reset_with_callback(ctx->session_table, shaper_session_data_free_cb, ctx);
__atomic_fetch_and(&ctx->session_need_reset, 0, __ATOMIC_SEQ_CST);
}
- marsio_poll_wait(ctx->marsio_info->instance, &ctx->marsio_info->mr_dev, 1, ctx->thread_index, output_interval_s);
+
+ time_t curr_time = time(NULL);
+ if (curr_time - last_refresh_stat_time >= output_interval_s) {
+ shaper_thread_global_stat_refresh(ctx);
+ last_refresh_stat_time = curr_time;
+ }
+
+ marsio_poll_wait(ctx->marsio_info->instance, &ctx->marsio_info->mr_dev, 1, ctx->thread_index, 10);
}
shaper_thread_resource_clear();
@@ -68,7 +76,7 @@ static void sig_handler(int signo)
int main(int argc, char **argv)
{
struct shaping_ctx *ctx = NULL;
- time_t last_update_time = time(NULL);
+ time_t last_stat_update_time = time(NULL);
if (LOG_INIT("./conf/zlog.conf") == -1)
{
@@ -111,11 +119,13 @@ int main(int argc, char **argv)
while(!quit) {
time_t curr_time = time(NULL);
- if (curr_time - last_update_time >= ctx->global_stat->output_interval_s) {
- shaper_global_stat_refresh(ctx);
- last_update_time = curr_time;
+
+ if (curr_time - last_stat_update_time >= ctx->stat->output_interval_s) {
+ shaper_stat_output(ctx->stat);
+ last_stat_update_time = curr_time;
}
- sleep(ctx->global_stat->output_interval_s);
+
+ sleep(ctx->stat->output_interval_s);
}
for (int i = 0; i < ctx->thread_num; i++) {
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index ba8e1f0..9c2a49f 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -503,7 +503,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us, ctx->thread_index);
shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat);
@@ -705,7 +705,7 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
curr_time_ms = curr_time_us / 1000;
- shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us, ctx->thread_index);
shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
shaper_global_stat_hmget_callback_inc(&ctx->thread_global_stat);
@@ -1291,7 +1291,7 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
cnt++;
}
- if (shaper_global_stat_queueing_pkts_get(&ctx->thread_global_stat) == 0) {
+ if (shaper_global_stat_queueing_pkts_get() == 0) {
return;
}
@@ -1625,6 +1625,7 @@ struct shaping_ctx *shaping_engine_init()
ctx->thread_ctx[i].thread_index = i;
ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max);
ctx->thread_ctx[i].stat = ctx->stat;
+ ctx->thread_ctx[i].global_stat = ctx->global_stat;
ctx->thread_ctx[i].session_table = session_table_create();
ctx->thread_ctx[i].maat_info = ctx->maat_info;
ctx->thread_ctx[i].marsio_info = ctx->marsio_info;
diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp
index d59b290..c039fda 100644
--- a/shaping/src/shaper_global_stat.cpp
+++ b/shaping/src/shaper_global_stat.cpp
@@ -2,70 +2,72 @@
#include <stdlib.h>
#include <MESA/MESA_prof_load.h>
-#include <fieldstat.h>
+#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "utils.h"
#include "shaper.h"
#include "shaper_global_stat.h"
-struct shaping_global_stat_conf {
- int is_self_test;
+struct shping_global_stat_conf
+{
+ int self_test;
};
-static int shaper_global_stat_conf_load(struct shaping_global_stat *stat, struct shaping_global_stat_conf *conf)
+thread_local unsigned long long g_queueing_pkts = 0;
+
+static int shaper_global_stat_conf_load(struct shaping_global_stat *stat, struct shping_global_stat_conf *conf)
{
MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "GLOBAL_STAT_OUTPUT_INTERVAL_S", &stat->output_interval_s, 1);
- MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "SELF_TEST", &conf->is_self_test, 0);
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "GLOBAL_STAT_SELF_TEST", &conf->self_test, 0);
return 0;
}
static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat)
{
- const char * quantiles = "0.1,0.5,0.8,0.9,0.95,0.99";
- stat->swarmkv_latency_summary_id = fieldstat_register_summary(stat->instance, "async_delay(us)", NULL, 0, quantiles, 1, 500000, 3, 1);
-
- stat->column_ids[CURR_SESSION_NUM_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_session_num", NULL, 0);
- stat->column_ids[QUEUEING_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_queueing_pkts", NULL, 0);
- stat->column_ids[QUEUEING_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_queueing_bytes", NULL, 0);
-
- stat->column_ids[CTRL_ERR_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_error", NULL, 0);
- stat->column_ids[CTRL_OPENING_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_open", NULL, 0);
- stat->column_ids[CTRL_ACTIVE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_active", NULL, 0);
- stat->column_ids[CTRL_CLOSE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_close", NULL, 0);
- stat->column_ids[CTRL_ACTIVE_CLOSE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_sf_close", NULL, 0);
- stat->column_ids[CTRL_RESETALL_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_reset", NULL, 0);
- stat->column_ids[SESSION_LOG_SEND_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "sess_log_send", NULL, 0);
-
- stat->column_ids[ASYNC_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "async", NULL, 0);
- stat->column_ids[ASYNC_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "async_cb", NULL, 0);
-
- stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tconsume", NULL, 0);
- stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tconsume_cb", NULL, 0);
- stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hincrby", NULL, 0);
- stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hincrby_cb", NULL, 0);
- stat->column_ids[ASYNC_HMGET_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hmget", NULL, 0);
- stat->column_ids[ASYNC_HMGET_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hmget_cb", NULL, 0);
-
- stat->column_ids[ASYNC_TCONSUME_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tconsume_failed", NULL, 0);
- stat->column_ids[ASYNC_HINCRBY_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hincrby_failed", NULL, 0);
- stat->column_ids[ASYNC_HMGET_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hmget_failed", NULL, 0);
-
- stat->column_ids[RX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_rx_pkts", NULL, 0);
- stat->column_ids[RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_rx_bytes", NULL, 0);
- stat->column_ids[TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_tx_pkts", NULL, 0);
- stat->column_ids[TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_tx_bytes", NULL, 0);
- stat->column_ids[DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_drop_pkts", NULL, 0);
- stat->column_ids[DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_drop_bytes", NULL, 0);
-
- stat->column_ids[HIT_POLICY_RX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_rx_pkts", NULL, 0);
- stat->column_ids[HIT_POLICY_RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_rx_bytes", NULL, 0);
- stat->column_ids[HIT_POLICY_TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_pkts", NULL, 0);
- stat->column_ids[HIT_POLICY_TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_bytes", NULL, 0);
- stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_syn_ack_pkts", NULL, 0);
- stat->column_ids[HIT_POLICY_DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_pkts", NULL, 0);
- stat->column_ids[HIT_POLICY_DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_bytes", NULL, 0);
+ stat->swarmkv_latency_summary_id = fieldstat_easy_register_histogram(stat->instance, "async_delay(us)", 1, 500000, 1);
+
+ stat->column_ids[CURR_SESSION_NUM_IDX] = fieldstat_easy_register_counter(stat->instance, "curr_session_num");
+ stat->column_ids[QUEUEING_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "curr_queueing_pkts");
+ stat->column_ids[QUEUEING_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "curr_queueing_bytes");
+
+ stat->column_ids[CTRL_ERR_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_error");
+ stat->column_ids[CTRL_OPENING_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_open");
+ stat->column_ids[CTRL_ACTIVE_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_active");
+ stat->column_ids[CTRL_CLOSE_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_close");
+ stat->column_ids[CTRL_ACTIVE_CLOSE_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_sf_close");
+ stat->column_ids[CTRL_RESETALL_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_reset");
+ stat->column_ids[SESSION_LOG_SEND_IDX] = fieldstat_easy_register_counter(stat->instance, "sess_log_send");
+
+ stat->column_ids[ASYNC_INVOKE_IDX] = fieldstat_easy_register_counter(stat->instance, "async");
+ stat->column_ids[ASYNC_CALLBACK_IDX] = fieldstat_easy_register_counter(stat->instance, "async_cb");
+
+ stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX] = fieldstat_easy_register_counter(stat->instance, "tconsume");
+ stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX] = fieldstat_easy_register_counter(stat->instance, "tconsume_cb");
+ stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX] = fieldstat_easy_register_counter(stat->instance, "hincrby");
+ stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX] = fieldstat_easy_register_counter(stat->instance, "hincrby_cb");
+ stat->column_ids[ASYNC_HMGET_INVOKE_IDX] = fieldstat_easy_register_counter(stat->instance, "hmget");
+ stat->column_ids[ASYNC_HMGET_CALLBACK_IDX] = fieldstat_easy_register_counter(stat->instance, "hmget_cb");
+
+ stat->column_ids[ASYNC_TCONSUME_FAILED] = fieldstat_easy_register_counter(stat->instance, "tconsume_failed");
+ stat->column_ids[ASYNC_HINCRBY_FAILED] = fieldstat_easy_register_counter(stat->instance, "hincrby_failed");
+ stat->column_ids[ASYNC_HMGET_FAILED] = fieldstat_easy_register_counter(stat->instance, "hmget_failed");
+
+ stat->column_ids[RX_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "all_rx_pkts");
+ stat->column_ids[RX_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "all_rx_bytes");
+ stat->column_ids[TX_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "all_tx_pkts");
+ stat->column_ids[TX_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "all_tx_bytes");
+ stat->column_ids[DROP_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "all_drop_pkts");
+ stat->column_ids[DROP_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "all_drop_bytes");
+
+ stat->column_ids[HIT_POLICY_RX_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_rx_pkts");
+ stat->column_ids[HIT_POLICY_RX_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_rx_bytes");
+ stat->column_ids[HIT_POLICY_TX_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_tx_pkts");
+ stat->column_ids[HIT_POLICY_TX_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_tx_bytes");
+ stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_tx_syn_ack_pkts");
+ stat->column_ids[HIT_POLICY_DROP_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_drop_pkts");
+ stat->column_ids[HIT_POLICY_DROP_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_drop_bytes");
return;
}
@@ -73,49 +75,40 @@ static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat)
struct shaping_global_stat* shaper_global_stat_init(int work_thread_num)
{
struct shaping_global_stat *stat = NULL;
- struct shaping_global_stat_conf conf;
+ struct shping_global_stat_conf conf;
+ struct fieldstat_tag tag;
stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat));
- stat->stat_data = (struct shaping_global_stat_data*)calloc(work_thread_num, sizeof(struct shaping_global_stat_data));
if (shaper_global_stat_conf_load(stat, &conf) != 0) {
LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT);
goto ERROR;
}
- stat->instance = fieldstat_instance_new("shaping_global");
+ tag.key = "shaping_global";
+ tag.type = TAG_CSTRING;
+ tag.value_str = "shaping_global";
+ stat->instance = fieldstat_easy_new(work_thread_num, "shaping_global", &tag, 1);
if (stat->instance == NULL) {
LOG_ERROR("%s: shaping global init fieldstat instance failed", LOG_TAG_STAT);
goto ERROR;
}
shaper_global_stat_fieldstat_reg(stat);
-
- fieldstat_disable_background_thread(stat->instance);
- if (conf.is_self_test) {
- fieldstat_set_local_output(stat->instance, "shaping_global_metric", "json");
- } else {
- fieldstat_set_local_output(stat->instance, "shaping_global_metric", "default");
- }
-
- if (fieldstat_global_enable_prometheus_endpoint(9007, NULL) != 0) {
- LOG_ERROR("%s: shaping global fieldstat enable prometheus endpoint failed", LOG_TAG_STAT);
- goto ERROR;
- }
-
- if (fieldstat_enable_prometheus_output(stat->instance) != 0) {
- LOG_ERROR("%s: shaping global fieldstat enable prometheus output failed", LOG_TAG_STAT);
- goto ERROR;
+ if (!conf.self_test) {
+ int ret = fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_global_stat.json", stat->output_interval_s);
+ if (ret < 0) {
+ LOG_ERROR("%s: shaping global enable auto output failed, ret %d", LOG_TAG_STAT, ret);
+ goto ERROR;
+ }
}
-
- fieldstat_instance_start(stat->instance);
return stat;
ERROR:
if (stat) {
if (stat->instance) {
- fieldstat_instance_free(stat->instance);
+ fieldstat_easy_free(stat->instance);
}
free(stat);
}
@@ -128,21 +121,17 @@ void shaper_global_stat_destroy(struct shaping_global_stat *stat)
return;
}
- if (stat->stat_data) {
- free(stat->stat_data);
- }
-
if (stat->instance) {
- fieldstat_instance_free(stat->instance);
+ fieldstat_easy_free(stat->instance);
}
free(stat);
return;
}
-void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us)
+void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us, int thread_idx)
{
- fieldstat_value_set(stat->instance, stat->swarmkv_latency_summary_id, latency_us);
+ fieldstat_easy_histogram_record(stat->instance, thread_idx, stat->swarmkv_latency_summary_id, NULL, 0, latency_us);
return;
}
@@ -166,6 +155,8 @@ void shaper_global_stat_queueing_inc(struct shaping_global_stat_data *thread_glo
thread_global_stat->queueing_pkts++;
thread_global_stat->queueing_bytes += pkt_len;
+ g_queueing_pkts++;
+
return;
}
@@ -174,12 +165,14 @@ void shaper_global_stat_queueing_dec(struct shaping_global_stat_data *thread_glo
thread_global_stat->queueing_pkts--;
thread_global_stat->queueing_bytes -= pkt_len;
+ g_queueing_pkts--;
+
return;
}
-long long shaper_global_stat_queueing_pkts_get(struct shaping_global_stat_data *thread_global_stat)
+long long shaper_global_stat_queueing_pkts_get()
{
- return thread_global_stat->queueing_pkts;
+ return g_queueing_pkts;
}
void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat_data *thread_global_stat)
@@ -360,103 +353,53 @@ void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thr
return;
}
-void shaper_global_stat_refresh(struct shaping_ctx *ctx)
+void shaper_thread_global_stat_refresh(struct shaping_thread_ctx *ctx)
{
- static struct shaping_global_stat_data sum;
+ struct shaping_global_stat_data *stat_data = &ctx->thread_global_stat;
struct shaping_global_stat *global_stat = ctx->global_stat;
- struct shaping_global_stat_data *stat_data = global_stat->stat_data;
- for (int i = 0; i < ctx->thread_num; i++) {
- memcpy(&stat_data[i], &ctx->thread_ctx[i].thread_global_stat, sizeof(struct shaping_global_stat_data));
- }
-
- memset(&sum, 0, sizeof(struct shaping_global_stat_data));
- for (int i = 0; i < ctx->thread_num; i++) {
- sum.curr_session_num += stat_data[i].curr_session_num;
- sum.queueing_pkts += stat_data[i].queueing_pkts;
- sum.queueing_bytes += stat_data[i].queueing_bytes;
-
- sum.ctrl_error += stat_data[i].ctrl_error;
- sum.ctrl_opening += stat_data[i].ctrl_opening;
- sum.ctrl_active += stat_data[i].ctrl_active;
- sum.ctrl_close += stat_data[i].ctrl_close;
- sum.ctrl_active_close += stat_data[i].ctrl_active_close;
- sum.ctrl_resetall += stat_data[i].ctrl_resetall;
- sum.session_log_send += stat_data[i].session_log_send;
-
- sum.async_invoke += stat_data[i].async_invoke;
- sum.async_callback += stat_data[i].async_callback;
-
- sum.async_tconsume_invoke += stat_data[i].async_tconsume_invoke;
- sum.async_tconsume_callback += stat_data[i].async_tconsume_callback;
- sum.async_hincrby_invoke += stat_data[i].async_hincrby_invoke;
- sum.async_hincrby_callback += stat_data[i].async_hincrby_callback;
- sum.async_hmget_invoke += stat_data[i].async_hmget_invoke;
- sum.async_hmget_callback += stat_data[i].async_hmget_callback;
-
- sum.async_tconsume_failed += stat_data[i].async_tconsume_failed;
- sum.async_hincrby_failed += stat_data[i].async_hincrby_failed;
- sum.async_hmget_failed += stat_data[i].async_hmget_failed;
-
- sum.all_traffic.rx_pkts += stat_data[i].all_traffic.rx_pkts;
- sum.all_traffic.rx_bytes += stat_data[i].all_traffic.rx_bytes;
- sum.all_traffic.tx_pkts += stat_data[i].all_traffic.tx_pkts;
- sum.all_traffic.tx_bytes += stat_data[i].all_traffic.tx_bytes;
- sum.all_traffic.drop_pkts += stat_data[i].all_traffic.drop_pkts;
- sum.all_traffic.drop_bytes += stat_data[i].all_traffic.drop_bytes;
-
- sum.hit_policy_traffic.rx_pkts += stat_data[i].hit_policy_traffic.rx_pkts;
- sum.hit_policy_traffic.rx_bytes += stat_data[i].hit_policy_traffic.rx_bytes;
- sum.hit_policy_traffic.tx_pkts += stat_data[i].hit_policy_traffic.tx_pkts;
- sum.hit_policy_traffic.tx_bytes += stat_data[i].hit_policy_traffic.tx_bytes;
- sum.hit_policy_traffic.tx_syn_ack_pkts += stat_data[i].hit_policy_traffic.tx_syn_ack_pkts;
- sum.hit_policy_traffic.drop_pkts += stat_data[i].hit_policy_traffic.drop_pkts;
- sum.hit_policy_traffic.drop_bytes += stat_data[i].hit_policy_traffic.drop_bytes;
- }
-
- struct shaping_global_stat_traffic_data *all_traffic_data = &sum.all_traffic;
- struct shaping_global_stat_traffic_data *hit_policy_traffic_data = &sum.hit_policy_traffic;
-
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[CURR_SESSION_NUM_IDX], sum.curr_session_num);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[QUEUEING_PKTS_IDX], sum.queueing_pkts);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[QUEUEING_BYTES_IDX], sum.queueing_bytes);
-
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ERR_IDX], sum.ctrl_error);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_OPENING_IDX], sum.ctrl_opening);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ACTIVE_IDX], sum.ctrl_active);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_CLOSE_IDX], sum.ctrl_close);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ACTIVE_CLOSE_IDX], sum.ctrl_active_close);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_RESETALL_IDX], sum.ctrl_resetall);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[SESSION_LOG_SEND_IDX], sum.session_log_send);
-
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_INVOKE_IDX], sum.async_invoke);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_CALLBACK_IDX], sum.async_callback);
-
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX], sum.async_tconsume_invoke);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX], sum.async_tconsume_callback);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX], sum.async_hincrby_invoke);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX], sum.async_hincrby_callback);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_INVOKE_IDX], sum.async_hmget_invoke);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_CALLBACK_IDX], sum.async_hmget_callback);
-
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_FAILED], sum.async_tconsume_failed);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_FAILED], sum.async_hincrby_failed);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_FAILED], sum.async_hmget_failed);
-
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[RX_PKTS_IDX], all_traffic_data->rx_pkts);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[RX_BYTES_IDX], all_traffic_data->rx_bytes);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[TX_PKTS_IDX], all_traffic_data->tx_pkts);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[TX_BYTES_IDX], all_traffic_data->tx_bytes);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[DROP_PKTS_IDX], all_traffic_data->drop_pkts);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[DROP_BYTES_IDX], all_traffic_data->drop_bytes);
-
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_PKTS_IDX], hit_policy_traffic_data->rx_pkts);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX], hit_policy_traffic_data->tx_syn_ack_pkts);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts);
- fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes);
-
- fieldstat_passive_output(global_stat->instance);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CURR_SESSION_NUM_IDX], NULL, 0, stat_data->curr_session_num);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[QUEUEING_PKTS_IDX], NULL, 0, stat_data->queueing_pkts);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[QUEUEING_BYTES_IDX], NULL, 0, stat_data->queueing_bytes);
+
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_ERR_IDX], NULL, 0, stat_data->ctrl_error);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_OPENING_IDX], NULL, 0, stat_data->ctrl_opening);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_ACTIVE_IDX], NULL, 0, stat_data->ctrl_active);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_CLOSE_IDX], NULL, 0, stat_data->ctrl_close);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_ACTIVE_CLOSE_IDX], NULL, 0, stat_data->ctrl_active_close);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_RESETALL_IDX], NULL, 0, stat_data->ctrl_resetall);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[SESSION_LOG_SEND_IDX], NULL, 0, stat_data->session_log_send);
+
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_INVOKE_IDX], NULL, 0, stat_data->async_invoke);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_CALLBACK_IDX], NULL, 0, stat_data->async_callback);
+
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX], NULL, 0, stat_data->async_tconsume_invoke);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX], NULL, 0, stat_data->async_tconsume_callback);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX], NULL, 0, stat_data->async_hincrby_invoke);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX], NULL, 0, stat_data->async_hincrby_callback);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HMGET_INVOKE_IDX], NULL, 0, stat_data->async_hmget_invoke);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HMGET_CALLBACK_IDX], NULL, 0, stat_data->async_hmget_callback);
+
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_TCONSUME_FAILED], NULL, 0, stat_data->async_tconsume_failed);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HINCRBY_FAILED], NULL, 0, stat_data->async_hincrby_failed);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HMGET_FAILED], NULL, 0, stat_data->async_hmget_failed);
+
+ struct shaping_global_stat_traffic_data *all_traffic_data = &stat_data->all_traffic;
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[RX_PKTS_IDX], NULL, 0, all_traffic_data->rx_pkts);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[RX_BYTES_IDX], NULL, 0, all_traffic_data->rx_bytes);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[TX_PKTS_IDX], NULL, 0, all_traffic_data->tx_pkts);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[TX_BYTES_IDX], NULL, 0, all_traffic_data->tx_bytes);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[DROP_PKTS_IDX], NULL, 0, all_traffic_data->drop_pkts);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[DROP_BYTES_IDX], NULL, 0, all_traffic_data->drop_bytes);
+
+ struct shaping_global_stat_traffic_data *hit_policy_traffic_data = &stat_data->hit_policy_traffic;
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_RX_PKTS_IDX], NULL, 0, hit_policy_traffic_data->rx_pkts);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_RX_BYTES_IDX], NULL, 0, hit_policy_traffic_data->rx_bytes);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_TX_PKTS_IDX], NULL, 0, hit_policy_traffic_data->tx_pkts);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_TX_BYTES_IDX], NULL, 0, hit_policy_traffic_data->tx_bytes);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX], NULL, 0, hit_policy_traffic_data->tx_syn_ack_pkts);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], NULL, 0, hit_policy_traffic_data->drop_pkts);
+ fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], NULL, 0, hit_policy_traffic_data->drop_bytes);
+
+ memset(&ctx->thread_global_stat, 0, sizeof(struct shaping_global_stat_data));
} \ No newline at end of file
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index e55463e..e37f5e0 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -4,7 +4,7 @@
#include <arpa/inet.h>
#include <MESA/MESA_prof_load.h>
#include <MESA/swarmkv.h>
-#include <fieldstat.h>
+#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "utils.h"
@@ -12,117 +12,177 @@
#include "shaper_stat.h"
#include "shaper_global_stat.h"
-#define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits"
-
#define SHAPER_STAT_REFRESH_TIME_US 10000 //10 ms
#define HINCRBY_RETRY_MAX 5
struct shaper_stat_conf {
- int enable_backgroud_thread;
- int output_interval_ms;
- char telegraf_ip[16];
- short telegraf_port;
+ char device_group[32];
+ char device_id[32];
+ char data_center[32];
+ char kafka_topic[64];
+ char kafka_username[64];
+ char kafka_password[64];
+ char kafka_brokers[256];
};
thread_local struct fieldstat_tag tags[TAG_IDX_MAX] =
{
- [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .value_type = 0},
- [TAG_RULE_ID_IDX] = {.key = "rule_id", .value_type = 0},
- [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .value_type = 0},
- [TAG_PRIORITY_IDX] = {.key = "priority", .value_type = 0},
- [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .value_type = 2}
+ [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = TAG_INTEGER},
+ [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = TAG_INTEGER},
+ [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = TAG_INTEGER},
+ [TAG_PRIORITY_IDX] = {.key = "priority", .type = TAG_INTEGER},
+ [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = TAG_CSTRING}
};
+char *output_json_buf = NULL;
+
void shaper_stat_destroy(struct shaping_stat *stat)
{
- if (!stat) {
+ if (stat) {
+ if (stat->instance) {
+ fieldstat_easy_free(stat->instance);
+ }
+ free(stat);
+ }
+
+ return;
+}
+
+static void shaper_stat_kafka_init(struct shaping_stat *stat, struct shaper_stat_conf *conf)
+{
+ char kafka_errstr[1024]={0};
+
+ if (strlen(conf->kafka_topic) == 0 || strlen(conf->kafka_brokers) == 0) {
+ LOG_ERROR("%s: kafka topic or brokers is empty", LOG_TAG_STAT);
+ return;
+ }
+
+ if (strlen(conf->kafka_username) == 0 || strlen(conf->kafka_password) == 0) {
+ LOG_ERROR("%s: kafka username or password is empty", LOG_TAG_STAT);
return;
}
- if (stat->instance) {
- fieldstat_dynamic_instance_free(stat->instance);
+ rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new();
+
+ rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr));
+
+ stat->kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
+ if (stat->kafka_handle == NULL) {
+ LOG_ERROR("%s: kafka producer create failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
}
- free(stat);
+ stat->topic_rkt = rd_kafka_topic_new(stat->kafka_handle, conf->kafka_topic, NULL);
return;
}
-static int shaper_stat_conf_load(struct shaper_stat_conf *conf)
+static int shaper_stat_conf_load(struct shaping_stat *stat, struct shaper_stat_conf *conf)
{
- memset(conf, 0, sizeof(struct shaper_stat_conf));
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_S", &stat->output_interval_s, 1);
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "DEVICE_GROUP", conf->device_group, sizeof(conf->device_group), "");
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "DEVICE_ID", conf->device_id, sizeof(conf->device_id), "");
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "DATA_CENTER", conf->data_center, sizeof(conf->data_center), "");
- MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "LINE_PROTOCOL_SERVER_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1");
- MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "LINE_PROTOCOL_SERVER_PORT", &conf->telegraf_port, 8200);
- MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500);
- MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1);
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_TOPIC", conf->kafka_topic, sizeof(conf->kafka_topic), "");
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_USERNAME", conf->kafka_username, sizeof(conf->kafka_username), "");
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_PASSWORD", conf->kafka_password, sizeof(conf->kafka_password), "");
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_BROKERS", conf->kafka_brokers, sizeof(conf->kafka_brokers), "");
return 0;
}
struct shaping_stat* shaper_stat_init(int thread_num)
{
- struct shaping_stat *stat = NULL;
- int column_num;
+ struct fieldstat_tag global_tags[5];
struct shaper_stat_conf conf;
- const char *column_name[] = {"in_max_latency_us", "in_queue_len", "out_max_latency_us", "out_queue_len", //first line is gauge, second line is counter
- "in_pkts", "in_bytes", "in_drop_pkts", "out_pkts", "out_bytes", "out_drop_pkts"};
- enum field_type column_type[] = {FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE, FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE,
- FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER};
-
- column_num = sizeof(column_name)/sizeof(column_name[0]);
- if (column_num != STAT_COLUNM_IDX_MAX) {
- LOG_ERROR("%s: shaping init fieldstat failed, column_num %d != index num %d", LOG_TAG_STAT, column_num, STAT_COLUNM_IDX_MAX);
- goto ERROR;
- }
+ struct shaping_stat *stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat));
- if (shaper_stat_conf_load(&conf) != 0) {
+ if (shaper_stat_conf_load(stat, &conf) != 0) {
LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT);
goto ERROR;
}
- stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat));
+ shaper_stat_kafka_init(stat, &conf);
- stat->instance = fieldstat_dynamic_instance_new("shaping_engine", thread_num);
+ global_tags[0].key = "app_name";
+ global_tags[0].type = TAG_CSTRING;
+ global_tags[0].value_str = "shaping_engine";
+
+ global_tags[1].key = "device_group";
+ global_tags[1].type = TAG_CSTRING;
+ global_tags[1].value_str = conf.device_group;
+
+ global_tags[2].key = "device_id";
+ global_tags[2].type = TAG_CSTRING;
+ global_tags[2].value_str = conf.device_id;
+
+ global_tags[3].key = "data_center";
+ global_tags[3].type = TAG_CSTRING;
+ global_tags[3].value_str = conf.data_center;
+
+ global_tags[4].key = "table_name";
+ global_tags[4].type = TAG_CSTRING;
+ global_tags[4].value_str = "shaping_metric";
+
+ stat->instance = fieldstat_easy_new(thread_num, "shaping_stat", global_tags, 5);
if (stat->instance == NULL) {
LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT);
goto ERROR;
}
- fieldstat_dynamic_set_output_interval(stat->instance, conf.output_interval_ms);
- fieldstat_dynamic_set_line_protocol_server(stat->instance, conf.telegraf_ip, conf.telegraf_port);
- if (conf.enable_backgroud_thread == 0) {
- fieldstat_dynamic_disable_background_thread(stat->instance);
+ stat->latency_histogram_id = fieldstat_easy_register_histogram(stat->instance, "latency_distribution_us", 1, 1000000, 5);
+ if (stat->latency_histogram_id < 0) {
+ LOG_ERROR("%s: shaping fieldstat register histogram failed", LOG_TAG_STAT);
+ goto ERROR;
}
- stat->table_id = fieldstat_register_dynamic_table(stat->instance, "shaping_metric", column_name, column_type, column_num, stat->column_ids);
- if (stat->table_id < 0) {
- LOG_ERROR("%s: shaping fieldstat register table failed", LOG_TAG_STAT);
- goto ERROR;
+ stat->column_ids[IN_QUEUE_LEN_IDX] = fieldstat_easy_register_counter(stat->instance, "in_queue_len");
+ stat->column_ids[OUT_QUEUE_LEN_IDX] = fieldstat_easy_register_counter(stat->instance, "out_queue_len");
+ stat->column_ids[IN_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "in_pkts");
+ stat->column_ids[IN_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "in_bytes");
+ stat->column_ids[IN_DROP_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "in_drop_pkts");
+ stat->column_ids[OUT_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "out_pkts");
+ stat->column_ids[OUT_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "out_bytes");
+ stat->column_ids[OUT_DROP_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "out_drop_pkts");
+
+ for (int i = IN_QUEUE_LEN_IDX; i < STAT_COLUNM_IDX_MAX; i++) {
+ if (stat->column_ids[i] < 0) {
+ LOG_ERROR("%s: shaping fieldstat register column %d failed", LOG_TAG_STAT, i);
+ goto ERROR;
+ }
}
- fieldstat_dynamic_instance_start(stat->instance);
+ fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_stat.json", 1);//TODO: output interval
return stat;
-
ERROR:
if (stat) {
if (stat->instance) {
- fieldstat_dynamic_instance_free(stat->instance);
+ fieldstat_easy_free(stat->instance);
}
free(stat);
}
+
return NULL;
}
static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int priority, int profile_type)
{
- tags[TAG_VSYS_ID_IDX].value_int = vsys_id;
+ tags[TAG_VSYS_ID_IDX].value_longlong = vsys_id;
- tags[TAG_RULE_ID_IDX].value_int = rule_id;
+ tags[TAG_RULE_ID_IDX].value_longlong = rule_id;
- tags[TAG_PROFILE_ID_IDX].value_int = profile_id;
+ tags[TAG_PROFILE_ID_IDX].value_longlong = profile_id;
- tags[TAG_PRIORITY_IDX].value_int = priority;
+ tags[TAG_PRIORITY_IDX].value_longlong = priority;
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
tags[TAG_PROFILE_TYPE_IDX].value_str = "primary";
@@ -143,7 +203,7 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(global_stat, curr_time_us - arg->start_time_us, ctx->thread_index);
shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
shaper_global_stat_hincrby_callback_inc(&ctx->thread_global_stat);
@@ -240,7 +300,6 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s
struct shaping_stat *stat = ctx->stat;
int priority = profile->priority;
int thread_id = ctx->thread_index;
- unsigned long long old_latency;
if (need_update_guage) {
profile->hash_node->local_queue_len[priority][SHAPING_DIR_IN] += profile_stat->priority_queue_len[SHAPING_DIR_IN];
@@ -255,28 +314,21 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s
}
shaper_stat_tags_build(rule->vsys_id, rule->id, profile->id, priority, profile_type);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_DROP_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.drop_pkts, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.pkts, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_BYTES_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.bytes, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_DROP_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.drop_pkts, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.pkts, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_BYTES_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.bytes, tags, TAG_IDX_MAX, thread_id);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_DROP_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->in.drop_pkts);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->in.pkts);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_BYTES_IDX], tags, TAG_IDX_MAX, profile_stat->in.bytes);
- old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, tags, TAG_IDX_MAX, thread_id);
- if (profile_stat->in.max_latency > old_latency) {
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.max_latency - old_latency, tags, TAG_IDX_MAX, thread_id);
- }
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_DROP_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->out.drop_pkts);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->out.pkts);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_BYTES_IDX], tags, TAG_IDX_MAX, profile_stat->out.bytes);
- old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, tags, TAG_IDX_MAX, thread_id);
- if (profile_stat->out.max_latency > old_latency) {
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.max_latency - old_latency, tags, TAG_IDX_MAX, thread_id);
- }
+ fieldstat_easy_histogram_record(stat->instance, thread_id, stat->latency_histogram_id, tags, TAG_IDX_MAX, profile_stat->out.max_latency);
if (need_update_guage) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_QUEUE_LEN_IDX], tags, TAG_IDX_MAX, profile_stat->in.queue_len);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_QUEUE_LEN_IDX], tags, TAG_IDX_MAX, profile_stat->out.queue_len);
}
memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
@@ -431,4 +483,27 @@ void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_sta
}
return;
+}
+
+void shaper_stat_output(struct shaping_stat *stat)
+{
+ size_t len = 0;
+ fieldstat_easy_output(stat->instance, &output_json_buf, &len);
+
+ int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_json_buf, len, NULL, 0, NULL);
+ if (status < 0) {
+ LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d",
+ LOG_TAG_STAT,
+ rd_kafka_last_error(),
+ rd_kafka_err2name(rd_kafka_last_error()),
+ rd_kafka_err2str(rd_kafka_last_error()),
+ status);
+ }
+
+ if (output_json_buf) {
+ free(output_json_buf);
+ output_json_buf = NULL;
+ }
+
+ return;
} \ No newline at end of file
diff --git a/shaping/test/CMakeLists.txt b/shaping/test/CMakeLists.txt
index 643c222..1c2e046 100644
--- a/shaping/test/CMakeLists.txt
+++ b/shaping/test/CMakeLists.txt
@@ -3,7 +3,7 @@
# gtest_shaper_maat
###############################################################################
-add_executable(gtest_shaper_maat gtest_shaper_maat.cpp stub.cpp dummy_swarmkv.cpp)
+add_executable(gtest_shaper_maat gtest_shaper_maat.cpp stub.cpp dummy_swarmkv.cpp dummy_rdkafka.cpp)
target_include_directories(gtest_shaper_maat PUBLIC ${CMAKE_SOURCE_DIR}/common/include)
target_include_directories(gtest_shaper_maat PUBLIC ${CMAKE_SOURCE_DIR}/shaping/include)
target_link_libraries(gtest_shaper_maat common shaper pthread gtest)
@@ -12,7 +12,7 @@ target_link_libraries(gtest_shaper_maat common shaper pthread gtest)
# gtest_shaper_maat
###############################################################################
-add_executable(gtest_shaper_send_log gtest_shaper_send_log.cpp stub.cpp dummy_swarmkv.cpp)
+add_executable(gtest_shaper_send_log gtest_shaper_send_log.cpp stub.cpp dummy_swarmkv.cpp dummy_rdkafka.cpp)
target_include_directories(gtest_shaper_send_log PUBLIC ${CMAKE_SOURCE_DIR}/common/include)
target_include_directories(gtest_shaper_send_log PUBLIC ${CMAKE_SOURCE_DIR}/shaping/include)
target_link_libraries(gtest_shaper_send_log common shaper pthread gtest)
@@ -21,7 +21,7 @@ target_link_libraries(gtest_shaper_send_log common shaper pthread gtest)
# gtest_shaper
###############################################################################
-add_executable(gtest_shaper gtest_shaper.cpp stub.cpp dummy_swarmkv.cpp dummy_time.cpp)
+add_executable(gtest_shaper gtest_shaper.cpp stub.cpp dummy_swarmkv.cpp dummy_time.cpp dummy_rdkafka.cpp)
target_include_directories(gtest_shaper PUBLIC ${CMAKE_SOURCE_DIR}/common/include)
target_include_directories(gtest_shaper PUBLIC ${CMAKE_SOURCE_DIR}/shaping/include)
target_link_libraries(gtest_shaper common shaper pthread gtest)
diff --git a/shaping/test/dummy_rdkafka.cpp b/shaping/test/dummy_rdkafka.cpp
new file mode 100644
index 0000000..5d255d8
--- /dev/null
+++ b/shaping/test/dummy_rdkafka.cpp
@@ -0,0 +1,41 @@
+#include <librdkafka/rdkafka.h>
+
+rd_kafka_conf_t *rd_kafka_conf_new(void)
+{
+ return NULL;
+}
+
+rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
+{
+ return RD_KAFKA_CONF_OK;
+}
+
+rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)
+{
+ return NULL;
+}
+
+rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
+{
+ return NULL;
+}
+
+int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque)
+{
+ return 0;
+}
+
+rd_kafka_resp_err_t rd_kafka_last_error (void)
+{
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+const char *rd_kafka_err2name (rd_kafka_resp_err_t err)
+{
+ return NULL;
+}
+
+const char *rd_kafka_err2str (rd_kafka_resp_err_t err)
+{
+ return NULL;
+} \ No newline at end of file
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index 49b9a06..cde8aef 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -1,7 +1,7 @@
-#include <fieldstat.h>
#include <gtest/gtest.h>
#include <cjson/cJSON.h>
#include <sys/queue.h>
+#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "shaper.h"
@@ -12,15 +12,11 @@
#include "stub.h"
#define SHAPING_SESSION_QUEUE_LEN 128
-#define SHAPING_STAT_FILE_NAME "/tmp/shaping_metrics.json"
-#define SHAPING_GLOBAL_STAT_FILE_NAME "shaping_global_metric"
#define FIELDSTAT_AUTO_TIME_MAX 999999000
char profile_type_primary[] = "primary";
char profile_type_borrow[] = "borrow";
-char line[4096];
-
static struct stub_packet* packet_new(unsigned long long income_time, unsigned int length, unsigned char dir)
{
struct stub_packet *packet;
@@ -104,12 +100,13 @@ static int judge_packet_eq(struct stub_pkt_queue *expec_queue, struct stub_pkt_q
return 0;
}
-static void shaping_stat_judge(char *file_line, int rule_id, int profile_id, int priority,
+static void shaping_stat_judge(char *file_line, int json_array_idx, int rule_id, int profile_id, int priority,
unsigned long long tx_pkts, unsigned long long tx_bytes,
unsigned long long drop_pkts, long long queue_len, long long max_latency,
unsigned char direction, char profile_type[])
{
cJSON *json = NULL;
+ cJSON *json_array_element = NULL;
cJSON *fields_json = NULL;
cJSON *tags_json = NULL;
cJSON *tmp_obj = NULL;
@@ -118,36 +115,41 @@ static void shaping_stat_judge(char *file_line, int rule_id, int profile_id, int
json = cJSON_Parse(file_line);
ASSERT_TRUE(json != NULL);
- tmp_obj = cJSON_GetObjectItem(json, "name");
+ ASSERT_EQ(json->type, cJSON_Array);
+ ASSERT_GT(cJSON_GetArraySize(json), json_array_idx);
+
+ json_array_element = cJSON_GetArrayItem(json, json_array_idx);
+
+ tmp_obj = cJSON_GetObjectItem(json_array_element, "name");
ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_STREQ("traffic_shaping_rule_hits", tmp_obj->valuestring);
+ EXPECT_STREQ("shaping_stat", tmp_obj->valuestring);
/******************parse tags***********************************/
- tags_json = cJSON_GetObjectItem(json, "tags");
+ tags_json = cJSON_GetObjectItem(json_array_element, "tags");
ASSERT_TRUE(tags_json != NULL);
tmp_obj = cJSON_GetObjectItem(tags_json, "vsys_id");
ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(atoi(tmp_obj->valuestring), STUB_TEST_VSYS_ID);
+ EXPECT_EQ(tmp_obj->valueint, STUB_TEST_VSYS_ID);
tmp_obj = cJSON_GetObjectItem(tags_json, "rule_id");
ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(rule_id, atoi(tmp_obj->valuestring));
+ EXPECT_EQ(rule_id, tmp_obj->valueint);
tmp_obj = cJSON_GetObjectItem(tags_json, "profile_id");
ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(profile_id, atoi(tmp_obj->valuestring));
+ EXPECT_EQ(profile_id, tmp_obj->valueint);
tmp_obj = cJSON_GetObjectItem(tags_json, "priority");
ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(priority, atoi(tmp_obj->valuestring));
+ EXPECT_EQ(priority, tmp_obj->valueint);
tmp_obj = cJSON_GetObjectItem(tags_json, "profile_type");
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_STREQ(tmp_obj->valuestring, profile_type);
/******************parse fields**********************************/
- fields_json = cJSON_GetObjectItem(json, "fields");
+ fields_json = cJSON_GetObjectItem(json_array_element, "fields");
ASSERT_TRUE(fields_json != NULL);
snprintf(attr_name, sizeof(attr_name), "%s_pkts", direction == SHAPING_DIR_OUT ? "out" : "in");
@@ -165,17 +167,19 @@ static void shaping_stat_judge(char *file_line, int rule_id, int profile_id, int
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_EQ(drop_pkts, tmp_obj->valueint);
- if (max_latency != -1) {
+ //TODO: api to parse histogram
+ /*if (max_latency != -1) {
snprintf(attr_name, sizeof(attr_name), "%s_max_latency_us", direction == SHAPING_DIR_OUT ? "out" : "in");
tmp_obj = cJSON_GetObjectItem(fields_json, attr_name);
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_EQ(max_latency, tmp_obj->valueint);
- }
+ }*/
snprintf(attr_name, sizeof(attr_name), "%s_queue_len", direction == SHAPING_DIR_OUT ? "out" : "in");
tmp_obj = cJSON_GetObjectItem(fields_json, attr_name);
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(queue_len, tmp_obj->valueint);
+ if (tmp_obj != NULL) {
+ EXPECT_EQ(queue_len, tmp_obj->valueint);
+ }
cJSON_Delete(json);
@@ -184,14 +188,11 @@ static void shaping_stat_judge(char *file_line, int rule_id, int profile_id, int
static int shaping_global_stat_field_get(cJSON *metrics, const char *field_name)
{
- int metrics_size = cJSON_GetArraySize(metrics);
+ cJSON *tmp_obj = NULL;
- for (int i = 0; i < metrics_size; i++) {
- cJSON *tmp = cJSON_GetArrayItem(metrics, i);
- char *column_name = cJSON_GetObjectItem(tmp, "name")->valuestring;
- if (strcmp(column_name, field_name) == 0) {
- return cJSON_GetObjectItem(tmp, "diff")->valueint;
- }
+ tmp_obj = cJSON_GetObjectItem(metrics, field_name);
+ if (tmp_obj != NULL) {
+ return tmp_obj->valueint;
}
return -1;
@@ -200,9 +201,12 @@ static int shaping_global_stat_field_get(cJSON *metrics, const char *field_name)
static void shaping_global_stat_judge(char *file_line, int tx_pkts, int tx_bytes, int drop_pkts, int drop_bytes, int queueing_pkts, int queueing_bytes)
{
cJSON *metrics = NULL;
+ cJSON *json_array_element = NULL;
cJSON *json = cJSON_Parse(file_line);
- metrics = cJSON_GetObjectItem(json, "metrics");
+ json_array_element = cJSON_GetArrayItem(json, 0);
+
+ metrics = cJSON_GetObjectItem(json_array_element, "fields");
EXPECT_EQ(tx_pkts, shaping_global_stat_field_get(metrics, "all_tx_pkts"));
EXPECT_EQ(tx_bytes, shaping_global_stat_field_get(metrics, "all_tx_bytes"));
@@ -266,11 +270,17 @@ TEST(single_session, udp_tx_in_order)
}
shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- shaper_global_stat_refresh(ctx);
+ char *global_stat_str = NULL;
+ size_t global_stat_str_len = 0;
+ char *stat_str = NULL;
+ size_t stat_str_len = 0;
+
+ shaper_thread_global_stat_refresh(&ctx->thread_ctx[0]);
+
+ fieldstat_easy_output(ctx->global_stat->instance, &global_stat_str, &global_stat_str_len);
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
@@ -278,24 +288,14 @@ TEST(single_session, udp_tx_in_order)
stub_clear_matched_shaping_rules();
/*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
-
//judge shaping metric
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
+ shaping_stat_judge(stat_str, 0, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
//judge shaping global metric
- stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_global_stat_judge(line, 100, 10000, 0, 0, 0, 0);
- fclose(stat_file);
+ shaping_global_stat_judge(global_stat_str, 100, 10000, 0, 0, 0, 0);
+
+ free(global_stat_str);
+ free(stat_str);
}
/*session1 match rule1
@@ -355,7 +355,6 @@ TEST(bidirectional, udp_tx_in_order)
ASSERT_TRUE(TAILQ_EMPTY(&expec_tx_queue_in));
shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
@@ -410,11 +409,17 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order)
}
shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- shaper_global_stat_refresh(ctx);
+ char *global_stat_str = NULL;
+ size_t global_stat_str_len = 0;
+ char *stat_str = NULL;
+ size_t stat_str_len = 0;
+
+ shaper_thread_global_stat_refresh(&ctx->thread_ctx[0]);
+
+ fieldstat_easy_output(ctx->global_stat->instance, &global_stat_str, &global_stat_str_len);
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
@@ -422,24 +427,14 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order)
stub_clear_matched_shaping_rules();
/*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
-
//judge shaping metric
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
+ shaping_stat_judge(stat_str, 0, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
//judge shaping global metric
- stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_global_stat_judge(line, 100, 10000, 0, 0, 0, 0);
- fclose(stat_file);
+ shaping_global_stat_judge(global_stat_str, 100, 10000, 0, 0, 0, 0);
+
+ free(global_stat_str);
+ free(stat_str);
}
/*session1 match rule1
@@ -482,9 +477,13 @@ TEST(single_session, tcp_tx_in_order)
/***********send stat data here********************/
+ char *stat_str = NULL;
+ size_t stat_str_len = 0;
+
shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1);
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- sleep(2);//wait telegraf generate metric
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
+ shaping_stat_judge(stat_str, 0, 0, 0, 1, 20, 2000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary);//*test statistics
+ free(stat_str);
stub_refresh_token_bucket(0);
@@ -502,10 +501,9 @@ TEST(single_session, tcp_tx_in_order)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
@@ -513,20 +511,8 @@ TEST(single_session, tcp_tx_in_order)
/*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary);
-
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 31000, SHAPING_DIR_OUT, profile_type_primary);
-
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
+ shaping_stat_judge(stat_str, 0, 0, 0, 1, 30, 3000, 0, 0, 31000, SHAPING_DIR_OUT, profile_type_primary);
+ free(stat_str);
}
/*session1 match rule1
@@ -581,10 +567,12 @@ TEST(single_session, udp_diff_direction)
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ char *stat_str = NULL;
+ size_t stat_str_len = 0;
+
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
@@ -592,18 +580,9 @@ TEST(single_session, udp_diff_direction)
/*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 21000, SHAPING_DIR_OUT, profile_type_primary);
-
- shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 0, SHAPING_DIR_IN, profile_type_primary);
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
+ shaping_stat_judge(stat_str, 0, 0, 0, 1, 20, 2000, 0, 0, 21000, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(stat_str, 0, 0, 0, 1, 20, 2000, 0, 0, 0, SHAPING_DIR_IN, profile_type_primary);
+ free(stat_str);
}
/*session1 match rule1, rule2, rule3
@@ -660,33 +639,28 @@ TEST(single_session, udp_multi_rules)
}
shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ char *stat_str = NULL;
+ size_t stat_str_len = 0;
+
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
/*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 507000, SHAPING_DIR_OUT, profile_type_primary);
+ //profile_id 0
+ shaping_stat_judge(stat_str, 0, 0, 0, 1, 100, 10000, 0, 0, 507000, SHAPING_DIR_OUT, profile_type_primary);
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1
- shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);
+ //profile_id 1
+ shaping_stat_judge(stat_str, 1, 1, 1, 1, 100, 10000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2
- shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 91000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt
+ //profile_id 2
+ shaping_stat_judge(stat_str, 2, 2, 2, 1, 100, 10000, 0, 0, 91000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
+ free(stat_str);
}
/*session1 match rule1
@@ -735,30 +709,25 @@ TEST(single_session, udp_borrow)
}
shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ char *stat_str = NULL;
+ size_t stat_str_len = 0;
+
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
/*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);
+ //profile_id 1, primary
+ shaping_stat_judge(stat_str, 0, 1, 1, 1, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
- shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
+ //profile_id 2, borrow
+ shaping_stat_judge(stat_str, 1, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
+ free(stat_str);
}
/*session1 match rule1
@@ -810,35 +779,28 @@ TEST(single_session, udp_borrow_same_priority_9)
}
shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ char *stat_str = NULL;
+ size_t stat_str_len = 0;
+
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
/*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
+ //profile_id 1, primary
+ shaping_stat_judge(stat_str, 0, 1, 1, 9, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);
+ //profile_id 2, borrow
+ shaping_stat_judge(stat_str, 1, 1, 2, 9, 0, 0, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
-#if 0 //fieldstat don't output a row when all values is zero
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
- shaping_stat_judge(line, 1, 2, 9, 0, 0, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
-#endif
+ //profile_id 3, borrow
+ shaping_stat_judge(stat_str, 2, 1, 3, 9, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 3, borrow
- shaping_stat_judge(line, 1, 3, 9, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
-
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
+ free(stat_str);
}
/*session1 match rule1
@@ -885,7 +847,6 @@ TEST(single_session_async, udp_close_before_async_exec)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
- fieldstat_global_disable_prometheus_endpoint();
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -980,33 +941,28 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
shaping_flow_free(&ctx->thread_ctx[0], sf1);
shaping_flow_free(&ctx->thread_ctx[0], sf2);
- fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ char *stat_str = NULL;
+ size_t stat_str_len = 0;
+
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
/*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 1471000, SHAPING_DIR_OUT, profile_type_primary);
+ //profile_id 1, primary
+ shaping_stat_judge(stat_str, 0, 1, 1, 1, 0, 0, 0, 0, 1471000, SHAPING_DIR_OUT, profile_type_primary);
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
- shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
+ //profile_id 2, borrow
+ shaping_stat_judge(stat_str, 1, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, primary
- shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 191000, SHAPING_DIR_OUT, profile_type_primary);
+ //profile_id 2, primary
+ shaping_stat_judge(stat_str, 2, 2, 2, 1, 100, 10000, 0, 0, 191000, SHAPING_DIR_OUT, profile_type_primary);
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
+ free(stat_str);
}
/*session1 match rule1; session2 match rule2
@@ -1089,7 +1045,6 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
shaping_flow_free(&ctx->thread_ctx[0], sf1);
shaping_flow_free(&ctx->thread_ctx[1], sf2);
- fieldstat_global_disable_prometheus_endpoint();
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1191,7 +1146,6 @@ TEST(two_session_diff_priority_same_profile, profile_timer_test)
shaping_flow_free(&ctx->thread_ctx[0], sf1);
shaping_flow_free(&ctx->thread_ctx[1], sf2);
- fieldstat_global_disable_prometheus_endpoint();
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1279,7 +1233,6 @@ TEST(two_session_diff_priority_same_profile, one_direction_dont_block_another)
shaping_flow_free(&ctx->thread_ctx[0], sf1);
shaping_flow_free(&ctx->thread_ctx[1], sf2);
- fieldstat_global_disable_prometheus_endpoint();
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1355,7 +1308,6 @@ TEST(two_sessions, priority_non_block)
shaping_flow_free(&ctx->thread_ctx[0], sf1);
shaping_flow_free(&ctx->thread_ctx[1], sf2);
- fieldstat_global_disable_prometheus_endpoint();
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1430,7 +1382,6 @@ TEST(two_sessions, borrow_when_primary_profile_priority_blocked)
shaping_flow_free(&ctx->thread_ctx[0], sf1);
shaping_flow_free(&ctx->thread_ctx[1], sf2);
- fieldstat_global_disable_prometheus_endpoint();
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1511,7 +1462,6 @@ TEST(two_sessions, primary_profile_priority_blocked_by_borrow_profile)
shaping_flow_free(&ctx->thread_ctx[0], sf1);
shaping_flow_free(&ctx->thread_ctx[1], sf2);
- fieldstat_global_disable_prometheus_endpoint();
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1560,35 +1510,31 @@ TEST(statistics, udp_drop_pkt)
}
shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- shaper_global_stat_refresh(ctx);
+ char *global_stat_str = NULL;
+ size_t global_stat_str_len = 0;
+ char *stat_str = NULL;
+ size_t stat_str_len = 0;
+
+ shaper_thread_global_stat_refresh(&ctx->thread_ctx[0]);
+
+ fieldstat_easy_output(ctx->global_stat->instance, &global_stat_str, &global_stat_str_len);
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
/*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
-
//judge shaping metric
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228000, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
+ shaping_stat_judge(stat_str, 0, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228000, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max
//judge shaping global metric
- stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_global_stat_judge(line, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 10000, 0, 0);
- fclose(stat_file);
+ shaping_global_stat_judge(global_stat_str, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 10000, 0, 0);
+
+ free(global_stat_str);
+ free(stat_str);
}
/*session1 match rule1
@@ -1604,7 +1550,6 @@ TEST(statistics, udp_queueing_pkt)
int priority[] = {1};
int profile_num[] = {1};
int profile_id[][MAX_REF_PROFILE] = {{0}};
- FILE *stat_file;
TAILQ_INIT(&expec_tx_queue);
stub_init();
@@ -1624,17 +1569,23 @@ TEST(statistics, udp_queueing_pkt)
/***********send stat data here********************/
+ char *global_stat_str = NULL;
+ size_t global_stat_str_len = 0;
+ char *stat_str = NULL;
+ size_t stat_str_len = 0;
+
+ shaper_thread_global_stat_refresh(&ctx->thread_ctx[0]);
shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1);
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- shaper_global_stat_refresh(ctx);
- sleep(2);//wait telegraf generate metric
- /*******judge global metric********/
- stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_global_stat_judge(line, 10, 1000, 0, 0, 90, 9000);
- fclose(stat_file);
+ fieldstat_easy_output(ctx->global_stat->instance, &global_stat_str, &global_stat_str_len);
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
+
+ /*******judge metric********/
+ shaping_stat_judge(stat_str, 0, 0, 0, 1, 10, 1000, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_global_stat_judge(global_stat_str, 10, 1000, 0, 0, 90, 9000);
+
+ free(global_stat_str);
+ free(stat_str);
//first 10 packets
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
@@ -1649,39 +1600,25 @@ TEST(statistics, udp_queueing_pkt)
}
shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- shaper_global_stat_refresh(ctx);
+ shaper_thread_global_stat_refresh(&ctx->thread_ctx[0]);
+ fieldstat_easy_output(ctx->global_stat->instance, &global_stat_str, &global_stat_str_len);
+ fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
/*******test statistics***********/
- sleep(2);//wait telegraf to output
-
//judge shaping metric
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
-
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data first sent
- shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary);
-
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data last sent
- shaping_stat_judge(line, 0, 0, 1, 90, 9000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);
-
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
+ shaping_stat_judge(stat_str, 0, 0, 0, 1, 100, 10000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);
//judge global metric
- stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_global_stat_judge(line, 90, 9000, 0, 0, -90, -9000);
- fclose(stat_file);
+ shaping_global_stat_judge(global_stat_str, 100, 10000, 0, 0, 0, 0);
+
+ free(global_stat_str);
+ free(stat_str);
}
int main(int argc, char **argv)
diff --git a/shaping/test/stub.h b/shaping/test/stub.h
index d791295..4c86f09 100644
--- a/shaping/test/stub.h
+++ b/shaping/test/stub.h
@@ -56,8 +56,3 @@ void stub_shaper_stat_send(int thread_seq);
void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx);
/*******************temporary for test******************************/
-
-
-/************************temp invoke fieldstat api, to avoid memory leak for self test*********************************/
-void fieldstat_global_disable_prometheus_endpoint();
-/**********************************************************************************************************************/ \ No newline at end of file
diff --git a/shaping/test/telegraf/self_test_shaping.conf b/shaping/test/telegraf/self_test_shaping.conf
deleted file mode 100644
index 6059161..0000000
--- a/shaping/test/telegraf/self_test_shaping.conf
+++ /dev/null
@@ -1,102 +0,0 @@
-# Telegraf Configuration
-[global_tags]
- device_id = "88888888"
- #vsys_id = "1"
-[agent]
- interval = "1s"
- round_interval = true
- metric_batch_size = 1000
- metric_buffer_limit = 10000
- collection_jitter = "0s"
- flush_interval = "1s"
- flush_jitter = "0s"
- precision = ""
- debug = false
- quiet = false
- logfile = ""
- hostname = ""
- omit_hostname = true
-
-
-[[inputs.socket_listener]]
- service_address = "udp4://:6667"
- data_format = "influx"
-
-#[[processors.converter]]
-# [processors.converter.tags]
-# measurement = ["topic"]
-
-[[processors.rename]]
- [[processors.rename.replace]]
- field = "active_sessions_sum"
- dest = "active_sessions"
-
- [[processors.rename.replace]]
- field = "in_drop_pkts_sum"
- dest = "in_drop_pkts"
-
- [[processors.rename.replace]]
- field = "in_max_latency_us_max"
- dest = "in_max_latency_us"
-
- [[processors.rename.replace]]
- field = "in_queue_len_sum"
- dest = "in_queue_len"
-
- [[processors.rename.replace]]
- field = "in_bytes_sum"
- dest = "in_bytes"
-
- [[processors.rename.replace]]
- field = "in_pkts_sum"
- dest = "in_pkts"
-
- [[processors.rename.replace]]
- field = "out_drop_pkts_sum"
- dest = "out_drop_pkts"
-
- [[processors.rename.replace]]
- field = "out_max_latency_us_max"
- dest = "out_max_latency_us"
-
- [[processors.rename.replace]]
- field = "out_queue_len_sum"
- dest = "out_queue_len"
-
- [[processors.rename.replace]]
- field = "out_bytes_sum"
- dest = "out_bytes"
-
- [[processors.rename.replace]]
- field = "out_pkts_sum"
- dest = "out_pkts"
-
- [[processors.rename.replace]]
- field = "queueing_sessions_sum"
- dest = "queueing_sessions"
-
-#[[aggregators.basicstats]]
-# period = "1s"
-# drop_original = true
-# stats = ["sum", "max"]
-
-[[outputs.file]]
-files = ["/tmp/shaping_metrics.json", "stdout"]
-data_format = "json"
-fielddrop = ["*pkts*max", "*bytes*max", "*session*max", "*queue*max", "*latency*sum"]
-json_timestamp_units = "1ms"
-#json_transformation = '''
-# $merge([{"timestamp": timestamp}, tags, fields])
-# '''
-
-#[[outputs.kafka]]
-# sasl_username = "admin"
-# sasl_password = "galaxy2019"
-# brokers = [ "192.168.44.12:9094" ]
-# topic = "POLICY-RULE-METRICS"
-# fielddrop = ["*pkts*max", "*bytes*max", "*session*max", "*queue*max", "*latency*sum"]
-# data_format = "json"
-# json_timestamp_units = "1ms"
-# json_transformation = '''
-# $merge([{"timestamp": timestamp}, tags, fields])
-# '''
diff --git a/shaping/test/test_conf/shaping.conf b/shaping/test/test_conf/shaping.conf
index 015ab3c..3e71225 100644
--- a/shaping/test/test_conf/shaping.conf
+++ b/shaping/test/test_conf/shaping.conf
@@ -30,11 +30,11 @@ SWARMKV_HEALTH_CHECK_PORT=0
SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111
[METRIC]
-FIELDSTAT_OUTPUT_INTERVAL_MS=999999000
-FIELDSTAT_ENABLE_BACKGRUND_THREAD=0
-SELF_TEST=1
-LINE_PROTOCOL_SERVER_IP="127.0.0.1"
-LINE_PROTOCOL_SERVER_PORT=6667
+FIELDSTAT_OUTPUT_INTERVAL_S=999999000
+DEVICE_GROUP="test_device_group"
+DEVICE_ID="2333333333333333"
+DATA_CENTER="center-xxg-tsgx"
+GLOBAL_STAT_SELF_TEST=1
[CONFIG]
#PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128