From 782ee7558dc53f308edd9c0ad1813d43bb22b33c Mon Sep 17 00:00:00 2001 From: root Date: Fri, 19 Jul 2024 10:02:16 +0000 Subject: update to fieldstat4, not complete --- conf/shaping.conf | 2 +- shaping/CMakeLists.txt | 3 +- shaping/include/shaper.h | 1 + shaping/include/shaper_global_stat.h | 9 +- shaping/include/shaper_stat.h | 18 +- shaping/src/main.cpp | 22 ++- shaping/src/shaper.cpp | 5 +- shaping/src/shaper_global_stat.cpp | 279 ++++++++++++------------------- shaping/src/shaper_stat.cpp | 219 ++++++++++++++++-------- shaping/test/CMakeLists.txt | 6 +- shaping/test/dummy_rdkafka.cpp | 41 +++++ shaping/test/gtest_shaper.cpp | 311 ++++++++++++++--------------------- shaping/test/stub.h | 5 - shaping/test/test_conf/shaping.conf | 9 +- 14 files changed, 457 insertions(+), 473 deletions(-) create mode 100644 shaping/test/dummy_rdkafka.cpp diff --git a/conf/shaping.conf b/conf/shaping.conf index 70a1811..6c67f8e 100644 --- a/conf/shaping.conf +++ b/conf/shaping.conf @@ -30,7 +30,7 @@ SWARMKV_HEALTH_CHECK_PORT=0 SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111 [METRIC] -FIELDSTAT_OUTPUT_INTERVAL_MS=1000 +FIELDSTAT_OUTPUT_INTERVAL_S=1 GLOBAL_STAT_OUTPUT_INTERVAL_S=1 LINE_PROTOCOL_SERVER_IP="127.0.0.1" LINE_PROTOCOL_SERVER_PORT=6667 diff --git a/shaping/CMakeLists.txt b/shaping/CMakeLists.txt index 19862eb..30ddf4c 100644 --- a/shaping/CMakeLists.txt +++ b/shaping/CMakeLists.txt @@ -4,7 +4,7 @@ target_link_libraries(shaper PUBLIC avl_tree) target_link_libraries(shaper PUBLIC cjson) target_link_libraries(shaper PUBLIC MESA_handle_logger) target_link_libraries(shaper PUBLIC MESA_prof_load) -target_link_libraries(shaper PUBLIC fieldstat3) +target_link_libraries(shaper PUBLIC fieldstat4) target_link_libraries(shaper PUBLIC pthread) target_include_directories(shaper PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_include_directories(shaper PUBLIC ${PROJECT_SOURCE_DIR}/deps/timeout) @@ -14,6 +14,7 @@ target_link_libraries(shaping_engine PUBLIC maatframe) target_link_libraries(shaping_engine PUBLIC mrzcpd) target_link_libraries(shaping_engine PUBLIC libjemalloc-static dl) target_link_libraries(shaping_engine PUBLIC swarmkv) +target_link_libraries(shaping_engine PUBLIC rdkafka) install(TARGETS shaping_engine RUNTIME DESTINATION bin COMPONENT Program) diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 5e2f7c0..a438242 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -56,6 +56,7 @@ struct shaping_thread_ctx { struct shaping_ctx *ref_ctx; struct shaper *sp; struct shaping_stat *stat; + struct shaping_global_stat *global_stat; struct shaping_marsio_info *marsio_info; struct swarmkv *swarmkv_db;//handle of swarmkv struct shaping_maat_info *maat_info; diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h index 4c100a7..cd40320 100644 --- a/shaping/include/shaper_global_stat.h +++ b/shaping/include/shaper_global_stat.h @@ -1,7 +1,5 @@ #pragma once -#include - enum shaping_global_stat_column_index { CURR_SESSION_NUM_IDX = 0, QUEUEING_PKTS_IDX, @@ -82,17 +80,16 @@ struct shaping_global_stat_data { }; struct shaping_global_stat { - struct fieldstat_instance *instance; + struct fieldstat_easy *instance; int column_ids[GLOBAL_STAT_COLUNM_IDX_MAX]; int swarmkv_latency_summary_id; - struct shaping_global_stat_data *stat_data; int output_interval_s; }; struct shaping_global_stat* shaper_global_stat_init(int work_thread_num); void shaper_global_stat_destroy(struct shaping_global_stat *stat); -void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us); +void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us, int thread_idx); void shaper_global_stat_curr_session_inc(struct shaping_global_stat_data *thread_global_stat); void shaper_global_stat_curr_session_dec(struct shaping_global_stat_data *thread_global_stat); @@ -132,4 +129,4 @@ void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_ void shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(struct shaping_global_stat_data *thread_global_stat); void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); -void shaper_global_stat_refresh(struct shaping_ctx *ctx); \ No newline at end of file +void shaper_thread_global_stat_refresh(struct shaping_thread_ctx *ctx); \ No newline at end of file diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h index e6feeb3..124757c 100644 --- a/shaping/include/shaper_stat.h +++ b/shaping/include/shaper_stat.h @@ -1,8 +1,8 @@ #pragma once #include +#include #include "uthash.h" -#include enum shaping_packet_dir { SHAPING_DIR_IN = 0, @@ -20,9 +20,7 @@ enum shaping_stat_tags_index { }; enum shaping_stat_column_index { - IN_MAX_LATENCY_IDX = 0, - IN_QUEUE_LEN_IDX, - OUT_MAX_LATENCY_IDX, + IN_QUEUE_LEN_IDX = 0, OUT_QUEUE_LEN_IDX, IN_PKTS_IDX, IN_BYTES_IDX, @@ -47,8 +45,12 @@ struct shaping_stat_for_profile { }; struct shaping_stat { - struct fieldstat_dynamic_instance *instance; - int table_id; + struct fieldstat_easy *instance; + rd_kafka_t *kafka_handle; + rd_kafka_topic_t *topic_rkt; + int output_interval_s; + int in_latency_histogram_id; + int out_latency_histogram_id; unsigned int column_ids[STAT_COLUNM_IDX_MAX]; }; @@ -65,4 +67,6 @@ void shaper_stat_drop_inc(struct shaping_stat_for_profile *profile_stat, unsigne void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_stat, unsigned char direction, unsigned long long latency, int thread_id); void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int force); -void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node); \ No newline at end of file +void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node); + +void shaper_stat_output(struct shaping_stat *stat); \ No newline at end of file diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp index 597230b..6d210ea 100644 --- a/shaping/src/main.cpp +++ b/shaping/src/main.cpp @@ -23,6 +23,7 @@ static void *shaper_thread_loop(void *data) char thread_name[16] = {0}; struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data; int output_interval_s = ctx->ref_ctx->global_stat->output_interval_s; + time_t last_refresh_stat_time = time(NULL); snprintf(thread_name, sizeof(thread_name), "shape-work-%d", ctx->thread_index); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); @@ -41,7 +42,14 @@ static void *shaper_thread_loop(void *data) session_table_reset_with_callback(ctx->session_table, shaper_session_data_free_cb, ctx); __atomic_fetch_and(&ctx->session_need_reset, 0, __ATOMIC_SEQ_CST); } - marsio_poll_wait(ctx->marsio_info->instance, &ctx->marsio_info->mr_dev, 1, ctx->thread_index, output_interval_s); + + time_t curr_time = time(NULL); + if (curr_time - last_refresh_stat_time >= output_interval_s) { + shaper_thread_global_stat_refresh(ctx); + last_refresh_stat_time = curr_time; + } + + marsio_poll_wait(ctx->marsio_info->instance, &ctx->marsio_info->mr_dev, 1, ctx->thread_index, 10); } shaper_thread_resource_clear(); @@ -68,7 +76,7 @@ static void sig_handler(int signo) int main(int argc, char **argv) { struct shaping_ctx *ctx = NULL; - time_t last_update_time = time(NULL); + time_t last_stat_update_time = time(NULL); if (LOG_INIT("./conf/zlog.conf") == -1) { @@ -111,11 +119,13 @@ int main(int argc, char **argv) while(!quit) { time_t curr_time = time(NULL); - if (curr_time - last_update_time >= ctx->global_stat->output_interval_s) { - shaper_global_stat_refresh(ctx); - last_update_time = curr_time; + + if (curr_time - last_stat_update_time >= ctx->stat->output_interval_s) { + shaper_stat_output(ctx->stat); + last_stat_update_time = curr_time; } - sleep(ctx->global_stat->output_interval_s); + + sleep(ctx->stat->output_interval_s); } for (int i = 0; i < ctx->thread_num; i++) { diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index ba8e1f0..a662df4 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -503,7 +503,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us); + shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us, ctx->thread_index); shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat); @@ -705,7 +705,7 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; curr_time_ms = curr_time_us / 1000; - shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us); + shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us, ctx->thread_index); shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); shaper_global_stat_hmget_callback_inc(&ctx->thread_global_stat); @@ -1625,6 +1625,7 @@ struct shaping_ctx *shaping_engine_init() ctx->thread_ctx[i].thread_index = i; ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max); ctx->thread_ctx[i].stat = ctx->stat; + ctx->thread_ctx[i].global_stat = ctx->global_stat; ctx->thread_ctx[i].session_table = session_table_create(); ctx->thread_ctx[i].maat_info = ctx->maat_info; ctx->thread_ctx[i].marsio_info = ctx->marsio_info; diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp index d59b290..74ae4fa 100644 --- a/shaping/src/shaper_global_stat.cpp +++ b/shaping/src/shaper_global_stat.cpp @@ -2,70 +2,64 @@ #include #include -#include +#include #include "log.h" #include "utils.h" #include "shaper.h" #include "shaper_global_stat.h" -struct shaping_global_stat_conf { - int is_self_test; -}; - -static int shaper_global_stat_conf_load(struct shaping_global_stat *stat, struct shaping_global_stat_conf *conf) +static int shaper_global_stat_conf_load(struct shaping_global_stat *stat) { MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "GLOBAL_STAT_OUTPUT_INTERVAL_S", &stat->output_interval_s, 1); - MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "SELF_TEST", &conf->is_self_test, 0); return 0; } static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat) { - const char * quantiles = "0.1,0.5,0.8,0.9,0.95,0.99"; - stat->swarmkv_latency_summary_id = fieldstat_register_summary(stat->instance, "async_delay(us)", NULL, 0, quantiles, 1, 500000, 3, 1); - - stat->column_ids[CURR_SESSION_NUM_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_session_num", NULL, 0); - stat->column_ids[QUEUEING_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_queueing_pkts", NULL, 0); - stat->column_ids[QUEUEING_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_queueing_bytes", NULL, 0); - - stat->column_ids[CTRL_ERR_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_error", NULL, 0); - stat->column_ids[CTRL_OPENING_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_open", NULL, 0); - stat->column_ids[CTRL_ACTIVE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_active", NULL, 0); - stat->column_ids[CTRL_CLOSE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_close", NULL, 0); - stat->column_ids[CTRL_ACTIVE_CLOSE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_sf_close", NULL, 0); - stat->column_ids[CTRL_RESETALL_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_reset", NULL, 0); - stat->column_ids[SESSION_LOG_SEND_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "sess_log_send", NULL, 0); - - stat->column_ids[ASYNC_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "async", NULL, 0); - stat->column_ids[ASYNC_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "async_cb", NULL, 0); - - stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tconsume", NULL, 0); - stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tconsume_cb", NULL, 0); - stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hincrby", NULL, 0); - stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hincrby_cb", NULL, 0); - stat->column_ids[ASYNC_HMGET_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hmget", NULL, 0); - stat->column_ids[ASYNC_HMGET_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hmget_cb", NULL, 0); - - stat->column_ids[ASYNC_TCONSUME_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tconsume_failed", NULL, 0); - stat->column_ids[ASYNC_HINCRBY_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hincrby_failed", NULL, 0); - stat->column_ids[ASYNC_HMGET_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hmget_failed", NULL, 0); - - stat->column_ids[RX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_rx_pkts", NULL, 0); - stat->column_ids[RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_rx_bytes", NULL, 0); - stat->column_ids[TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_tx_pkts", NULL, 0); - stat->column_ids[TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_tx_bytes", NULL, 0); - stat->column_ids[DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_drop_pkts", NULL, 0); - stat->column_ids[DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "all_drop_bytes", NULL, 0); - - stat->column_ids[HIT_POLICY_RX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_rx_pkts", NULL, 0); - stat->column_ids[HIT_POLICY_RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_rx_bytes", NULL, 0); - stat->column_ids[HIT_POLICY_TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_pkts", NULL, 0); - stat->column_ids[HIT_POLICY_TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_bytes", NULL, 0); - stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_syn_ack_pkts", NULL, 0); - stat->column_ids[HIT_POLICY_DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_pkts", NULL, 0); - stat->column_ids[HIT_POLICY_DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_bytes", NULL, 0); + stat->swarmkv_latency_summary_id = fieldstat_easy_register_histogram(stat->instance, "async_delay(us)", 1, 500000, 1); + + stat->column_ids[CURR_SESSION_NUM_IDX] = fieldstat_easy_register_counter(stat->instance, "curr_session_num"); + stat->column_ids[QUEUEING_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "curr_queueing_pkts"); + stat->column_ids[QUEUEING_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "curr_queueing_bytes"); + + stat->column_ids[CTRL_ERR_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_error"); + stat->column_ids[CTRL_OPENING_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_open"); + stat->column_ids[CTRL_ACTIVE_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_active"); + stat->column_ids[CTRL_CLOSE_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_close"); + stat->column_ids[CTRL_ACTIVE_CLOSE_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_sf_close"); + stat->column_ids[CTRL_RESETALL_IDX] = fieldstat_easy_register_counter(stat->instance, "ctrl_reset"); + stat->column_ids[SESSION_LOG_SEND_IDX] = fieldstat_easy_register_counter(stat->instance, "sess_log_send"); + + stat->column_ids[ASYNC_INVOKE_IDX] = fieldstat_easy_register_counter(stat->instance, "async"); + stat->column_ids[ASYNC_CALLBACK_IDX] = fieldstat_easy_register_counter(stat->instance, "async_cb"); + + stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX] = fieldstat_easy_register_counter(stat->instance, "tconsume"); + stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX] = fieldstat_easy_register_counter(stat->instance, "tconsume_cb"); + stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX] = fieldstat_easy_register_counter(stat->instance, "hincrby"); + stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX] = fieldstat_easy_register_counter(stat->instance, "hincrby_cb"); + stat->column_ids[ASYNC_HMGET_INVOKE_IDX] = fieldstat_easy_register_counter(stat->instance, "hmget"); + stat->column_ids[ASYNC_HMGET_CALLBACK_IDX] = fieldstat_easy_register_counter(stat->instance, "hmget_cb"); + + stat->column_ids[ASYNC_TCONSUME_FAILED] = fieldstat_easy_register_counter(stat->instance, "tconsume_failed"); + stat->column_ids[ASYNC_HINCRBY_FAILED] = fieldstat_easy_register_counter(stat->instance, "hincrby_failed"); + stat->column_ids[ASYNC_HMGET_FAILED] = fieldstat_easy_register_counter(stat->instance, "hmget_failed"); + + stat->column_ids[RX_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "all_rx_pkts"); + stat->column_ids[RX_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "all_rx_bytes"); + stat->column_ids[TX_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "all_tx_pkts"); + stat->column_ids[TX_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "all_tx_bytes"); + stat->column_ids[DROP_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "all_drop_pkts"); + stat->column_ids[DROP_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "all_drop_bytes"); + + stat->column_ids[HIT_POLICY_RX_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_rx_pkts"); + stat->column_ids[HIT_POLICY_RX_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_rx_bytes"); + stat->column_ids[HIT_POLICY_TX_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_tx_pkts"); + stat->column_ids[HIT_POLICY_TX_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_tx_bytes"); + stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_tx_syn_ack_pkts"); + stat->column_ids[HIT_POLICY_DROP_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_drop_pkts"); + stat->column_ids[HIT_POLICY_DROP_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "shape_drop_bytes"); return; } @@ -73,49 +67,38 @@ static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat) struct shaping_global_stat* shaper_global_stat_init(int work_thread_num) { struct shaping_global_stat *stat = NULL; - struct shaping_global_stat_conf conf; + struct fieldstat_tag tag; + int ret = 0; stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat)); - stat->stat_data = (struct shaping_global_stat_data*)calloc(work_thread_num, sizeof(struct shaping_global_stat_data)); - if (shaper_global_stat_conf_load(stat, &conf) != 0) { + if (shaper_global_stat_conf_load(stat) != 0) { LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT); goto ERROR; } - stat->instance = fieldstat_instance_new("shaping_global"); + tag.key = "shaping_global"; + tag.type = TAG_CSTRING; + tag.value_str = "shaping_global"; + stat->instance = fieldstat_easy_new(work_thread_num, "shaping_global", &tag, 1); if (stat->instance == NULL) { LOG_ERROR("%s: shaping global init fieldstat instance failed", LOG_TAG_STAT); goto ERROR; } shaper_global_stat_fieldstat_reg(stat); - - fieldstat_disable_background_thread(stat->instance); - if (conf.is_self_test) { - fieldstat_set_local_output(stat->instance, "shaping_global_metric", "json"); - } else { - fieldstat_set_local_output(stat->instance, "shaping_global_metric", "default"); - } - - if (fieldstat_global_enable_prometheus_endpoint(9007, NULL) != 0) { - LOG_ERROR("%s: shaping global fieldstat enable prometheus endpoint failed", LOG_TAG_STAT); + ret = fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_global_stat.json", stat->output_interval_s); + if (ret < 0) { + LOG_ERROR("%s: shaping global enable auto output failed, ret %d", LOG_TAG_STAT, ret); goto ERROR; } - if (fieldstat_enable_prometheus_output(stat->instance) != 0) { - LOG_ERROR("%s: shaping global fieldstat enable prometheus output failed", LOG_TAG_STAT); - goto ERROR; - } - - fieldstat_instance_start(stat->instance); - return stat; ERROR: if (stat) { if (stat->instance) { - fieldstat_instance_free(stat->instance); + fieldstat_easy_free(stat->instance); } free(stat); } @@ -128,21 +111,17 @@ void shaper_global_stat_destroy(struct shaping_global_stat *stat) return; } - if (stat->stat_data) { - free(stat->stat_data); - } - if (stat->instance) { - fieldstat_instance_free(stat->instance); + fieldstat_easy_free(stat->instance); } free(stat); return; } -void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us) +void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us, int thread_idx) { - fieldstat_value_set(stat->instance, stat->swarmkv_latency_summary_id, latency_us); + fieldstat_easy_histogram_record(stat->instance, thread_idx, stat->swarmkv_latency_summary_id, NULL, 0, latency_us); return; } @@ -360,103 +339,53 @@ void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thr return; } -void shaper_global_stat_refresh(struct shaping_ctx *ctx) +void shaper_thread_global_stat_refresh(struct shaping_thread_ctx *ctx) { - static struct shaping_global_stat_data sum; + struct shaping_global_stat_data *stat_data = &ctx->thread_global_stat; struct shaping_global_stat *global_stat = ctx->global_stat; - struct shaping_global_stat_data *stat_data = global_stat->stat_data; - - for (int i = 0; i < ctx->thread_num; i++) { - memcpy(&stat_data[i], &ctx->thread_ctx[i].thread_global_stat, sizeof(struct shaping_global_stat_data)); - } - memset(&sum, 0, sizeof(struct shaping_global_stat_data)); - for (int i = 0; i < ctx->thread_num; i++) { - sum.curr_session_num += stat_data[i].curr_session_num; - sum.queueing_pkts += stat_data[i].queueing_pkts; - sum.queueing_bytes += stat_data[i].queueing_bytes; - - sum.ctrl_error += stat_data[i].ctrl_error; - sum.ctrl_opening += stat_data[i].ctrl_opening; - sum.ctrl_active += stat_data[i].ctrl_active; - sum.ctrl_close += stat_data[i].ctrl_close; - sum.ctrl_active_close += stat_data[i].ctrl_active_close; - sum.ctrl_resetall += stat_data[i].ctrl_resetall; - sum.session_log_send += stat_data[i].session_log_send; - - sum.async_invoke += stat_data[i].async_invoke; - sum.async_callback += stat_data[i].async_callback; - - sum.async_tconsume_invoke += stat_data[i].async_tconsume_invoke; - sum.async_tconsume_callback += stat_data[i].async_tconsume_callback; - sum.async_hincrby_invoke += stat_data[i].async_hincrby_invoke; - sum.async_hincrby_callback += stat_data[i].async_hincrby_callback; - sum.async_hmget_invoke += stat_data[i].async_hmget_invoke; - sum.async_hmget_callback += stat_data[i].async_hmget_callback; - - sum.async_tconsume_failed += stat_data[i].async_tconsume_failed; - sum.async_hincrby_failed += stat_data[i].async_hincrby_failed; - sum.async_hmget_failed += stat_data[i].async_hmget_failed; - - sum.all_traffic.rx_pkts += stat_data[i].all_traffic.rx_pkts; - sum.all_traffic.rx_bytes += stat_data[i].all_traffic.rx_bytes; - sum.all_traffic.tx_pkts += stat_data[i].all_traffic.tx_pkts; - sum.all_traffic.tx_bytes += stat_data[i].all_traffic.tx_bytes; - sum.all_traffic.drop_pkts += stat_data[i].all_traffic.drop_pkts; - sum.all_traffic.drop_bytes += stat_data[i].all_traffic.drop_bytes; - - sum.hit_policy_traffic.rx_pkts += stat_data[i].hit_policy_traffic.rx_pkts; - sum.hit_policy_traffic.rx_bytes += stat_data[i].hit_policy_traffic.rx_bytes; - sum.hit_policy_traffic.tx_pkts += stat_data[i].hit_policy_traffic.tx_pkts; - sum.hit_policy_traffic.tx_bytes += stat_data[i].hit_policy_traffic.tx_bytes; - sum.hit_policy_traffic.tx_syn_ack_pkts += stat_data[i].hit_policy_traffic.tx_syn_ack_pkts; - sum.hit_policy_traffic.drop_pkts += stat_data[i].hit_policy_traffic.drop_pkts; - sum.hit_policy_traffic.drop_bytes += stat_data[i].hit_policy_traffic.drop_bytes; - } - - struct shaping_global_stat_traffic_data *all_traffic_data = &sum.all_traffic; - struct shaping_global_stat_traffic_data *hit_policy_traffic_data = &sum.hit_policy_traffic; - - fieldstat_value_set(global_stat->instance, global_stat->column_ids[CURR_SESSION_NUM_IDX], sum.curr_session_num); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[QUEUEING_PKTS_IDX], sum.queueing_pkts); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[QUEUEING_BYTES_IDX], sum.queueing_bytes); - - fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ERR_IDX], sum.ctrl_error); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_OPENING_IDX], sum.ctrl_opening); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ACTIVE_IDX], sum.ctrl_active); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_CLOSE_IDX], sum.ctrl_close); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ACTIVE_CLOSE_IDX], sum.ctrl_active_close); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_RESETALL_IDX], sum.ctrl_resetall); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[SESSION_LOG_SEND_IDX], sum.session_log_send); - - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_INVOKE_IDX], sum.async_invoke); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_CALLBACK_IDX], sum.async_callback); - - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX], sum.async_tconsume_invoke); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX], sum.async_tconsume_callback); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX], sum.async_hincrby_invoke); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX], sum.async_hincrby_callback); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_INVOKE_IDX], sum.async_hmget_invoke); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_CALLBACK_IDX], sum.async_hmget_callback); - - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_FAILED], sum.async_tconsume_failed); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_FAILED], sum.async_hincrby_failed); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_FAILED], sum.async_hmget_failed); - - fieldstat_value_set(global_stat->instance, global_stat->column_ids[RX_PKTS_IDX], all_traffic_data->rx_pkts); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[RX_BYTES_IDX], all_traffic_data->rx_bytes); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[TX_PKTS_IDX], all_traffic_data->tx_pkts); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[TX_BYTES_IDX], all_traffic_data->tx_bytes); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[DROP_PKTS_IDX], all_traffic_data->drop_pkts); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[DROP_BYTES_IDX], all_traffic_data->drop_bytes); - - fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_PKTS_IDX], hit_policy_traffic_data->rx_pkts); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX], hit_policy_traffic_data->tx_syn_ack_pkts); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts); - fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes); - - fieldstat_passive_output(global_stat->instance); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CURR_SESSION_NUM_IDX], NULL, 0, stat_data->curr_session_num); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[QUEUEING_PKTS_IDX], NULL, 0, stat_data->queueing_pkts); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[QUEUEING_BYTES_IDX], NULL, 0, stat_data->queueing_bytes); + + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_ERR_IDX], NULL, 0, stat_data->ctrl_error); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_OPENING_IDX], NULL, 0, stat_data->ctrl_opening); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_ACTIVE_IDX], NULL, 0, stat_data->ctrl_active); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_CLOSE_IDX], NULL, 0, stat_data->ctrl_close); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_ACTIVE_CLOSE_IDX], NULL, 0, stat_data->ctrl_active_close); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[CTRL_RESETALL_IDX], NULL, 0, stat_data->ctrl_resetall); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[SESSION_LOG_SEND_IDX], NULL, 0, stat_data->session_log_send); + + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_INVOKE_IDX], NULL, 0, stat_data->async_invoke); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_CALLBACK_IDX], NULL, 0, stat_data->async_callback); + + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX], NULL, 0, stat_data->async_tconsume_invoke); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX], NULL, 0, stat_data->async_tconsume_callback); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX], NULL, 0, stat_data->async_hincrby_invoke); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX], NULL, 0, stat_data->async_hincrby_callback); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HMGET_INVOKE_IDX], NULL, 0, stat_data->async_hmget_invoke); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HMGET_CALLBACK_IDX], NULL, 0, stat_data->async_hmget_callback); + + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_TCONSUME_FAILED], NULL, 0, stat_data->async_tconsume_failed); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HINCRBY_FAILED], NULL, 0, stat_data->async_hincrby_failed); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[ASYNC_HMGET_FAILED], NULL, 0, stat_data->async_hmget_failed); + + struct shaping_global_stat_traffic_data *all_traffic_data = &stat_data->all_traffic; + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[RX_PKTS_IDX], NULL, 0, all_traffic_data->rx_pkts); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[RX_BYTES_IDX], NULL, 0, all_traffic_data->rx_bytes); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[TX_PKTS_IDX], NULL, 0, all_traffic_data->tx_pkts); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[TX_BYTES_IDX], NULL, 0, all_traffic_data->tx_bytes); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[DROP_PKTS_IDX], NULL, 0, all_traffic_data->drop_pkts); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[DROP_BYTES_IDX], NULL, 0, all_traffic_data->drop_bytes); + + struct shaping_global_stat_traffic_data *hit_policy_traffic_data = &stat_data->hit_policy_traffic; + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_RX_PKTS_IDX], NULL, 0, hit_policy_traffic_data->rx_pkts); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_RX_BYTES_IDX], NULL, 0, hit_policy_traffic_data->rx_bytes); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_TX_PKTS_IDX], NULL, 0, hit_policy_traffic_data->tx_pkts); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_TX_BYTES_IDX], NULL, 0, hit_policy_traffic_data->tx_bytes); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX], NULL, 0, hit_policy_traffic_data->tx_syn_ack_pkts); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], NULL, 0, hit_policy_traffic_data->drop_pkts); + fieldstat_easy_counter_incrby(global_stat->instance, 0, global_stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], NULL, 0, hit_policy_traffic_data->drop_bytes); + + memset(&ctx->thread_global_stat, 0, sizeof(struct shaping_global_stat_data)); } \ No newline at end of file diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index e55463e..a68a85e 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include "log.h" #include "utils.h" @@ -12,117 +12,185 @@ #include "shaper_stat.h" #include "shaper_global_stat.h" -#define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits" - #define SHAPER_STAT_REFRESH_TIME_US 10000 //10 ms #define HINCRBY_RETRY_MAX 5 +#define OUTPUT_JSON_BUF_LEN 2048 + struct shaper_stat_conf { - int enable_backgroud_thread; - int output_interval_ms; - char telegraf_ip[16]; - short telegraf_port; + char device_group[32]; + char device_id[32]; + char data_center[32]; + char kafka_topic[64]; + char kafka_username[64]; + char kafka_password[64]; + char kafka_brokers[256]; }; thread_local struct fieldstat_tag tags[TAG_IDX_MAX] = { - [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .value_type = 0}, - [TAG_RULE_ID_IDX] = {.key = "rule_id", .value_type = 0}, - [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .value_type = 0}, - [TAG_PRIORITY_IDX] = {.key = "priority", .value_type = 0}, - [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .value_type = 2} + [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = TAG_INTEGER}, + [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = TAG_INTEGER}, + [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = TAG_INTEGER}, + [TAG_PRIORITY_IDX] = {.key = "priority", .type = TAG_INTEGER}, + [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = TAG_CSTRING} }; +char *output_json_buf = NULL; + void shaper_stat_destroy(struct shaping_stat *stat) { - if (!stat) { + if (stat) { + if (stat->instance) { + fieldstat_easy_free(stat->instance); + } + free(stat); + } + + if (output_json_buf) { + free(output_json_buf); + } + + return; +} + +static void shaper_stat_kafka_init(struct shaping_stat *stat, struct shaper_stat_conf *conf) +{ + char kafka_errstr[1024]={0}; + + if (strlen(conf->kafka_topic) == 0 || strlen(conf->kafka_brokers) == 0) { + LOG_ERROR("%s: kafka topic or brokers is empty", LOG_TAG_STAT); return; } - if (stat->instance) { - fieldstat_dynamic_instance_free(stat->instance); + if (strlen(conf->kafka_username) == 0 || strlen(conf->kafka_password) == 0) { + LOG_ERROR("%s: kafka username or password is empty", LOG_TAG_STAT); + return; + } + + rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new(); + + rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr)); + + stat->kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)); + if (stat->kafka_handle == NULL) { + LOG_ERROR("%s: kafka producer create failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; } - free(stat); + stat->topic_rkt = rd_kafka_topic_new(stat->kafka_handle, conf->kafka_topic, NULL); return; } -static int shaper_stat_conf_load(struct shaper_stat_conf *conf) +static int shaper_stat_conf_load(struct shaping_stat *stat, struct shaper_stat_conf *conf) { - memset(conf, 0, sizeof(struct shaper_stat_conf)); + MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_S", &stat->output_interval_s, 1); + MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "DEVICE_GROUP", conf->device_group, sizeof(conf->device_group), ""); + MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "DEVICE_ID", conf->device_id, sizeof(conf->device_id), ""); + MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "DATA_CENTER", conf->data_center, sizeof(conf->data_center), ""); - MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "LINE_PROTOCOL_SERVER_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1"); - MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "LINE_PROTOCOL_SERVER_PORT", &conf->telegraf_port, 8200); - MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500); - MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1); + MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_TOPIC", conf->kafka_topic, sizeof(conf->kafka_topic), ""); + MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_USERNAME", conf->kafka_username, sizeof(conf->kafka_username), ""); + MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_PASSWORD", conf->kafka_password, sizeof(conf->kafka_password), ""); + MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_BROKERS", conf->kafka_brokers, sizeof(conf->kafka_brokers), ""); return 0; } struct shaping_stat* shaper_stat_init(int thread_num) { - struct shaping_stat *stat = NULL; - int column_num; + struct fieldstat_tag global_tags[5]; struct shaper_stat_conf conf; - const char *column_name[] = {"in_max_latency_us", "in_queue_len", "out_max_latency_us", "out_queue_len", //first line is gauge, second line is counter - "in_pkts", "in_bytes", "in_drop_pkts", "out_pkts", "out_bytes", "out_drop_pkts"}; - enum field_type column_type[] = {FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE, FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE, - FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER}; - - column_num = sizeof(column_name)/sizeof(column_name[0]); - if (column_num != STAT_COLUNM_IDX_MAX) { - LOG_ERROR("%s: shaping init fieldstat failed, column_num %d != index num %d", LOG_TAG_STAT, column_num, STAT_COLUNM_IDX_MAX); - goto ERROR; - } + struct shaping_stat *stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat)); + + output_json_buf = (char *)calloc(1, OUTPUT_JSON_BUF_LEN); - if (shaper_stat_conf_load(&conf) != 0) { + if (shaper_stat_conf_load(stat, &conf) != 0) { LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT); goto ERROR; } - stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat)); + shaper_stat_kafka_init(stat, &conf); + + global_tags[0].key = "app_name"; + global_tags[0].type = TAG_CSTRING; + global_tags[0].value_str = "shaping_engine"; + + global_tags[1].key = "device_group"; + global_tags[1].type = TAG_CSTRING; + global_tags[1].value_str = conf.device_group; + + global_tags[2].key = "device_id"; + global_tags[2].type = TAG_CSTRING; + global_tags[2].value_str = conf.device_id; - stat->instance = fieldstat_dynamic_instance_new("shaping_engine", thread_num); + global_tags[3].key = "data_center"; + global_tags[3].type = TAG_CSTRING; + global_tags[3].value_str = conf.data_center; + + global_tags[4].key = "table_name"; + global_tags[4].type = TAG_CSTRING; + global_tags[4].value_str = "shaping_metric"; + + stat->instance = fieldstat_easy_new(thread_num, "shaping_stat", global_tags, 5); if (stat->instance == NULL) { LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT); goto ERROR; } - fieldstat_dynamic_set_output_interval(stat->instance, conf.output_interval_ms); - fieldstat_dynamic_set_line_protocol_server(stat->instance, conf.telegraf_ip, conf.telegraf_port); - if (conf.enable_backgroud_thread == 0) { - fieldstat_dynamic_disable_background_thread(stat->instance); + stat->in_latency_histogram_id = fieldstat_easy_register_histogram(stat->instance, "in_latency_us", 1, 1000000, 5); + stat->out_latency_histogram_id = fieldstat_easy_register_histogram(stat->instance, "out_latency_us", 1, 1000000, 5); + if (stat->in_latency_histogram_id < 0 || stat->out_latency_histogram_id < 0) { + LOG_ERROR("%s: shaping fieldstat register histogram failed", LOG_TAG_STAT); + goto ERROR; } - stat->table_id = fieldstat_register_dynamic_table(stat->instance, "shaping_metric", column_name, column_type, column_num, stat->column_ids); - if (stat->table_id < 0) { - LOG_ERROR("%s: shaping fieldstat register table failed", LOG_TAG_STAT); - goto ERROR; + stat->column_ids[IN_QUEUE_LEN_IDX] = fieldstat_easy_register_counter(stat->instance, "in_queue_len"); + stat->column_ids[OUT_QUEUE_LEN_IDX] = fieldstat_easy_register_counter(stat->instance, "out_queue_len"); + stat->column_ids[IN_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "in_pkts"); + stat->column_ids[IN_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "in_bytes"); + stat->column_ids[IN_DROP_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "in_drop_pkts"); + stat->column_ids[OUT_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "out_pkts"); + stat->column_ids[OUT_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "out_bytes"); + stat->column_ids[OUT_DROP_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "out_drop_pkts"); + + for (int i = IN_QUEUE_LEN_IDX; i < STAT_COLUNM_IDX_MAX; i++) { + if (stat->column_ids[i] < 0) { + LOG_ERROR("%s: shaping fieldstat register column %d failed", LOG_TAG_STAT, i); + goto ERROR; + } } - fieldstat_dynamic_instance_start(stat->instance); + fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_stat.json", 1);//TODO: output interval return stat; - ERROR: if (stat) { if (stat->instance) { - fieldstat_dynamic_instance_free(stat->instance); + fieldstat_easy_free(stat->instance); } free(stat); } + if (output_json_buf) { + free(output_json_buf); + } return NULL; } static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int priority, int profile_type) { - tags[TAG_VSYS_ID_IDX].value_int = vsys_id; + tags[TAG_VSYS_ID_IDX].value_longlong = vsys_id; - tags[TAG_RULE_ID_IDX].value_int = rule_id; + tags[TAG_RULE_ID_IDX].value_longlong = rule_id; - tags[TAG_PROFILE_ID_IDX].value_int = profile_id; + tags[TAG_PROFILE_ID_IDX].value_longlong = profile_id; - tags[TAG_PRIORITY_IDX].value_int = priority; + tags[TAG_PRIORITY_IDX].value_longlong = priority; if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) { tags[TAG_PROFILE_TYPE_IDX].value_str = "primary"; @@ -143,7 +211,7 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_swarmkv_latency_update(global_stat, curr_time_us - arg->start_time_us); + shaper_global_stat_swarmkv_latency_update(global_stat, curr_time_us - arg->start_time_us, ctx->thread_index); shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); shaper_global_stat_hincrby_callback_inc(&ctx->thread_global_stat); @@ -240,7 +308,6 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s struct shaping_stat *stat = ctx->stat; int priority = profile->priority; int thread_id = ctx->thread_index; - unsigned long long old_latency; if (need_update_guage) { profile->hash_node->local_queue_len[priority][SHAPING_DIR_IN] += profile_stat->priority_queue_len[SHAPING_DIR_IN]; @@ -255,28 +322,22 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s } shaper_stat_tags_build(rule->vsys_id, rule->id, profile->id, priority, profile_type); - fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_DROP_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.drop_pkts, tags, TAG_IDX_MAX, thread_id); - fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.pkts, tags, TAG_IDX_MAX, thread_id); - fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_BYTES_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.bytes, tags, TAG_IDX_MAX, thread_id); - fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_DROP_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.drop_pkts, tags, TAG_IDX_MAX, thread_id); - fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.pkts, tags, TAG_IDX_MAX, thread_id); - fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_BYTES_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.bytes, tags, TAG_IDX_MAX, thread_id); + fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_DROP_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->in.drop_pkts); + fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->in.pkts); + fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_BYTES_IDX], tags, TAG_IDX_MAX, profile_stat->in.bytes); - old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, tags, TAG_IDX_MAX, thread_id); - if (profile_stat->in.max_latency > old_latency) { - fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.max_latency - old_latency, tags, TAG_IDX_MAX, thread_id); - } + fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_DROP_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->out.drop_pkts); + fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->out.pkts); + fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_BYTES_IDX], tags, TAG_IDX_MAX, profile_stat->out.bytes); - old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, tags, TAG_IDX_MAX, thread_id); - if (profile_stat->out.max_latency > old_latency) { - fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.max_latency - old_latency, tags, TAG_IDX_MAX, thread_id); - } + fieldstat_easy_histogram_record(stat->instance, thread_id, stat->in_latency_histogram_id, tags, TAG_PROFILE_TYPE_IDX, profile_stat->in.max_latency); + fieldstat_easy_histogram_record(stat->instance, thread_id, stat->out_latency_histogram_id, tags, TAG_PROFILE_TYPE_IDX, profile_stat->out.max_latency); if (need_update_guage) { if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) { - fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id); - fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id); + fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_QUEUE_LEN_IDX], tags, TAG_IDX_MAX, profile_stat->in.queue_len); + fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_QUEUE_LEN_IDX], tags, TAG_IDX_MAX, profile_stat->out.queue_len); } memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile)); @@ -430,5 +491,23 @@ void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_sta } } + return; +} + +void shaper_stat_output(struct shaping_stat *stat) +{ + size_t len = OUTPUT_JSON_BUF_LEN; + fieldstat_easy_output(stat->instance, &output_json_buf, &len); + + int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_json_buf, len, NULL, 0, NULL); + if (status < 0) { + LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d", + LOG_TAG_STAT, + rd_kafka_last_error(), + rd_kafka_err2name(rd_kafka_last_error()), + rd_kafka_err2str(rd_kafka_last_error()), + status); + } + return; } \ No newline at end of file diff --git a/shaping/test/CMakeLists.txt b/shaping/test/CMakeLists.txt index 643c222..1c2e046 100644 --- a/shaping/test/CMakeLists.txt +++ b/shaping/test/CMakeLists.txt @@ -3,7 +3,7 @@ # gtest_shaper_maat ############################################################################### -add_executable(gtest_shaper_maat gtest_shaper_maat.cpp stub.cpp dummy_swarmkv.cpp) +add_executable(gtest_shaper_maat gtest_shaper_maat.cpp stub.cpp dummy_swarmkv.cpp dummy_rdkafka.cpp) target_include_directories(gtest_shaper_maat PUBLIC ${CMAKE_SOURCE_DIR}/common/include) target_include_directories(gtest_shaper_maat PUBLIC ${CMAKE_SOURCE_DIR}/shaping/include) target_link_libraries(gtest_shaper_maat common shaper pthread gtest) @@ -12,7 +12,7 @@ target_link_libraries(gtest_shaper_maat common shaper pthread gtest) # gtest_shaper_maat ############################################################################### -add_executable(gtest_shaper_send_log gtest_shaper_send_log.cpp stub.cpp dummy_swarmkv.cpp) +add_executable(gtest_shaper_send_log gtest_shaper_send_log.cpp stub.cpp dummy_swarmkv.cpp dummy_rdkafka.cpp) target_include_directories(gtest_shaper_send_log PUBLIC ${CMAKE_SOURCE_DIR}/common/include) target_include_directories(gtest_shaper_send_log PUBLIC ${CMAKE_SOURCE_DIR}/shaping/include) target_link_libraries(gtest_shaper_send_log common shaper pthread gtest) @@ -21,7 +21,7 @@ target_link_libraries(gtest_shaper_send_log common shaper pthread gtest) # gtest_shaper ############################################################################### -add_executable(gtest_shaper gtest_shaper.cpp stub.cpp dummy_swarmkv.cpp dummy_time.cpp) +add_executable(gtest_shaper gtest_shaper.cpp stub.cpp dummy_swarmkv.cpp dummy_time.cpp dummy_rdkafka.cpp) target_include_directories(gtest_shaper PUBLIC ${CMAKE_SOURCE_DIR}/common/include) target_include_directories(gtest_shaper PUBLIC ${CMAKE_SOURCE_DIR}/shaping/include) target_link_libraries(gtest_shaper common shaper pthread gtest) diff --git a/shaping/test/dummy_rdkafka.cpp b/shaping/test/dummy_rdkafka.cpp new file mode 100644 index 0000000..5d255d8 --- /dev/null +++ b/shaping/test/dummy_rdkafka.cpp @@ -0,0 +1,41 @@ +#include + +rd_kafka_conf_t *rd_kafka_conf_new(void) +{ + return NULL; +} + +rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size) +{ + return RD_KAFKA_CONF_OK; +} + +rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size) +{ + return NULL; +} + +rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf) +{ + return NULL; +} + +int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque) +{ + return 0; +} + +rd_kafka_resp_err_t rd_kafka_last_error (void) +{ + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +const char *rd_kafka_err2name (rd_kafka_resp_err_t err) +{ + return NULL; +} + +const char *rd_kafka_err2str (rd_kafka_resp_err_t err) +{ + return NULL; +} \ No newline at end of file diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index 49b9a06..47130c1 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -1,7 +1,7 @@ -#include #include #include #include +#include #include "log.h" #include "shaper.h" @@ -12,15 +12,13 @@ #include "stub.h" #define SHAPING_SESSION_QUEUE_LEN 128 -#define SHAPING_STAT_FILE_NAME "/tmp/shaping_metrics.json" -#define SHAPING_GLOBAL_STAT_FILE_NAME "shaping_global_metric" #define FIELDSTAT_AUTO_TIME_MAX 999999000 +#define FIELDSTAT_OUTPUT_BUF_LEN 4096 + char profile_type_primary[] = "primary"; char profile_type_borrow[] = "borrow"; -char line[4096]; - static struct stub_packet* packet_new(unsigned long long income_time, unsigned int length, unsigned char dir) { struct stub_packet *packet; @@ -266,11 +264,17 @@ TEST(single_session, udp_tx_in_order) } shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - shaper_global_stat_refresh(ctx); + char *global_stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t global_stat_str_len = sizeof(global_stat_str); + char *stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t stat_str_len = sizeof(stat_str); + + shaper_thread_global_stat_refresh(&ctx->thread_ctx[0]); + + fieldstat_easy_output(ctx->global_stat->instance, &global_stat_str, &global_stat_str_len); + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); @@ -278,24 +282,14 @@ TEST(single_session, udp_tx_in_order) stub_clear_matched_shaping_rules(); /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - //judge shaping metric - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); + shaping_stat_judge(stat_str, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts //judge shaping global metric - stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_global_stat_judge(line, 100, 10000, 0, 0, 0, 0); - fclose(stat_file); + shaping_global_stat_judge(global_stat_str, 100, 10000, 0, 0, 0, 0); + + free(global_stat_str); + free(stat_str); } /*session1 match rule1 @@ -355,7 +349,6 @@ TEST(bidirectional, udp_tx_in_order) ASSERT_TRUE(TAILQ_EMPTY(&expec_tx_queue_in)); shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); @@ -410,11 +403,17 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order) } shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - shaper_global_stat_refresh(ctx); + char *global_stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t global_stat_str_len = sizeof(global_stat_str); + char *stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t stat_str_len = sizeof(stat_str); + + shaper_thread_global_stat_refresh(&ctx->thread_ctx[0]); + + fieldstat_easy_output(ctx->global_stat->instance, &global_stat_str, &global_stat_str_len); + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); @@ -422,24 +421,14 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order) stub_clear_matched_shaping_rules(); /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - //judge shaping metric - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); + shaping_stat_judge(stat_str, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts //judge shaping global metric - stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_global_stat_judge(line, 100, 10000, 0, 0, 0, 0); - fclose(stat_file); + shaping_global_stat_judge(global_stat_str, 100, 10000, 0, 0, 0, 0); + + free(global_stat_str); + free(stat_str); } /*session1 match rule1 @@ -482,9 +471,11 @@ TEST(single_session, tcp_tx_in_order) /***********send stat data here********************/ - shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1); - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - sleep(2);//wait telegraf generate metric + char *stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t stat_str_len = sizeof(stat_str); + + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); + shaping_stat_judge(stat_str, 0, 0, 1, 20, 2000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary);//*test statistics stub_refresh_token_bucket(0); @@ -502,10 +493,9 @@ TEST(single_session, tcp_tx_in_order) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); @@ -513,20 +503,8 @@ TEST(single_session, tcp_tx_in_order) /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary); - - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 31000, SHAPING_DIR_OUT, profile_type_primary); - - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); + shaping_stat_judge(stat_str, 0, 0, 1, 10, 1000, 0, 0, 31000, SHAPING_DIR_OUT, profile_type_primary); + free(stat_str); } /*session1 match rule1 @@ -581,10 +559,12 @@ TEST(single_session, udp_diff_direction) ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + char *stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t stat_str_len = sizeof(stat_str); + + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); @@ -592,18 +572,9 @@ TEST(single_session, udp_diff_direction) /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 21000, SHAPING_DIR_OUT, profile_type_primary); - - shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 0, SHAPING_DIR_IN, profile_type_primary); - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); + shaping_stat_judge(stat_str, 0, 0, 1, 20, 2000, 0, 0, 21000, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(stat_str, 0, 0, 1, 20, 2000, 0, 0, 0, SHAPING_DIR_IN, profile_type_primary); + free(stat_str); } /*session1 match rule1, rule2, rule3 @@ -660,33 +631,28 @@ TEST(single_session, udp_multi_rules) } shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + char *stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t stat_str_len = sizeof(stat_str); + + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; + //profile_id 0 + shaping_stat_judge(stat_str, 0, 0, 1, 100, 10000, 0, 0, 507000, SHAPING_DIR_OUT, profile_type_primary); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0 - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 507000, SHAPING_DIR_OUT, profile_type_primary); + //profile_id 1 + shaping_stat_judge(stat_str, 1, 1, 1, 100, 10000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1 - shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary); + //profile_id 2 + shaping_stat_judge(stat_str, 2, 2, 1, 100, 10000, 0, 0, 91000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2 - shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 91000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt - - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); + free(stat_str); } /*session1 match rule1 @@ -735,30 +701,25 @@ TEST(single_session, udp_borrow) } shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + char *stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t stat_str_len = sizeof(stat_str); + + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; + //profile_id 1, primary + shaping_stat_judge(stat_str, 1, 1, 1, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary); + //profile_id 2, borrow + shaping_stat_judge(stat_str, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow - shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); - - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); + free(stat_str); } /*session1 match rule1 @@ -810,35 +771,30 @@ TEST(single_session, udp_borrow_same_priority_9) } shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + char *stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t stat_str_len = sizeof(stat_str); + + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary); + //profile_id 1, primary + shaping_stat_judge(stat_str, 1, 1, 9, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary); #if 0 //fieldstat don't output a row when all values is zero ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow shaping_stat_judge(line, 1, 2, 9, 0, 0, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); #endif - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 3, borrow - shaping_stat_judge(line, 1, 3, 9, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); + //profile_id 3, borrow + shaping_stat_judge(stat_str, 1, 3, 9, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); + free(stat_str); } /*session1 match rule1 @@ -885,7 +841,6 @@ TEST(single_session_async, udp_close_before_async_exec) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); - fieldstat_global_disable_prometheus_endpoint(); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -980,33 +935,28 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) shaping_flow_free(&ctx->thread_ctx[0], sf1); shaping_flow_free(&ctx->thread_ctx[0], sf2); - fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + char *stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t stat_str_len = sizeof(stat_str); + + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; + //profile_id 1, primary + shaping_stat_judge(stat_str, 1, 1, 1, 0, 0, 0, 0, 1471000, SHAPING_DIR_OUT, profile_type_primary); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 1471000, SHAPING_DIR_OUT, profile_type_primary); + //profile_id 2, borrow + shaping_stat_judge(stat_str, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow - shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); - - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, primary - shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 191000, SHAPING_DIR_OUT, profile_type_primary); + //profile_id 2, primary + shaping_stat_judge(stat_str, 2, 2, 1, 100, 10000, 0, 0, 191000, SHAPING_DIR_OUT, profile_type_primary); - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); + free(stat_str); } /*session1 match rule1; session2 match rule2 @@ -1089,7 +1039,6 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) shaping_flow_free(&ctx->thread_ctx[0], sf1); shaping_flow_free(&ctx->thread_ctx[1], sf2); - fieldstat_global_disable_prometheus_endpoint(); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1191,7 +1140,6 @@ TEST(two_session_diff_priority_same_profile, profile_timer_test) shaping_flow_free(&ctx->thread_ctx[0], sf1); shaping_flow_free(&ctx->thread_ctx[1], sf2); - fieldstat_global_disable_prometheus_endpoint(); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1279,7 +1227,6 @@ TEST(two_session_diff_priority_same_profile, one_direction_dont_block_another) shaping_flow_free(&ctx->thread_ctx[0], sf1); shaping_flow_free(&ctx->thread_ctx[1], sf2); - fieldstat_global_disable_prometheus_endpoint(); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1355,7 +1302,6 @@ TEST(two_sessions, priority_non_block) shaping_flow_free(&ctx->thread_ctx[0], sf1); shaping_flow_free(&ctx->thread_ctx[1], sf2); - fieldstat_global_disable_prometheus_endpoint(); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1430,7 +1376,6 @@ TEST(two_sessions, borrow_when_primary_profile_priority_blocked) shaping_flow_free(&ctx->thread_ctx[0], sf1); shaping_flow_free(&ctx->thread_ctx[1], sf2); - fieldstat_global_disable_prometheus_endpoint(); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1511,7 +1456,6 @@ TEST(two_sessions, primary_profile_priority_blocked_by_borrow_profile) shaping_flow_free(&ctx->thread_ctx[0], sf1); shaping_flow_free(&ctx->thread_ctx[1], sf2); - fieldstat_global_disable_prometheus_endpoint(); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1560,35 +1504,31 @@ TEST(statistics, udp_drop_pkt) } shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - shaper_global_stat_refresh(ctx); + char *global_stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t global_stat_str_len = sizeof(global_stat_str); + char *stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t stat_str_len = sizeof(stat_str); + + shaper_thread_global_stat_refresh(&ctx->thread_ctx[0]); + + fieldstat_easy_output(ctx->global_stat->instance, &global_stat_str, &global_stat_str_len); + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - //judge shaping metric - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228000, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); + shaping_stat_judge(stat_str, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228000, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max //judge shaping global metric - stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_global_stat_judge(line, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 10000, 0, 0); - fclose(stat_file); + shaping_global_stat_judge(global_stat_str, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 10000, 0, 0); + + free(global_stat_str); + free(stat_str); } /*session1 match rule1 @@ -1604,7 +1544,6 @@ TEST(statistics, udp_queueing_pkt) int priority[] = {1}; int profile_num[] = {1}; int profile_id[][MAX_REF_PROFILE] = {{0}}; - FILE *stat_file; TAILQ_INIT(&expec_tx_queue); stub_init(); @@ -1624,17 +1563,20 @@ TEST(statistics, udp_queueing_pkt) /***********send stat data here********************/ + char *global_stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t global_stat_str_len = sizeof(global_stat_str); + char *stat_str = (char*)malloc(FIELDSTAT_OUTPUT_BUF_LEN); + size_t stat_str_len = sizeof(stat_str); + + shaper_thread_global_stat_refresh(&ctx->thread_ctx[0]); shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1); - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - shaper_global_stat_refresh(ctx); - sleep(2);//wait telegraf generate metric - /*******judge global metric********/ - stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_global_stat_judge(line, 10, 1000, 0, 0, 90, 9000); - fclose(stat_file); + fieldstat_easy_output(ctx->global_stat->instance, &global_stat_str, &global_stat_str_len); + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); + + /*******judge metric********/ + shaping_stat_judge(stat_str, 0, 0, 1, 10, 1000, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_global_stat_judge(global_stat_str, 10, 1000, 0, 0, 90, 9000); //first 10 packets ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); @@ -1649,39 +1591,24 @@ TEST(statistics, udp_queueing_pkt) } shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - shaper_global_stat_refresh(ctx); + fieldstat_easy_output(ctx->global_stat->instance, &global_stat_str, &global_stat_str_len); + fieldstat_easy_output(ctx->thread_ctx[0].stat->instance, &stat_str, &stat_str_len); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); /*******test statistics***********/ - sleep(2);//wait telegraf to output - //judge shaping metric - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data first sent - shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary); - - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data last sent - shaping_stat_judge(line, 0, 0, 1, 90, 9000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary); - - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); + shaping_stat_judge(stat_str, 0, 0, 1, 90, 9000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary); //judge global metric - stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_global_stat_judge(line, 90, 9000, 0, 0, -90, -9000); - fclose(stat_file); + shaping_global_stat_judge(global_stat_str, 90, 9000, 0, 0, -90, -9000); + + free(global_stat_str); + free(stat_str); } int main(int argc, char **argv) diff --git a/shaping/test/stub.h b/shaping/test/stub.h index d791295..4c86f09 100644 --- a/shaping/test/stub.h +++ b/shaping/test/stub.h @@ -56,8 +56,3 @@ void stub_shaper_stat_send(int thread_seq); void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx); /*******************temporary for test******************************/ - - -/************************temp invoke fieldstat api, to avoid memory leak for self test*********************************/ -void fieldstat_global_disable_prometheus_endpoint(); -/**********************************************************************************************************************/ \ No newline at end of file diff --git a/shaping/test/test_conf/shaping.conf b/shaping/test/test_conf/shaping.conf index 015ab3c..325803f 100644 --- a/shaping/test/test_conf/shaping.conf +++ b/shaping/test/test_conf/shaping.conf @@ -30,11 +30,10 @@ SWARMKV_HEALTH_CHECK_PORT=0 SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111 [METRIC] -FIELDSTAT_OUTPUT_INTERVAL_MS=999999000 -FIELDSTAT_ENABLE_BACKGRUND_THREAD=0 -SELF_TEST=1 -LINE_PROTOCOL_SERVER_IP="127.0.0.1" -LINE_PROTOCOL_SERVER_PORT=6667 +FIELDSTAT_OUTPUT_INTERVAL_S=999999000 +DEVICE_GROUP="test_device_group" +DEVICE_ID="2333333333333333" +DATA_CENTER="center-xxg-tsgx" [CONFIG] #PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128 -- cgit v1.2.3