From 55b82e9f049c9ec9bce3e8f8d309a01b3619ecc2 Mon Sep 17 00:00:00 2001 From: liuchang Date: Thu, 23 Mar 2023 11:51:15 +0000 Subject: suit filestat3 --- shaping/src/shaper.cpp | 121 ++++++++++++++++++++++++------------------------- 1 file changed, 59 insertions(+), 62 deletions(-) (limited to 'shaping/src/shaper.cpp') diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index cdc3657..5cd64c6 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -1,7 +1,6 @@ #include "log.h" #include "session_table.h" #include -#include #include #include #include @@ -186,18 +185,19 @@ void shaper_packet_dequeue(struct shaping_flow *sf) return; } -void shaper_queue_clear(struct shaping_flow *sf, struct shaping_stat_data **stat_hashtbl, struct shaping_thread_ctx *ctx) +void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) { struct shaping_packet_wrapper *pkt_wrapper; + struct shaping_stat *stat = ctx->stat; struct shaping_rule_info *rule = &sf->matched_rule_infos[0]; while (!shaper_queue_empty(sf)) { pkt_wrapper = shaper_first_pkt_get(sf); - shaper_stat_queueing_pkt_dec(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, - pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY); - shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, - pkt_wrapper->direction, pkt_wrapper->length); + shaper_stat_queueing_pkt_dec(stat, rule->id, rule->primary.id, rule->primary.priority, + pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); + shaper_stat_drop_inc(stat, rule->id, rule->primary.id, rule->primary.priority, + pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); @@ -207,11 +207,11 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_stat_data **stat } //return success(0) while any avl tree insert success -int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp, - struct shaping_stat_data **stat_hashtbl, unsigned long long enqueue_time) +int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; + struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; int priority; int ret = -1; @@ -224,9 +224,9 @@ int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp, avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; - shaper_stat_queueing_pkt_inc(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, - priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY); - shaper_stat_queueing_session_inc(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY); + shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, + priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); + shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); s_rule_info->primary.enqueue_time_us = enqueue_time; } @@ -238,9 +238,9 @@ int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp, avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; - shaper_stat_queueing_pkt_inc(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, - priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW); - shaper_stat_queueing_session_inc(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW); + shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, + priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); + shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); s_rule_info->borrowing[i].enqueue_time_us = enqueue_time; } } @@ -258,10 +258,11 @@ static unsigned long long shaper_pkt_latency_calculate(struct shaping_profile_in return (curr_time - enqueue_time); } -static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper *sp, struct shaping_stat_data **stat_hashtbl) +static void shaping_flow_remove_from_pool(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; + struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; struct timespec curr_time; unsigned long long latency; @@ -276,13 +277,13 @@ static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper priority = s_rule_info->primary.priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - shaper_stat_queueing_pkt_dec(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, - priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY); - shaper_stat_queueing_session_dec(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY); + shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, + priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); + shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time); - shaper_stat_max_latency_update(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, - priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_PRIMARY); + shaper_stat_max_latency_update(ctx->stat, s_rule_info->id, s_rule_info->primary.id, + priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); } if (s_rule_info->borrowing_num == 0) { @@ -293,13 +294,13 @@ static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper priority = s_rule_info->borrowing[i].priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - shaper_stat_queueing_pkt_dec(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, - priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW); - shaper_stat_queueing_session_dec(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW); + shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, + priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); + shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); - latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time); - shaper_stat_max_latency_update(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, - priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_BORROW); + latency = shaper_pkt_latency_calculate(&s_rule_info->borrowing[i], &curr_time); + shaper_stat_max_latency_update(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, + priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); } } @@ -331,9 +332,9 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i return count; } -void shaper_flow_pop(struct shaper *sp, struct shaping_flow *sf, struct shaping_stat_data **stat_hashtbl) +void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) { - shaping_flow_remove_from_pool(sf, sp, stat_hashtbl); + shaping_flow_remove_from_pool(ctx, sf); return; } @@ -516,8 +517,7 @@ static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char directi return anchor; } -enum shaping_packet_action shaper_pkt_action_decide(struct swarmkv *db, struct shaping_flow *sf, struct shaper *sp, - int priority, struct shaping_stat_data **stat_hashtbl, int sf_in_queue) +enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue) { int profile_type = 0; struct shaping_profile_info *profile = NULL; @@ -534,18 +534,18 @@ enum shaping_packet_action shaper_pkt_action_decide(struct swarmkv *db, struct s if (pkt_wrapper->tcp_pure_contorl) { if (sf_in_queue) { - shaper_flow_pop(sp, sf, stat_hashtbl); + shaper_flow_pop(ctx, sf); } - shaper_stat_forward_all_rule_inc(stat_hashtbl, sf, pkt_wrapper->direction, pkt_wrapper->length); + shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); return SHAPING_FORWARD; } - if (0 == shaper_token_consume(db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) { - shaper_stat_forward_inc(stat_hashtbl, rule->id, profile->id, profile->priority, - pkt_wrapper->direction, pkt_wrapper->length, profile_type); + if (0 == shaper_token_consume(ctx->swarmkv_db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) { + shaper_stat_forward_inc(ctx->stat, rule->id, profile->id, profile->priority, + pkt_wrapper->direction, pkt_wrapper->length, profile_type, ctx->thread_index); if (sf_in_queue) { - shaper_flow_pop(sp, sf, stat_hashtbl); + shaper_flow_pop(ctx, sf); } sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction); @@ -566,12 +566,12 @@ enum shaping_packet_action shaper_pkt_action_decide(struct swarmkv *db, struct s } FLOW_PUSH: - if (0 == shaper_flow_push(sf, sp, stat_hashtbl, enqueue_time)) { + if (0 == shaper_flow_push(ctx, sf, enqueue_time)) { return SHAPING_QUEUED; } else { rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id, - rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length); + shaper_stat_drop_inc(ctx->stat, rule->id, rule->primary.id, + rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); sf->anchor = 0; return SHAPING_DROP; @@ -579,7 +579,7 @@ FLOW_PUSH: } static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority, - struct shaping_stat_data **stat_hashtbl, struct shaping_thread_ctx *ctx) + struct shaping_stat *stat, struct shaping_thread_ctx *ctx) { struct shaping_packet_wrapper *pkt_wrapper; struct shaping_rule_info *rule = NULL; @@ -600,7 +600,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ shaping_ret = shaper_pkt_action_decide(g_swarmkv_db, sf, sp, priority, stat_hashtbl, 1); } #endif - shaping_ret = shaper_pkt_action_decide(ctx->swarmkv_db, sf, sp, priority, stat_hashtbl, 1); + shaping_ret = shaper_pkt_action_decide(ctx, sf, priority, 1); switch (shaping_ret) { case SHAPING_QUEUED: @@ -632,16 +632,16 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ pkt_wrapper = shaper_first_pkt_get(sf); sf->anchor = 0; - if (0 == shaper_flow_push(sf, sp, stat_hashtbl, pkt_wrapper->enqueue_time_us)) { + if (0 == shaper_flow_push(ctx, sf, pkt_wrapper->enqueue_time_us)) { /*in pkt process, when queue not empty, new pkt's queueing stat was added to primary profile of first rule. while shaper_flow_push() here will add queueing stat to every profile of first rule, so need adjust queueing stat here*/ rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_queueing_pkt_dec(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, - pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY); + shaper_stat_queueing_pkt_dec(stat, rule->id, rule->primary.id, rule->primary.priority, + pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); } else { - shaper_queue_clear(sf, stat_hashtbl, ctx);//first packet fail, then every packet will fail + shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail if (sf->flag & STREAM_CLOSE) { shaping_flow_free(sf); } @@ -655,7 +655,6 @@ void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu int priority; int shaping_ret; struct shaping_rule_info *s_rule; - struct shaper *sp = ctx->sp; struct shaping_stat *stat = ctx->stat; struct shaping_marsio_info *marsio_info = ctx->marsio_info; struct timespec curr_time; @@ -664,19 +663,18 @@ void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly s_rule = &sf->matched_rule_infos[0]; if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) { - shaper_stat_queueing_pkt_inc(&stat->stat_hashtbl, s_rule->id, + shaper_stat_queueing_pkt_inc(stat, s_rule->id, s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len, - SHAPING_PROFILE_TYPE_PRIMARY); + SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); } else { - shaper_stat_drop_inc(&stat->stat_hashtbl, s_rule->id, - s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len); + shaper_stat_drop_inc(stat, s_rule->id, s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len, ctx->thread_index); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); } } else { if (meta->is_tcp_pure_ctrl) { marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); - shaper_stat_forward_all_rule_inc(&stat->stat_hashtbl, sf, meta->dir, meta->raw_len); + shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index); goto JUDGE_CLOSE;//for tcp pure control pkt, transmit it directly } @@ -689,8 +687,7 @@ void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu sf->anchor = 0; priority = sf->matched_rule_infos[sf->anchor].primary.priority; - shaping_ret = shaper_pkt_action_decide(ctx->swarmkv_db, sf, sp, priority, - &stat->stat_hashtbl, 0); + shaping_ret = shaper_pkt_action_decide(ctx, sf, priority, 0); switch (shaping_ret) { case SHAPING_QUEUED: break; @@ -716,8 +713,6 @@ JUDGE_CLOSE: } } - shaper_stat_send(stat, &stat->stat_hashtbl); - return; } @@ -734,16 +729,13 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_ } for (int j = 0; j < sf_num; j++) { - ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, &stat->stat_hashtbl, ctx); + ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, stat, ctx); if (ret == 0) { - goto STAT_DATA_SEND; + return; } } } -STAT_DATA_SEND: - shaper_stat_send(stat, &stat->stat_hashtbl); - return; } @@ -945,11 +937,11 @@ void shaping_engine_destroy(struct shaping_ctx *ctx) shaper_swarmkv_destroy(ctx->swarmkv_db); shaper_maat_destroy(ctx->maat_info); shaper_marsio_destroy(ctx->marsio_info); + shaper_stat_destroy(ctx->stat); if (ctx->thread_ctx) { for (int i = 0; i < ctx->thread_num; i++) { shaper_free(ctx->thread_ctx[i].sp); - shaper_stat_send_free(ctx->thread_ctx[i].stat); session_table_destory(ctx->thread_ctx[i].session_table); } free(ctx->thread_ctx); @@ -998,13 +990,18 @@ struct shaping_ctx *shaping_engine_init() if (ctx->marsio_info == NULL) { goto ERROR; } + + ctx->stat = shaper_stat_new(conf.work_thread_num); + if (ctx->stat == NULL) { + goto ERROR; + } ctx->thread_ctx = (struct shaping_thread_ctx *)calloc(conf.work_thread_num, sizeof(struct shaping_thread_ctx)); ctx->thread_num = conf.work_thread_num; for (int i = 0; i < conf.work_thread_num; i++) { ctx->thread_ctx[i].thread_index = i; ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max); - ctx->thread_ctx[i].stat = shaper_stat_new(conf.telegraf_ip, conf.telegraf_port); + ctx->thread_ctx[i].stat = ctx->stat; ctx->thread_ctx[i].session_table = session_table_create(); ctx->thread_ctx[i].maat_info = ctx->maat_info; ctx->thread_ctx[i].marsio_info = ctx->marsio_info; -- cgit v1.2.3 From c25b65bc60a5fb82c576458deb7ef12f1a6885b3 Mon Sep 17 00:00:00 2001 From: liuchang Date: Fri, 24 Mar 2023 09:53:38 +0000 Subject: fix test case for stat after using fieldstat3 --- conf/main.conf | 7 +- shaping/include/shaper.h | 2 - shaping/include/shaper_stat.h | 2 +- shaping/src/shaper.cpp | 10 +- shaping/src/shaper_stat.cpp | 43 +++++- shaping/test/gtest_shaper.cpp | 302 +++++++++++++++++++++++++++----------- shaping/test/gtest_shaper_bak.cpp | 234 ----------------------------- shaping/test/stub.cpp | 4 +- shaping/test/stub.h | 5 +- shaping/test/test_conf/main.conf | 8 +- 10 files changed, 274 insertions(+), 343 deletions(-) delete mode 100644 shaping/test/gtest_shaper_bak.cpp (limited to 'shaping/src/shaper.cpp') diff --git a/conf/main.conf b/conf/main.conf index e134a70..5a8adec 100644 --- a/conf/main.conf +++ b/conf/main.conf @@ -28,9 +28,10 @@ SWARMKV_CLUSTER_ANNOUNCE_PORT=8501 SWARMKV_HEALTH_CHECK_PORT=0 SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111 -#[METRIC] -#TELEGRAF_IP="127.0.0.1" -#TELEGRAF_PORT=6667 +[METRIC] +FIELDSTAT_OUTPUT_INTERVAL_MS=500 +LINE_PROTOCOL_SERVER_IP="127.0.0.1" +LINE_PROTOCOL_SERVER_PORT=6667 [CONFIG] #PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128 diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 0de6f62..0959ebe 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -20,8 +20,6 @@ #define SHAPING_GLOBAL_CONF_FILE "./conf/main.conf" struct shaping_global_conf { - char telegraf_ip[16]; - short telegraf_port; unsigned int session_queue_len_max; unsigned int priority_queue_len_max; int polling_node_num_max[SHAPING_PRIORITY_NUM_MAX]; diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h index 02f9bef..30d869e 100644 --- a/shaping/include/shaper_stat.h +++ b/shaping/include/shaper_stat.h @@ -66,7 +66,7 @@ struct shaping_stat { }; void shaper_stat_destroy(struct shaping_stat *stat); -struct shaping_stat* shaper_stat_new(int thread_num); +struct shaping_stat* shaper_stat_init(int thread_num); void shaper_stat_queueing_pkt_inc(struct shaping_stat *stat, int rule_id, int profile_id, int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id); void shaper_stat_queueing_pkt_dec(struct shaping_stat *stat, int rule_id, int profile_id, int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id); diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 5cd64c6..541e6e5 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -1,5 +1,3 @@ -#include "log.h" -#include "session_table.h" #include #include #include @@ -12,6 +10,8 @@ extern "C" { #include "libavl.h" } +#include "log.h" +#include "session_table.h" #include "addr_tuple4.h" #include "raw_packet.h" #include "shaper.h" @@ -913,10 +913,6 @@ int shaper_global_conf_init(struct shaping_global_conf *conf) } /*************************************************************************/ - - MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1"); - MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_PORT", &conf->telegraf_port, 6379); - MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "SESSION_QUEUE_LEN_MAX", &conf->session_queue_len_max, 128); MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PRIORITY_QUEUE_LEN_MAX", &conf->priority_queue_len_max, 1024); @@ -991,7 +987,7 @@ struct shaping_ctx *shaping_engine_init() goto ERROR; } - ctx->stat = shaper_stat_new(conf.work_thread_num); + ctx->stat = shaper_stat_init(conf.work_thread_num); if (ctx->stat == NULL) { goto ERROR; } diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index 6498be9..878daa0 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -1,14 +1,22 @@ -#include #include #include #include #include +#include +#include #include "log.h" #include "utils.h" #include "shaper.h" #include "shaper_stat.h" +struct shaper_stat_conf { + int enable_backgroud_thread; + int output_interval_ms; + char telegraf_ip[16]; + short telegraf_port; +}; + thread_local struct fieldstat_tag tags[TAG_IDX_MAX]; void shaper_stat_destroy(struct shaping_stat *stat) @@ -25,10 +33,23 @@ void shaper_stat_destroy(struct shaping_stat *stat) return; } -struct shaping_stat* shaper_stat_new(int thread_num) +static int shaper_stat_conf_load(struct shaper_stat_conf *conf) +{ + memset(conf, 0, sizeof(struct shaper_stat_conf)); + + MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1"); + MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_PORT", &conf->telegraf_port, 6379); + MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500); + MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1); + + return 0; +} + +struct shaping_stat* shaper_stat_init(int thread_num) { struct shaping_stat *stat = NULL; int column_num; + struct shaper_stat_conf conf; const char *column_name[] = {"queueing_sessions", "in_max_latency_us", "in_queue_len", "out_max_latency_us", "out_queue_len", //first line is gauge, second line is counter "in_pkts", "in_bytes", "in_drop_pkts", "out_pkts", "out_bytes", "out_drop_pkts"}; enum field_type column_type[] = {FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, @@ -40,6 +61,11 @@ struct shaping_stat* shaper_stat_new(int thread_num) goto ERROR; } + if (shaper_stat_conf_load(&conf) != 0) { + LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT); + goto ERROR; + } + stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat)); stat->instance = fieldstat_dynamic_instance_new("shaping_engine", thread_num); @@ -48,6 +74,12 @@ struct shaping_stat* shaper_stat_new(int thread_num) goto ERROR; } + fieldstat_dynamic_set_output_interval(stat->instance, conf.output_interval_ms); + fieldstat_dynamic_set_line_protocol_server(stat->instance, conf.telegraf_ip, conf.telegraf_port); + if (conf.enable_backgroud_thread == 0) { + fieldstat_dynamic_disable_background_thread(stat->instance); + } + stat->table_id = fieldstat_register_dynamic_table(stat->instance, "shaping_metric", column_name, column_type, column_num, stat->column_ids); if (stat->table_id < 0) { LOG_ERROR("%s: shaping fieldstat register table failed", LOG_TAG_STAT); @@ -199,6 +231,13 @@ void shaper_stat_max_latency_update(struct shaping_stat *stat, int rule_id, int } } #endif + + shaper_stat_tags_build(rule_id, profile_id, priority, profile_type); + if (direction == SHAPING_DIR_IN) { + fieldstat_dynamic_table_metric_value_set(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id); + } else { + fieldstat_dynamic_table_metric_value_set(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id); + } return; } diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index 50aef6f..51b1a06 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -1,12 +1,21 @@ #include +#include +#include #include +#include #include "shaper.h" #include "shaper_maat.h" +#include "shaper_stat.h" #include "shaper_marsio.h" #include "stub.h" #define SHAPING_SESSION_QUEUE_LEN 128 +#define SHAPING_STAT_FILE_NAME "/tmp/shaping_metrics.json" +#define FIELDSTAT_AUTO_TIME_MAX 999999000 + +char profile_type_primary[] = "primary"; +char profile_type_borrow[] = "borrow"; static struct stub_packet* packet_new(unsigned long long income_time, unsigned int length, unsigned char dir) { @@ -60,7 +69,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf polling_entry(ctx->sp, ctx->stat, ctx); } - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } return; @@ -91,6 +100,70 @@ static int judge_packet_eq(struct stub_pkt_queue *expec_queue, struct stub_pkt_q return 0; } +static void shaping_stat_judge(char *file_line, int rule_id, int profile_id, int priority, + unsigned long long tx_pkts, unsigned long long tx_bytes, + unsigned long long drop_pkts, long long queue_len, long long max_latency, + int queueing_sessions, unsigned char direction, char profile_type[]) +{ + cJSON *json = NULL; + cJSON *tmp_obj = NULL; + char attr_name[32] = {0}; + + json = cJSON_Parse(file_line); + ASSERT_TRUE(json != NULL); + + tmp_obj = cJSON_GetObjectItem(json, "rule_id"); + ASSERT_TRUE(tmp_obj != NULL); + EXPECT_EQ(rule_id, atoi(tmp_obj->valuestring)); + + tmp_obj = cJSON_GetObjectItem(json, "profile_id"); + ASSERT_TRUE(tmp_obj != NULL); + EXPECT_EQ(profile_id, atoi(tmp_obj->valuestring)); + + tmp_obj = cJSON_GetObjectItem(json, "priority"); + ASSERT_TRUE(tmp_obj != NULL); + EXPECT_EQ(priority, atoi(tmp_obj->valuestring)); + + tmp_obj = cJSON_GetObjectItem(json, "profile_type"); + ASSERT_TRUE(tmp_obj != NULL); + EXPECT_STREQ(tmp_obj->valuestring, profile_type); + + tmp_obj = cJSON_GetObjectItem(json, "queueing_sessions"); + ASSERT_TRUE(tmp_obj != NULL); + EXPECT_EQ(queueing_sessions, tmp_obj->valueint); + + snprintf(attr_name, sizeof(attr_name), "%s_pkts", direction == SHAPING_DIR_OUT ? "out" : "in"); + tmp_obj = cJSON_GetObjectItem(json, attr_name); + ASSERT_TRUE(tmp_obj != NULL); + EXPECT_EQ(tx_pkts, tmp_obj->valueint); + + snprintf(attr_name, sizeof(attr_name), "%s_bytes", direction == SHAPING_DIR_OUT ? "out" : "in"); + tmp_obj = cJSON_GetObjectItem(json, attr_name); + ASSERT_TRUE(tmp_obj != NULL); + EXPECT_EQ(tx_bytes, tmp_obj->valueint); + + snprintf(attr_name, sizeof(attr_name), "%s_drop_pkts", direction == SHAPING_DIR_OUT ? "out" : "in"); + tmp_obj = cJSON_GetObjectItem(json, attr_name); + ASSERT_TRUE(tmp_obj != NULL); + EXPECT_EQ(drop_pkts, tmp_obj->valueint); + + if (max_latency != -1) { + snprintf(attr_name, sizeof(attr_name), "%s_max_latency_us", direction == SHAPING_DIR_OUT ? "out" : "in"); + tmp_obj = cJSON_GetObjectItem(json, attr_name); + ASSERT_TRUE(tmp_obj != NULL); + EXPECT_EQ(max_latency, tmp_obj->valueint); + } + + snprintf(attr_name, sizeof(attr_name), "%s_queue_len", direction == SHAPING_DIR_OUT ? "out" : "in"); + tmp_obj = cJSON_GetObjectItem(json, attr_name); + ASSERT_TRUE(tmp_obj != NULL); + EXPECT_EQ(queue_len, tmp_obj->valueint); + + cJSON_Delete(json); + + return; +} + /*session1 match rule1 rule1: profile: limit 1000*/ @@ -117,6 +190,7 @@ TEST(single_session, udp_tx_in_order) actual_tx_queue = stub_get_tx_queue(); shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1); + /**********send packets*********************/ send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0); /*******************************************/ @@ -129,18 +203,21 @@ TEST(single_session, udp_tx_in_order) stub_refresh_token_bucket(0); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -149,11 +226,11 @@ TEST(single_session, udp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 100, 10000, 0, 0, 170, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is last 10 pkts + //shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts + shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1 @@ -190,30 +267,35 @@ TEST(single_session, tcp_tx_in_order) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); -#if 0 + /***********send stat data here********************/ - stub_shaper_stat_send(0); + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy sleep(2);//wait telegraf generate metric -#endif + stub_refresh_token_bucket(0); for (int i = 0; i < 10; i++) {//10 pkts which is not pure control polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); while (!TAILQ_EMPTY(&expec_tx_queue)) {//20 pure contorl pkts polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1)); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 + /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -222,15 +304,15 @@ TEST(single_session, tcp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 40, 4000, 10, 1000, 0, 30, 0, 1, DIR_ROUTE_UP, profile_type_primary);//max latency is first queueing pkts + shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 30, 0, 1, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 0, 0, 30, 3000, 0, 0, 30, 0, DIR_ROUTE_UP, profile_type_primary); + //shaping_stat_judge(line, 0, 0, 1, 30, 3000, 0, 0, 30 + (STUB_TIME_INC_FOR_METRIC_SEND / 1000), 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 30, 3000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1 @@ -275,7 +357,7 @@ TEST(single_session, udp_diff_direction) stub_refresh_token_bucket(0); for (int i = 0; i < 20; i++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } //10 out packets ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); @@ -283,11 +365,15 @@ TEST(single_session, udp_diff_direction) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 + /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -296,12 +382,14 @@ TEST(single_session, udp_diff_direction) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 20, 2000, 20, 2000, 0, 0, 20, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is last 10 pkts - shaping_stat_judge(line, 0, 0, 1, 20, 2000, 20, 2000, 0, 0, 20, 0, DIR_ROUTE_DOWN, profile_type_primary); + //shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency + + //shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20, 0, SHAPING_DIR_IN, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, -1, 0, SHAPING_DIR_IN, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1, rule2, rule3 @@ -351,17 +439,20 @@ TEST(single_session, udp_multi_rules) //there are 3 rules, send one packet need 3 polling process, so 10 packets need 30 polling //even though invoke polling more than 30 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -370,18 +461,20 @@ TEST(single_session, udp_multi_rules) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0 - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 100, 10000, 0, 0, 506, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is last pkt + //shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 506, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1 - shaping_stat_judge(line, 1, 1, 2, 100, 10000, 100, 10000, 0, 0, 1, 0, DIR_ROUTE_UP, profile_type_primary);//latency of every queued pkt is 1 + //shaping_stat_judge(line, 1, 1, 2, 100, 10000, 0, 0, 1, 0, SHAPING_DIR_OUT, profile_type_primary);//latency of every queued pkt is 1 + shaping_stat_judge(line, 1, 1, 2, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2 - shaping_stat_judge(line, 2, 2, 3, 100, 10000, 100, 10000, 0, 0, 90, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is first queued pkt + //shaping_stat_judge(line, 2, 2, 3, 100, 10000, 0, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt + shaping_stat_judge(line, 2, 2, 3, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1 @@ -423,17 +516,20 @@ TEST(single_session, udp_borrow) stub_refresh_token_bucket(2); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -442,15 +538,16 @@ TEST(single_session, udp_borrow) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 0, 0, 170, 0, DIR_ROUTE_UP, profile_type_primary); + //shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow - shaping_stat_judge(line, 1, 2, 2, 100, 10000, 100, 10000, 0, 0, 170, 0, DIR_ROUTE_UP, profile_type_borrow); + //shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_borrow); + shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_borrow);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1, session2 match rule2 @@ -512,7 +609,7 @@ TEST(two_session_diff_priority, udp_in_order) stub_refresh_token_bucket(1); for (int i = 0; i < 10; i++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 1, first ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -523,18 +620,21 @@ TEST(two_session_diff_priority, udp_in_order) stub_refresh_token_bucket(1); for (int i = 0; i < 10; i++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));//stream1 priority 2 ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf1); shaping_flow_free(sf2); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -543,15 +643,16 @@ TEST(two_session_diff_priority, udp_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0 - shaping_stat_judge(line, 0, 0, 2, 100, 10000, 100, 10000, 0, 0, 280, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is every queued pkts + //shaping_stat_judge(line, 0, 0, 2, 100, 10000, 0, 0, 280, 0, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts + shaping_stat_judge(line, 0, 0, 2, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1 - shaping_stat_judge(line, 1, 1, 1, 100, 10000, 100, 10000, 0, 0, 90, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is every queued pkts + //shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts + shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1,rule2,rule4; session2 match rule3 @@ -626,7 +727,7 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule) stub_refresh_token_bucket(4); for (int j = 0; j < 2; j++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -639,7 +740,7 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule) stub_refresh_token_bucket(4); for (int i = 0; i < 10; i++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 3, first ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -652,18 +753,21 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule) stub_refresh_token_bucket(4); for (int i = 0; i < 30; i++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf1); shaping_flow_free(sf2); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -673,21 +777,24 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule) memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 1, 1, 1, 20, 2000, 20, 2000, 0, 0, 58, 0, DIR_ROUTE_UP, profile_type_primary);//profile_id 1, max latency is last pkt + //shaping_stat_judge(line, 1, 1, 1, 20, 2000, 0, 0, 58, 0, SHAPING_DIR_OUT, profile_type_primary);//profile_id 1, max latency is last pkt + shaping_stat_judge(line, 1, 1, 1, 20, 2000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 2, 2, 2, 20, 2000, 20, 2000, 0, 0, 1, 0, DIR_ROUTE_UP, profile_type_primary);//profile_id 2, evevy queued pkt's latency is 1 + //shaping_stat_judge(line, 2, 2, 2, 20, 2000, 0, 0, 1, 0, SHAPING_DIR_OUT, profile_type_primary);//profile_id 2, evevy queued pkt's latency is 1 + shaping_stat_judge(line, 2, 2, 2, 20, 2000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 4, 4, 4, 20, 2000, 20, 2000, 0, 0, 11, 0, DIR_ROUTE_UP, profile_type_primary);//profile_id 4, max latency is first queued pkt + //shaping_stat_judge(line, 4, 4, 4, 20, 2000, 0, 0, 11, 0, SHAPING_DIR_OUT, profile_type_primary);//profile_id 4, max latency is first queued pkt + shaping_stat_judge(line, 4, 4, 4, 20, 2000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 3, 3, 3, 20, 2000, 20, 2000, 0, 0, 12, 0, DIR_ROUTE_UP, profile_type_primary);//profile_id 3, every queued pkt's latency is 12 + //shaping_stat_judge(line, 3, 3, 3, 20, 2000, 0, 0, 12, 0, SHAPING_DIR_OUT, profile_type_primary);//profile_id 3, every queued pkt's latency is 12 + shaping_stat_judge(line, 3, 3, 3, 20, 2000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1 @@ -736,7 +843,7 @@ TEST(single_session_async, udp_tx_in_order) stub_refresh_token_bucket(0); for (int i = 0; i < 10; i++) {//异步获取token多发送了10个报文,补回token,不应发送报文 polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -744,17 +851,20 @@ TEST(single_session_async, udp_tx_in_order) stub_refresh_token_bucket(0); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -763,11 +873,11 @@ TEST(single_session_async, udp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 100, 10000, 0, 0, 160, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is last 10 pkts + //shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 160, 0, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts + shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1 @@ -809,11 +919,14 @@ TEST(single_session_async, udp_close_before_async_exec) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -822,11 +935,10 @@ TEST(single_session_async, udp_close_before_async_exec) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 10, 1000, 10, 1000, 0, 0, 0, 0, DIR_ROUTE_UP, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 0, 0, SHAPING_DIR_OUT, profile_type_primary); fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1; session2 match rule2 @@ -890,7 +1002,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) stub_refresh_token_bucket(2); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -900,18 +1012,21 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) stub_refresh_token_bucket(2); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf1); shaping_flow_free(sf2); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -920,18 +1035,20 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 0, 0, 470, 0, DIR_ROUTE_UP, profile_type_primary); + //shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 470, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow - shaping_stat_judge(line, 1, 2, 2, 100, 10000, 100, 10000, 0, 0, 470, 0, DIR_ROUTE_UP, profile_type_borrow); + //shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 470, 0, SHAPING_DIR_OUT, profile_type_borrow); + shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_borrow);//TODO: latency ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, primary - shaping_stat_judge(line, 2, 2, 1, 100, 10000, 100, 10000, 0, 0, 190, 0, DIR_ROUTE_UP, profile_type_primary); + //shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 190, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1; session2 match rule1 @@ -984,18 +1101,21 @@ TEST(two_session_same_rule, udp_tx_in_order) stub_refresh_token_bucket(1); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf1); shaping_flow_free(sf2); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -1004,11 +1124,11 @@ TEST(two_session_same_rule, udp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 1, 1, 1, 200, 20000, 200, 20000, 0, 0, 370, 0, DIR_ROUTE_UP, profile_type_primary); + //shaping_stat_judge(line, 1, 1, 1, 200, 20000, 0, 0, 370, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 1, 200, 20000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1; session2 match rule2 @@ -1037,7 +1157,7 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) int profile_id[][MAX_REF_PROFILE] = {{0}, {0}}; int stream1_pkt_num = 0; int stream2_pkt_num = 0; - struct timespec curr_time; + time_t curr_time; TAILQ_INIT(&expec_tx_queue1); @@ -1065,8 +1185,8 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1)); } - clock_gettime(CLOCK_MONOTONIC, &curr_time); - srand(curr_time.tv_sec); + time(&curr_time); + srand(curr_time); for (int i = 0; i < 99; i++) { if (rand() % 2 == 0) { send_packets(&ctx->thread_ctx[0], sf1, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0); @@ -1101,12 +1221,15 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf1); shaping_flow_free(sf2); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -1115,17 +1238,14 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 1, 0, 1, stream1_pkt_num, stream1_pkt_num*100, - stream1_pkt_num, stream1_pkt_num*100, 0, 0, -1, 0, DIR_ROUTE_UP, profile_type_primary);//can't predict a certain latency cause of random + shaping_stat_judge(line, 1, 0, 1, stream1_pkt_num, stream1_pkt_num*100, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//can't predict a certain latency cause of random ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 2, 0, 2, stream2_pkt_num, stream2_pkt_num*100, - stream2_pkt_num, stream2_pkt_num*100, 0, 0, -1, 0, DIR_ROUTE_UP, profile_type_primary);//can't predict a certain latency cause of random + shaping_stat_judge(line, 2, 0, 2, stream2_pkt_num, stream2_pkt_num*100, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//can't predict a certain latency cause of random fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1 @@ -1165,15 +1285,18 @@ TEST(statistics, udp_drop_pkt) while (!TAILQ_EMPTY(&expec_tx_queue)) { stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1)); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -1182,11 +1305,11 @@ TEST(statistics, udp_drop_pkt) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 210, 21000, 110, 11000, 100, 0, 200, 0, DIR_ROUTE_UP, profile_type_primary);//every queued pkt's latency is 200 + //shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228, 0, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max + shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } /*session1 match rule1 @@ -1215,16 +1338,15 @@ TEST(statistics, udp_queueing_pkt) stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT); actual_tx_queue = stub_get_tx_queue(); shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1); - - /*******packets, OP_STATE_DATA***********/ + /*******send packets***********/ send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0); -#if 0 + /***********send stat data here********************/ - stub_shaper_stat_send(0); + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy sleep(2);//wait telegraf generate metric -#endif //first 10 packets ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); @@ -1233,15 +1355,18 @@ TEST(statistics, udp_queueing_pkt) while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1)); } + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaping_flow_free(sf); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); -#if 0 /*******test statistics***********/ sleep(2);//wait telegraf to output char line[1024]; @@ -1251,20 +1376,21 @@ TEST(statistics, udp_queueing_pkt) memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data first sent - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 10, 1000, 0, 90, 0, 1, DIR_ROUTE_UP, profile_type_primary); + //shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 90, 0, 1, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 90, -1, 1, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data last sent - shaping_stat_judge(line, 0, 0, 1, 0, 0, 90, 9000, 0, 0, 90, 0, DIR_ROUTE_UP, profile_type_primary); + //shaping_stat_judge(line, 0, 0, 1, 90, 9000, 0, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 90, 9000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); -#endif } int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); - //testing::GTEST_FLAG(filter) = "two_session_diff_priority.udp_in_order"; + //testing::GTEST_FLAG(filter) = "statistics.udp_queueing_pkt"; return RUN_ALL_TESTS(); } \ No newline at end of file diff --git a/shaping/test/gtest_shaper_bak.cpp b/shaping/test/gtest_shaper_bak.cpp deleted file mode 100644 index a947a8b..0000000 --- a/shaping/test/gtest_shaper_bak.cpp +++ /dev/null @@ -1,234 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include "stub.h" - -#define SHAPING_STAT_FILE_NAME "/tmp/shaping_metrics.json" -#define SHAPING_SESSION_QUEUE_LEN 100 -#define SHAPING_SESSIONS_LIMIT_PER_AVL 1000 - -char profile_type_primary[] = "primary"; -char profile_type_borrow[] = "borrow"; - - -using namespace std; - -static int judge_packet_eq(struct stub_pkt_queue *expec_queue, struct stub_pkt_queue *actual_queue, int num) -{ - struct stub_packet_node *expec_pkt_node; - struct stub_packet_node *actual_pkt_node; - - for (int i = 0; i < num; i++) { - if(TAILQ_EMPTY(actual_queue)) { - return -1; - } - expec_pkt_node = TAILQ_FIRST(expec_queue); - actual_pkt_node = TAILQ_FIRST(actual_queue); - if (expec_pkt_node->raw_packet != actual_pkt_node->raw_packet) { - return -2; - } - - TAILQ_REMOVE(expec_queue, expec_pkt_node, node); - TAILQ_REMOVE(actual_queue, actual_pkt_node, node); - free(expec_pkt_node->raw_packet); - free(expec_pkt_node); - free(actual_pkt_node); - } - - return 0; -} - -static struct stub_packet* packet_new(unsigned long long income_time, unsigned int length, unsigned char dir) -{ - struct stub_packet *packet; - - packet = (struct stub_packet*)calloc(1, sizeof(struct stub_packet)); - packet->income_time = income_time; - packet->length = length; - packet->direction = dir; - - return packet; -} - -static struct stub_packet_node* packet_node_new(stub_packet *packet) -{ - struct stub_packet_node *pkt_node; - - pkt_node = (struct stub_packet_node*)calloc(1, sizeof(struct stub_packet_node)); - pkt_node->raw_packet = packet; - - return pkt_node; -} - -static void send_packets(struct streaminfo *stream, void **pme, int pkt_num, int pkt_len, - unsigned char dir, struct stub_pkt_queue *expec_tx_queue, int polling_times, - int is_tcp, int is_pure_control) -{ - struct stub_packet_node *pkt_node; - struct stub_packet *packet; - unsigned long long time; - char ret; - - stream->threadnum = 0;//just 1 thread for test!!! - - for (int i = 0; i < pkt_num; i++) { - time = stub_curr_time_get(); - packet = packet_new(time, pkt_len, dir); - if (expec_tx_queue) { - pkt_node = packet_node_new(packet); - TAILQ_INSERT_TAIL(expec_tx_queue, pkt_node, node); - } - - time++; - - stream->routedir = dir; - stream->hash_index = pkt_len;//just for stub test!!!!! use hash_index to store pkt_len - if (is_tcp) { - stream->ptcpdetail->clientpktnum++; - if (is_pure_control) { - stream->ptcpdetail->pdata = NULL; - } else { - stream->ptcpdetail->pdata = (void*)0x1234;//just for stub test, will not access this pointer - } - } else { - stream->pudpdetail->clientpktnum++; - } - - if (is_tcp) { - ret = tcp_allpkt_raw_process(stream, NULL, packet, pme); - } else { - ret = udp_raw_process(stream, NULL, packet, pme); - } - - if (ret == APP_STATE_GIVEME) { - if (!packet->detained_flag) { - stub_send_packet(packet); - } - } - - for (int j = 0; j < polling_times; j++) { - polling_entry(NULL, NULL, 0, NULL); - } - - stub_curr_time_inc(); - } -} - -static void stream_state_change_udp(struct streaminfo *stream, void **pme, unsigned char state) -{ - stream->opstate = state; - //udp_process(stream, pme, 0, NULL);//just change state, no packet - if (state == OP_STATE_PENDING) { - stub_stream_bridge_sync_cb_invoke(stream); - } else if(state == OP_STATE_CLOSE) { - stub_stream_bridge_free_cb_invoke(stream); - } - - udp_raw_process(stream, NULL, NULL, pme);//just change state, no packet - - return; -} - -static void stream_state_change_tcp(struct streaminfo *stream, void **pme, unsigned char state) -{ - stream->pktstate = state; - //tcp_allpkt_process(stream, pme, 0, NULL);//just change state, no packet - if (state == OP_STATE_PENDING) { - stub_stream_bridge_sync_cb_invoke(stream); - } else if(state == OP_STATE_CLOSE) { - stub_stream_bridge_free_cb_invoke(stream); - } - - tcp_allpkt_raw_process(stream, NULL, NULL, pme);//just change state, no packet - - return; -} - -static void shaping_stat_judge(char *file_line, int rule_id, int profile_id, int priority, - unsigned long long rx_pkts, unsigned long long rx_bytes, - unsigned long long tx_pkts, unsigned long long tx_bytes, - unsigned long long drop_pkts, long long queue_len, long long max_latency, - int queueing_sessions, unsigned char direction, char profile_type[]) -{ - cJSON *json = NULL; - cJSON *tmp_obj = NULL; - char attr_name[32] = {0}; - - json = cJSON_Parse(file_line); - ASSERT_TRUE(json != NULL); - - tmp_obj = cJSON_GetObjectItem(json, "rule_id"); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(rule_id, atoi(tmp_obj->valuestring)); - - tmp_obj = cJSON_GetObjectItem(json, "profile_id"); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(profile_id, atoi(tmp_obj->valuestring)); - - tmp_obj = cJSON_GetObjectItem(json, "priority"); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(priority, atoi(tmp_obj->valuestring)); - - tmp_obj = cJSON_GetObjectItem(json, "profile_type"); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_STREQ(tmp_obj->valuestring, profile_type); - - tmp_obj = cJSON_GetObjectItem(json, "queueing_sessions"); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(queueing_sessions, tmp_obj->valueint); - - snprintf(attr_name, sizeof(attr_name), "%s_rx_pkts", direction == DIR_ROUTE_UP ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(json, attr_name); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(rx_pkts, tmp_obj->valueint); - - snprintf(attr_name, sizeof(attr_name), "%s_rx_bytes", direction == DIR_ROUTE_UP ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(json, attr_name); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(rx_bytes, tmp_obj->valueint); - - snprintf(attr_name, sizeof(attr_name), "%s_tx_pkts", direction == DIR_ROUTE_UP ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(json, attr_name); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(tx_pkts, tmp_obj->valueint); - - snprintf(attr_name, sizeof(attr_name), "%s_tx_bytes", direction == DIR_ROUTE_UP ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(json, attr_name); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(tx_bytes, tmp_obj->valueint); - - snprintf(attr_name, sizeof(attr_name), "%s_drop_pkts", direction == DIR_ROUTE_UP ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(json, attr_name); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(drop_pkts, tmp_obj->valueint); - - if (max_latency != -1) { - snprintf(attr_name, sizeof(attr_name), "%s_max_latency_us", direction == DIR_ROUTE_UP ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(json, attr_name); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(max_latency, tmp_obj->valueint); - } - - snprintf(attr_name, sizeof(attr_name), "%s_queue_len", direction == DIR_ROUTE_UP ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(json, attr_name); - ASSERT_TRUE(tmp_obj != NULL); - EXPECT_EQ(queue_len, tmp_obj->valueint); - - cJSON_Delete(json); - - return; -} - - - - -int main(int argc, char **argv) -{ - testing::InitGoogleTest(&argc, argv); - //testing::GTEST_FLAG(filter) = "single_session.udp_diff_direction"; - return RUN_ALL_TESTS(); -} \ No newline at end of file diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp index 2bf4a8f..9d91b7e 100644 --- a/shaping/test/stub.cpp +++ b/shaping/test/stub.cpp @@ -158,9 +158,9 @@ struct stub_pkt_queue* stub_get_tx_queue() return &tx_queue; } -void stub_curr_time_inc() +void stub_curr_time_inc(unsigned long long time_ns) { - curr_time += 1000; + curr_time += time_ns; return; } diff --git a/shaping/test/stub.h b/shaping/test/stub.h index e394da3..e0702a2 100644 --- a/shaping/test/stub.h +++ b/shaping/test/stub.h @@ -9,6 +9,9 @@ #define STUB_MAAT_SHAPING_RULE_TABLE_ID 0 #define STUB_MAAT_SHAPING_PROFILE_TABLE_ID 1 +#define STUB_TIME_INC_FOR_PACKET 1000 +#define STUB_TIME_INC_FOR_METRIC_SEND 1000000 + struct stub_packet { unsigned char direction; unsigned char pure_control; @@ -37,7 +40,7 @@ struct stub_pkt_queue* stub_get_tx_queue(); int stub_AQM_drop_packet(int queue_len, unsigned long long income_time); -void stub_curr_time_inc(); +void stub_curr_time_inc(unsigned long long time_ns); unsigned long long stub_curr_time_get(); void stub_init(); diff --git a/shaping/test/test_conf/main.conf b/shaping/test/test_conf/main.conf index e134a70..78d458f 100644 --- a/shaping/test/test_conf/main.conf +++ b/shaping/test/test_conf/main.conf @@ -28,9 +28,11 @@ SWARMKV_CLUSTER_ANNOUNCE_PORT=8501 SWARMKV_HEALTH_CHECK_PORT=0 SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111 -#[METRIC] -#TELEGRAF_IP="127.0.0.1" -#TELEGRAF_PORT=6667 +[METRIC] +FIELDSTAT_OUTPUT_INTERVAL_MS=999999000 +FIELDSTAT_ENABLE_BACKGRUND_THREAD=0 +TELEGRAF_IP="127.0.0.1" +TELEGRAF_PORT=6667 [CONFIG] #PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128 -- cgit v1.2.3 From a85a124bf7b13fe369f26c730b53da8847535370 Mon Sep 17 00:00:00 2001 From: liuchang Date: Mon, 27 Mar 2023 02:57:36 +0000 Subject: use marsio api to set cpu_mask --- conf/main.conf | 41 ------------------------------------ conf/shaping.conf | 41 ++++++++++++++++++++++++++++++++++++ shaping/include/shaper.h | 10 +++------ shaping/include/shaper_marsio.h | 2 +- shaping/src/main.cpp | 19 ----------------- shaping/src/shaper.cpp | 13 +++++++----- shaping/src/shaper_marsio.cpp | 15 +++++++++---- shaping/test/test_conf/main.conf | 42 ------------------------------------- shaping/test/test_conf/shaping.conf | 42 +++++++++++++++++++++++++++++++++++++ 9 files changed, 106 insertions(+), 119 deletions(-) delete mode 100644 conf/main.conf create mode 100644 conf/shaping.conf delete mode 100644 shaping/test/test_conf/main.conf create mode 100644 shaping/test/test_conf/shaping.conf (limited to 'shaping/src/shaper.cpp') diff --git a/conf/main.conf b/conf/main.conf deleted file mode 100644 index 5a8adec..0000000 --- a/conf/main.conf +++ /dev/null @@ -1,41 +0,0 @@ -[SYSTEM] -WORK_THREAD_NUM=2 -ENABLE_CPU_AFFINITY=1 -CPU_AFFINITY_MASK=1-2 - -[MARSIO] -DEV_INTERFACE="eth_interface" -RX_BRUST_MAX=1 -APP_SYMBOL="shaping" - -[MAAT] -INPUT_MODE=1 -TABLE_INFO="conf/table_info.conf" -JSON_FILE="conf/shaping_maat.json" -REDIS_DB_IDX=0 -REDIS_IP="127.0.0.1" -REDIS_PORT="6379" - - -[SWARMKV] -SWARMKV_CLUSTER_NAME="shaping" -SWARMKV_NODE_IP="127.0.0.1" -SWARMKV_NODE_PORT=5210 -SWARMKV_CONSUL_IP="127.0.0.1" -SWARMKV_CONSUL_PORT=8500 -SWARMKV_CLUSTER_ANNOUNCE_IP="127.0.0.1" -SWARMKV_CLUSTER_ANNOUNCE_PORT=8501 -SWARMKV_HEALTH_CHECK_PORT=0 -SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111 - -[METRIC] -FIELDSTAT_OUTPUT_INTERVAL_MS=500 -LINE_PROTOCOL_SERVER_IP="127.0.0.1" -LINE_PROTOCOL_SERVER_PORT=6667 - -[CONFIG] -#PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128 -SESSION_QUEUE_LEN_MAX=128 -QUEUEING_SESSIONS_PER_PRIORITY_PER_THREAD_MAX=1024 -POLLING_NODE_NUM_MAX={"polling_node_num_max":[ 3, 2, 2, 1, 1, 1, 1, 1, 1, 1 ]} - diff --git a/conf/shaping.conf b/conf/shaping.conf new file mode 100644 index 0000000..5a8adec --- /dev/null +++ b/conf/shaping.conf @@ -0,0 +1,41 @@ +[SYSTEM] +WORK_THREAD_NUM=2 +ENABLE_CPU_AFFINITY=1 +CPU_AFFINITY_MASK=1-2 + +[MARSIO] +DEV_INTERFACE="eth_interface" +RX_BRUST_MAX=1 +APP_SYMBOL="shaping" + +[MAAT] +INPUT_MODE=1 +TABLE_INFO="conf/table_info.conf" +JSON_FILE="conf/shaping_maat.json" +REDIS_DB_IDX=0 +REDIS_IP="127.0.0.1" +REDIS_PORT="6379" + + +[SWARMKV] +SWARMKV_CLUSTER_NAME="shaping" +SWARMKV_NODE_IP="127.0.0.1" +SWARMKV_NODE_PORT=5210 +SWARMKV_CONSUL_IP="127.0.0.1" +SWARMKV_CONSUL_PORT=8500 +SWARMKV_CLUSTER_ANNOUNCE_IP="127.0.0.1" +SWARMKV_CLUSTER_ANNOUNCE_PORT=8501 +SWARMKV_HEALTH_CHECK_PORT=0 +SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111 + +[METRIC] +FIELDSTAT_OUTPUT_INTERVAL_MS=500 +LINE_PROTOCOL_SERVER_IP="127.0.0.1" +LINE_PROTOCOL_SERVER_PORT=6667 + +[CONFIG] +#PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128 +SESSION_QUEUE_LEN_MAX=128 +QUEUEING_SESSIONS_PER_PRIORITY_PER_THREAD_MAX=1024 +POLLING_NODE_NUM_MAX={"polling_node_num_max":[ 3, 2, 2, 1, 1, 1, 1, 1, 1, 1 ]} + diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 0959ebe..04ff81d 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -19,13 +19,13 @@ #define SHAPING_GLOBAL_CONF_FILE "./conf/main.conf" -struct shaping_global_conf { +struct shaping_system_conf { unsigned int session_queue_len_max; unsigned int priority_queue_len_max; int polling_node_num_max[SHAPING_PRIORITY_NUM_MAX]; int work_thread_num; int cpu_affinity_enable; - int cpu_affinity_mask[SHAPING_WROK_THREAD_NUM_MAX]; + unsigned long long cpu_affinity_mask; }; struct shaping_thread_ctx { @@ -39,7 +39,6 @@ struct shaping_thread_ctx { struct shaping_maat_info *maat_info; struct session_table *session_table; int session_need_reset; - int cpu_mask; unsigned int session_queue_len_max; int polling_node_num_max[SHAPING_PRIORITY_NUM_MAX]; }; @@ -110,9 +109,6 @@ struct shaper_flow_instance { struct shaper;//instance of shaping, thread unsafe -//extern struct shaping_global_runtime_para g_rt_para; -//extern struct shaping_global_conf g_sp_conf; - struct shaping_flow* shaping_flow_new(); void shaping_flow_free(struct shaping_flow *sf); struct shaper* shaper_new(unsigned int priority_queue_len_max); @@ -130,7 +126,7 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue); -int shaper_global_conf_init(struct shaping_global_conf *conf); +int shaper_global_conf_init(struct shaping_system_conf *conf); void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx); void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf); diff --git a/shaping/include/shaper_marsio.h b/shaping/include/shaper_marsio.h index f06f306..7ab8ccc 100644 --- a/shaping/include/shaper_marsio.h +++ b/shaping/include/shaper_marsio.h @@ -35,7 +35,7 @@ struct ctrl_pkt_data int shaping_rule_num; }; -struct shaping_marsio_info* shaper_marsio_init(int thread_num); +struct shaping_marsio_info* shaper_marsio_init(struct shaping_system_conf *sys_conf); void shaper_marsio_destroy(struct shaping_marsio_info *marsio_info); int shaper_marsio_pkt_metadata_get(marsio_buff_t *rx_buff, struct metadata *meta, int is_ctrl_buff, struct raw_pkt_parser *raw_parser); int shaper_marsio_ctrl_pkt_data_parse(struct ctrl_pkt_data *ctrl_data, const char *data, size_t length); \ No newline at end of file diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp index 16f0aa8..476816f 100644 --- a/shaping/src/main.cpp +++ b/shaping/src/main.cpp @@ -8,29 +8,10 @@ #include "shaper_marsio.h" #include "shaper_session.h" -static int thread_set_affinity(int core_id) -{ - int num_cores = sysconf(_SC_NPROCESSORS_ONLN); - if (core_id < 0 || core_id >= num_cores) - { - return EINVAL; - } - - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(core_id, &cpuset); - - return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); -} - static void *shaper_thread_loop(void *data) { struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data; - if (ctx->cpu_mask >= 0) - { - thread_set_affinity(ctx->cpu_mask); - } marsio_thread_init(ctx->marsio_info->instance); //loop to process pkts diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 541e6e5..5ce644d 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -830,13 +830,14 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) return; } -int shaper_global_conf_init(struct shaping_global_conf *conf) +int shaper_global_conf_init(struct shaping_system_conf *conf) { int ret; int array_num; cJSON *json = NULL; cJSON *tmp_obj = NULL, *tmp_array_obj = NULL; char polling_node_num_max[128] = {0}; + unsigned int cpu_mask[SHAPING_WROK_THREAD_NUM_MAX] = {0}; ret = MESA_load_profile_int_nodef(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "WORK_THREAD_NUM", &conf->work_thread_num); if (ret < 0) { @@ -851,11 +852,14 @@ int shaper_global_conf_init(struct shaping_global_conf *conf) return ret; } - ret = MESA_load_profile_uint_range(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "CPU_AFFINITY_MASK", SHAPING_WROK_THREAD_NUM_MAX, (unsigned int *)conf->cpu_affinity_mask); + ret = MESA_load_profile_uint_range(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "CPU_AFFINITY_MASK", SHAPING_WROK_THREAD_NUM_MAX, cpu_mask); if (ret < 0 || ret != conf->work_thread_num) { LOG_ERROR("%s: shaping init global conf get CPU_AFFINITY_MASK failed or incomplete config", LOG_TAG_SHAPING); return -1; } + for (int i = 0; i < conf->work_thread_num; i++) { + conf->cpu_affinity_mask |= 1 << cpu_mask[i]; + } #if 0 //temporarily not support array config array_num = SHAPING_PRIORITY_NUM_MAX; @@ -951,7 +955,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx) struct shaping_ctx *shaping_engine_init() { - struct shaping_global_conf conf; + struct shaping_system_conf conf; struct shaping_ctx *ctx = NULL; int ret; @@ -982,7 +986,7 @@ struct shaping_ctx *shaping_engine_init() } /*init marsio*/ - ctx->marsio_info = shaper_marsio_init(conf.work_thread_num); + ctx->marsio_info = shaper_marsio_init(&conf); if (ctx->marsio_info == NULL) { goto ERROR; } @@ -1002,7 +1006,6 @@ struct shaping_ctx *shaping_engine_init() ctx->thread_ctx[i].maat_info = ctx->maat_info; ctx->thread_ctx[i].marsio_info = ctx->marsio_info; ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db; - ctx->thread_ctx[i].cpu_mask = conf.cpu_affinity_enable ? conf.cpu_affinity_mask[i] : -1; ctx->thread_ctx[i].ref_ctx = ctx; ctx->thread_ctx[i].session_queue_len_max = conf.session_queue_len_max; memcpy(ctx->thread_ctx[i].polling_node_num_max, conf.polling_node_num_max, sizeof(conf.polling_node_num_max)); diff --git a/shaping/src/shaper_marsio.cpp b/shaping/src/shaper_marsio.cpp index 3ea774d..fa3ebac 100644 --- a/shaping/src/shaper_marsio.cpp +++ b/shaping/src/shaper_marsio.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include "log.h" #include "raw_packet.h" @@ -56,7 +57,7 @@ void shaper_marsio_destroy(struct shaping_marsio_info *marsio_info) return; } -struct shaping_marsio_info* shaper_marsio_init(int thread_num) +struct shaping_marsio_info* shaper_marsio_init(struct shaping_system_conf *sys_conf) { struct shaper_marsio_config conf; struct shaping_marsio_info *marsio_info; @@ -75,18 +76,24 @@ struct shaping_marsio_info* shaper_marsio_init(int thread_num) goto ERROR; } - if (marsio_option_set(marsio_info->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0) - { + if (marsio_option_set(marsio_info->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0) { LOG_ERROR("%s: shaping marsio set MARSIO_OPT_EXIT_WHEN_ERR failed", LOG_TAG_MARSIO); goto ERROR; } + if (sys_conf->cpu_affinity_enable) { + if (marsio_option_set(marsio_info->instance, MARSIO_OPT_THREAD_MASK, &sys_conf->cpu_affinity_mask, sizeof(sys_conf->cpu_affinity_mask)) != 0) { + LOG_ERROR("%s: shaping marsio set MARSIO_OPT_THREAD_MASK failed", LOG_TAG_MARSIO); + goto ERROR; + } + } + if (marsio_init(marsio_info->instance, conf.app_symbol) != 0) { LOG_ERROR("%s: shaping marsio init failed", LOG_TAG_MARSIO); goto ERROR; } - marsio_info->mr_dev = marsio_open_device(marsio_info->instance, conf.dev_interface, thread_num, thread_num); + marsio_info->mr_dev = marsio_open_device(marsio_info->instance, conf.dev_interface, sys_conf->work_thread_num, sys_conf->work_thread_num); if (!marsio_info->mr_dev) { LOG_ERROR("%s: shaping marsio open device %s failed", LOG_TAG_MARSIO, conf.dev_interface); goto ERROR; diff --git a/shaping/test/test_conf/main.conf b/shaping/test/test_conf/main.conf deleted file mode 100644 index 78d458f..0000000 --- a/shaping/test/test_conf/main.conf +++ /dev/null @@ -1,42 +0,0 @@ -[SYSTEM] -WORK_THREAD_NUM=2 -ENABLE_CPU_AFFINITY=1 -CPU_AFFINITY_MASK=1-2 - -[MARSIO] -DEV_INTERFACE="eth_interface" -RX_BRUST_MAX=1 -APP_SYMBOL="shaping" - -[MAAT] -INPUT_MODE=1 -TABLE_INFO="conf/table_info.conf" -JSON_FILE="conf/shaping_maat.json" -REDIS_DB_IDX=0 -REDIS_IP="127.0.0.1" -REDIS_PORT="6379" - - -[SWARMKV] -SWARMKV_CLUSTER_NAME="shaping" -SWARMKV_NODE_IP="127.0.0.1" -SWARMKV_NODE_PORT=5210 -SWARMKV_CONSUL_IP="127.0.0.1" -SWARMKV_CONSUL_PORT=8500 -SWARMKV_CLUSTER_ANNOUNCE_IP="127.0.0.1" -SWARMKV_CLUSTER_ANNOUNCE_PORT=8501 -SWARMKV_HEALTH_CHECK_PORT=0 -SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111 - -[METRIC] -FIELDSTAT_OUTPUT_INTERVAL_MS=999999000 -FIELDSTAT_ENABLE_BACKGRUND_THREAD=0 -TELEGRAF_IP="127.0.0.1" -TELEGRAF_PORT=6667 - -[CONFIG] -#PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128 -SESSION_QUEUE_LEN_MAX=128 -QUEUEING_SESSIONS_PER_PRIORITY_PER_THREAD_MAX=1024 -POLLING_NODE_NUM_MAX={"polling_node_num_max":[ 3, 2, 2, 1, 1, 1, 1, 1, 1, 1 ]} - diff --git a/shaping/test/test_conf/shaping.conf b/shaping/test/test_conf/shaping.conf new file mode 100644 index 0000000..78d458f --- /dev/null +++ b/shaping/test/test_conf/shaping.conf @@ -0,0 +1,42 @@ +[SYSTEM] +WORK_THREAD_NUM=2 +ENABLE_CPU_AFFINITY=1 +CPU_AFFINITY_MASK=1-2 + +[MARSIO] +DEV_INTERFACE="eth_interface" +RX_BRUST_MAX=1 +APP_SYMBOL="shaping" + +[MAAT] +INPUT_MODE=1 +TABLE_INFO="conf/table_info.conf" +JSON_FILE="conf/shaping_maat.json" +REDIS_DB_IDX=0 +REDIS_IP="127.0.0.1" +REDIS_PORT="6379" + + +[SWARMKV] +SWARMKV_CLUSTER_NAME="shaping" +SWARMKV_NODE_IP="127.0.0.1" +SWARMKV_NODE_PORT=5210 +SWARMKV_CONSUL_IP="127.0.0.1" +SWARMKV_CONSUL_PORT=8500 +SWARMKV_CLUSTER_ANNOUNCE_IP="127.0.0.1" +SWARMKV_CLUSTER_ANNOUNCE_PORT=8501 +SWARMKV_HEALTH_CHECK_PORT=0 +SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111 + +[METRIC] +FIELDSTAT_OUTPUT_INTERVAL_MS=999999000 +FIELDSTAT_ENABLE_BACKGRUND_THREAD=0 +TELEGRAF_IP="127.0.0.1" +TELEGRAF_PORT=6667 + +[CONFIG] +#PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128 +SESSION_QUEUE_LEN_MAX=128 +QUEUEING_SESSIONS_PER_PRIORITY_PER_THREAD_MAX=1024 +POLLING_NODE_NUM_MAX={"polling_node_num_max":[ 3, 2, 2, 1, 1, 1, 1, 1, 1, 1 ]} + -- cgit v1.2.3 From 9d8e78a8462d74deabedb8167ba129bbf124ed81 Mon Sep 17 00:00:00 2001 From: liuchang Date: Mon, 27 Mar 2023 03:39:37 +0000 Subject: add process for marsio rx_brust_max --- shaping/include/shaper.h | 2 +- shaping/include/shaper_marsio.h | 3 +++ shaping/src/shaper.cpp | 27 +++++++++++++++------------ shaping/src/shaper_marsio.cpp | 6 ++++++ 4 files changed, 25 insertions(+), 13 deletions(-) (limited to 'shaping/src/shaper.cpp') diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 04ff81d..b1ae622 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -17,7 +17,7 @@ #define SHAPING_WROK_THREAD_NUM_MAX 128 -#define SHAPING_GLOBAL_CONF_FILE "./conf/main.conf" +#define SHAPING_GLOBAL_CONF_FILE "./conf/shaping.conf" struct shaping_system_conf { unsigned int session_queue_len_max; diff --git a/shaping/include/shaper_marsio.h b/shaping/include/shaper_marsio.h index 7ab8ccc..d0de401 100644 --- a/shaping/include/shaper_marsio.h +++ b/shaping/include/shaper_marsio.h @@ -1,11 +1,14 @@ #include #include "shaper.h" +#define SHAPER_MARSIO_RX_BRUST_MAX 128 + struct shaping_marsio_info { struct mr_instance *instance; struct mr_vdev *mr_dev; struct mr_sendpath *mr_path; + int rx_brust_max; }; struct metadata diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 5ce644d..184afd1 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -803,29 +803,32 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) { - marsio_buff_t *rx_buff; + marsio_buff_t *rx_buff[SHAPER_MARSIO_RX_BRUST_MAX]; struct shaping_flow *sf = NULL; struct metadata meta; int rx_num; + int i; - rx_num = marsio_recv_burst(ctx->marsio_info->mr_dev, ctx->thread_index, &rx_buff, 1); + rx_num = marsio_recv_burst(ctx->marsio_info->mr_dev, ctx->thread_index, rx_buff, ctx->marsio_info->rx_brust_max); if (rx_num <= 0) { polling_entry(ctx->sp, ctx->stat, ctx); return; } - if (marsio_buff_is_ctrlbuf(rx_buff)) { - sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff, &meta); - } else { - sf = shaper_raw_pkt_session_handle(ctx, rx_buff, &meta); - } + for (i = 0; i < rx_num; i++) { + if (marsio_buff_is_ctrlbuf(rx_buff[i])) { + sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta); + } else { + sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta); + } - if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly - marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); - } else { - shaping_stream_process(ctx, rx_buff, &meta, sf); + if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly + marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1); + } else { + shaping_stream_process(ctx, rx_buff[i], &meta, sf); + } + polling_entry(ctx->sp, ctx->stat, ctx); } - polling_entry(ctx->sp, ctx->stat, ctx); return; } diff --git a/shaping/src/shaper_marsio.cpp b/shaping/src/shaper_marsio.cpp index fa3ebac..cbc5d09 100644 --- a/shaping/src/shaper_marsio.cpp +++ b/shaping/src/shaper_marsio.cpp @@ -11,6 +11,7 @@ struct shaper_marsio_config { + int rx_brust_max; char app_symbol[256]; char dev_interface[256]; }; @@ -30,6 +31,9 @@ static int shaper_marsio_config_load(struct shaper_marsio_config *conf) LOG_ERROR("%s: shaping load MARSIO conf APP_SYMBOL failed", LOG_TAG_MARSIO); return ret; } + + ret = MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "RX_BRUST_MAX", &conf->rx_brust_max, 1); + conf->rx_brust_max = conf->rx_brust_max <= SHAPER_MARSIO_RX_BRUST_MAX ? conf->rx_brust_max : SHAPER_MARSIO_RX_BRUST_MAX; return 0; } @@ -105,6 +109,8 @@ struct shaping_marsio_info* shaper_marsio_init(struct shaping_system_conf *sys_c goto ERROR; } + marsio_info->rx_brust_max = conf.rx_brust_max; + return marsio_info; ERROR: -- cgit v1.2.3 From b320a1112f34714530cfbbe67d8012aaaf0acabe Mon Sep 17 00:00:00 2001 From: liuchang Date: Mon, 27 Mar 2023 10:49:18 +0000 Subject: fix bug TSG-14068 --- common/libavl/libavl.c | 23 +++++-- shaping/include/shaper.h | 4 +- shaping/src/shaper.cpp | 146 +++++++++++++++++++++++++++++------------- shaping/src/shaper_maat.cpp | 6 +- shaping/test/gtest_shaper.cpp | 84 +++++++++++++++++++++++- 5 files changed, 207 insertions(+), 56 deletions(-) (limited to 'shaping/src/shaper.cpp') diff --git a/common/libavl/libavl.c b/common/libavl/libavl.c index 8d439a1..7bfeee2 100644 --- a/common/libavl/libavl.c +++ b/common/libavl/libavl.c @@ -1,14 +1,14 @@ +#include #include #include "libavl.h" #include "avl_tree.h" -#define AVL_NODE_IN_TREE 0x01 struct avl_node { unsigned long long unique_key; void *data; void(*free_cb) (void *); - unsigned int flag; + unsigned int ref_count; struct avl_tree_node node; }; @@ -131,7 +131,7 @@ int avl_node_in_tree(struct avl_node* pnode) return 0; } - if (pnode->flag & AVL_NODE_IN_TREE) { + if (pnode->ref_count > 0) { return 1; } @@ -162,6 +162,11 @@ int avl_tree_node_insert(struct avl_tree *tree, struct avl_node* pnode) return -1; } + if (avl_node_in_tree(pnode)) { + pnode->ref_count++; + return 0; + } + if (tree->curr_node_num == tree->max_node_num) { return -1; } @@ -171,7 +176,7 @@ int avl_tree_node_insert(struct avl_tree *tree, struct avl_node* pnode) return -1; } - pnode->flag |= AVL_NODE_IN_TREE; + pnode->ref_count = 1; tree->curr_node_num++; return 0; @@ -179,12 +184,18 @@ int avl_tree_node_insert(struct avl_tree *tree, struct avl_node* pnode) void avl_tree_node_remove(struct avl_tree *tree, struct avl_node* pnode) { - if (!pnode || !(pnode->flag & AVL_NODE_IN_TREE)) { + if (!pnode || pnode->ref_count == 0) { + return; + } + + pnode->ref_count--; + assert(pnode->ref_count >= 0); + + if (pnode->ref_count > 0) { return; } avl_tree_remove(&tree->root, &pnode->node); - pnode->flag &= ~AVL_NODE_IN_TREE; tree->curr_node_num--; return; } diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index b1ae622..83c1343 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -124,12 +124,12 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf); int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num); -enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue); +//enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue); int shaper_global_conf_init(struct shaping_system_conf *conf); void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx); -void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf); +void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf); struct shaping_ctx *shaping_engine_init(); void shaping_engine_destroy(struct shaping_ctx *ctx); diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 184afd1..6090a15 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -41,6 +41,11 @@ struct shaping_async_cb_arg { unsigned char direction; }; +struct shaping_profile_container { + struct shaping_profile_info *pf_info; + int pf_type; +}; + struct shaper* shaper_new(unsigned int priority_queue_len_max) { struct shaper *sp = NULL; @@ -258,7 +263,7 @@ static unsigned long long shaper_pkt_latency_calculate(struct shaping_profile_in return (curr_time - enqueue_time); } -static void shaping_flow_remove_from_pool(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) +void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; @@ -332,13 +337,6 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i return count; } -void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) -{ - shaping_flow_remove_from_pool(ctx, sf); - - return; -} - static void shaper_deposit_token_add(struct shaping_profile_info *pf_info, int req_token, unsigned char direction) { if (direction == SHAPING_DIR_IN) { @@ -487,23 +485,43 @@ enum shaping_packet_action shaper_pkt_action_decide(struct shaping_flow *sf, str } #endif -static struct shaping_profile_info * shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, int *profile_type) +int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, struct shaping_profile_container pf_container[]) { - int i; + int num = 0; - if (s_rule_info->primary.priority == priority) { - *profile_type = SHAPING_PROFILE_TYPE_PRIMARY; - return &s_rule_info->primary; - } + if (priority == SHAPING_PRIORITY_NUM_MAX - 1) {//priority 9 allow multi profiles for one priority + if (s_rule_info->primary.priority == priority) { + pf_container[num].pf_type = SHAPING_PROFILE_TYPE_PRIMARY; + pf_container[num].pf_info = &s_rule_info->primary; + num++; + } - for (i = 0; i < s_rule_info->borrowing_num; i++) { - if (s_rule_info->borrowing[i].priority == priority) { - *profile_type = SHAPING_PROFILE_TYPE_BORROW; - return &s_rule_info->borrowing[i]; + for (int i = 0; i < s_rule_info->borrowing_num; i++) { + if (s_rule_info->borrowing[i].priority == priority) { + pf_container[num].pf_type = SHAPING_PROFILE_TYPE_BORROW; + pf_container[num].pf_info = &s_rule_info->borrowing[i]; + num++; + } + } + + return num; + } else { + if (s_rule_info->primary.priority == priority) { + pf_container[0].pf_type = SHAPING_PROFILE_TYPE_PRIMARY; + pf_container[0].pf_info = &s_rule_info->primary; + return 1; + } + + for (int i = 0; i < s_rule_info->borrowing_num; i++) { + if (s_rule_info->borrowing[i].priority == priority) { + pf_container[0].pf_type = SHAPING_PROFILE_TYPE_BORROW; + pf_container[0].pf_info = &s_rule_info->borrowing[i]; + return 1; + } } } - return NULL; + return num; } static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char direction) @@ -517,25 +535,78 @@ static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char directi return anchor; } -enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue) +static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority) { - int profile_type = 0; - struct shaping_profile_info *profile = NULL; struct shaping_rule_info *rule = NULL; + struct shaping_profile_info *profile = NULL; + int profile_type; struct shaping_packet_wrapper *pkt_wrapper = NULL; + struct shaping_profile_container pf_container[SHAPING_PRIORITY_NUM_MAX]; struct timespec curr_time; unsigned long long enqueue_time; + int get_token_success = 0; + int profile_num; rule = &sf->matched_rule_infos[sf->anchor]; - profile = shaper_profile_get(rule, priority, &profile_type); - assert(profile != NULL); + profile_num = shaper_profile_get(rule, priority, pf_container); + assert(profile_num > 0); pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); if (pkt_wrapper->tcp_pure_contorl) { - if (sf_in_queue) { - shaper_flow_pop(ctx, sf); + shaper_flow_pop(ctx, sf); + shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); + return SHAPING_FORWARD; + } + + for (int i = 0; i < profile_num; i++) { + profile = pf_container[i].pf_info; + profile_type = pf_container[i].pf_type; + if (0 == shaper_token_consume(ctx->swarmkv_db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) { + shaper_stat_forward_inc(ctx->stat, rule->id, profile->id, profile->priority, + pkt_wrapper->direction, pkt_wrapper->length, profile_type, ctx->thread_index); + get_token_success = 1; + break; } + } + + if (!get_token_success) { + return SHAPING_QUEUED; + } + + shaper_flow_pop(ctx, sf); + sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction); + if (sf->anchor == 0) {//no next rule + return SHAPING_FORWARD; + } + + //push sf for next rule + clock_gettime(CLOCK_MONOTONIC, &curr_time); + enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + if (0 == shaper_flow_push(ctx, sf, enqueue_time)) { + return SHAPING_QUEUED; + } else { + rule = &sf->matched_rule_infos[sf->anchor]; + shaper_stat_drop_inc(ctx->stat, rule->id, rule->primary.id, + rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); + sf->anchor = 0; + return SHAPING_DROP; + } +} + +static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile) +{ + int profile_type = SHAPING_PROFILE_TYPE_PRIMARY; + struct shaping_rule_info *rule = NULL; + struct shaping_packet_wrapper *pkt_wrapper = NULL; + struct timespec curr_time; + unsigned long long enqueue_time; + + rule = &sf->matched_rule_infos[sf->anchor]; + pkt_wrapper = shaper_first_pkt_get(sf); + assert(pkt_wrapper != NULL); + + if (pkt_wrapper->tcp_pure_contorl) { shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); return SHAPING_FORWARD; } @@ -543,10 +614,6 @@ enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *c if (0 == shaper_token_consume(ctx->swarmkv_db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) { shaper_stat_forward_inc(ctx->stat, rule->id, profile->id, profile->priority, pkt_wrapper->direction, pkt_wrapper->length, profile_type, ctx->thread_index); - - if (sf_in_queue) { - shaper_flow_pop(ctx, sf); - } sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction); if (sf->anchor == 0) {//no next rule @@ -557,12 +624,8 @@ enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *c goto FLOW_PUSH; } } else { - if (sf_in_queue) { - return SHAPING_QUEUED; - } else { - enqueue_time = pkt_wrapper->enqueue_time_us; - goto FLOW_PUSH; - } + enqueue_time = pkt_wrapper->enqueue_time_us; + goto FLOW_PUSH; } FLOW_PUSH: @@ -572,7 +635,6 @@ FLOW_PUSH: rule = &sf->matched_rule_infos[sf->anchor]; shaper_stat_drop_inc(ctx->stat, rule->id, rule->primary.id, rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); - sf->anchor = 0; return SHAPING_DROP; } @@ -600,7 +662,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ shaping_ret = shaper_pkt_action_decide(g_swarmkv_db, sf, sp, priority, stat_hashtbl, 1); } #endif - shaping_ret = shaper_pkt_action_decide(ctx, sf, priority, 1); + shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority); switch (shaping_ret) { case SHAPING_QUEUED: @@ -650,9 +712,8 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ } } -void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf) +void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf) { - int priority; int shaping_ret; struct shaping_rule_info *s_rule; struct shaping_stat *stat = ctx->stat; @@ -686,8 +747,7 @@ void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu sf->anchor = 0; - priority = sf->matched_rule_infos[sf->anchor].primary.priority; - shaping_ret = shaper_pkt_action_decide(ctx, sf, priority, 0); + shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, &sf->matched_rule_infos[sf->anchor].primary); switch (shaping_ret) { case SHAPING_QUEUED: break; @@ -825,7 +885,7 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1); } else { - shaping_stream_process(ctx, rx_buff[i], &meta, sf); + shaping_packet_process(ctx, rx_buff[i], &meta, sf); } polling_entry(ctx->sp, ctx->stat, ctx); } diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp index 7d62314..1849987 100644 --- a/shaping/src/shaper_maat.cpp +++ b/shaping/src/shaper_maat.cpp @@ -283,12 +283,12 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_ru goto END; } - if (s_rule->priority + i + 1 < SHAPING_PRIORITY_NUM_MAX) {//TODO: 优先级大于9的都按9处理 + if (s_rule->priority + i + 1 < SHAPING_PRIORITY_NUM_MAX) { shaper_profile_update(&s_rule_info->borrowing[i], s_pf, s_rule->priority + i + 1); - s_rule_info->borrowing_num++; } else { - goto END; + shaper_profile_update(&s_rule_info->borrowing[i], s_pf, SHAPING_PRIORITY_NUM_MAX - 1); } + s_rule_info->borrowing_num++; } END: diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index 51b1a06..ada50cb 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -63,7 +63,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf meta.is_tcp_pure_ctrl = 1; } - shaping_stream_process(ctx, packet, &meta, sf); + shaping_packet_process(ctx, packet, &meta, sf); for (int j = 0; j < polling_times; j++) { polling_entry(ctx->sp, ctx->stat, ctx); @@ -550,6 +550,86 @@ TEST(single_session, udp_borrow) fclose(stat_file); } +/*session1 match rule1 + rule1: + priority: 9 + profile1: limit 0 + profile2: limit 0 + profile3: limit 1000*/ +TEST(single_session, udp_borrow_same_priority_9) +{ + struct stub_pkt_queue expec_tx_queue; + struct stub_pkt_queue *actual_tx_queue; + struct shaping_ctx *ctx = NULL; + struct shaping_flow *sf = NULL; + long long rule_id[] = {1}; + int priority[] = {9}; + int profile_num[] = {3}; + int profile_id[][MAX_REF_PROFILE] = {{1, 2, 3}}; + + TAILQ_INIT(&expec_tx_queue); + stub_init(); + ctx = shaping_engine_init(); + ASSERT_TRUE(ctx != NULL); + sf = shaping_flow_new(); + ASSERT_TRUE(sf != NULL); + + stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); + stub_set_token_bucket_avl_per_sec(1, 0, SHAPING_DIR_OUT); + stub_set_token_bucket_avl_per_sec(2, 0, SHAPING_DIR_OUT); + stub_set_token_bucket_avl_per_sec(3, 1000, SHAPING_DIR_OUT); + actual_tx_queue = stub_get_tx_queue(); + shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1); + + /*******send packets***********/ + send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0); + + + //first 10 packets + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); + + while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets + stub_refresh_token_bucket(3); + for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + } + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); + ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); + } + + /***********send stat data here********************/ + stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric + fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + + shaping_flow_free(sf); + shaping_engine_destroy(ctx); + stub_clear_matched_shaping_rules(); + + /*******test statistics***********/ + sleep(2);//wait telegraf to output + char line[1024]; + FILE *stat_file; + + stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); + memset(line, 0, sizeof(line)); + ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary + //shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//TODO: latency + + ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary + //shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 2, 9, 0, 0, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_borrow);//TODO: latency + + ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow + //shaping_stat_judge(line, 1, 2, 9, 100, 10000, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_borrow); + shaping_stat_judge(line, 1, 3, 9, 100, 10000, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_borrow);//TODO: latency + + fclose(stat_file); + stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file + fclose(stat_file); +} + /*session1 match rule1, session2 match rule2 rule1: priority:2 @@ -1391,6 +1471,6 @@ TEST(statistics, udp_queueing_pkt) int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); - //testing::GTEST_FLAG(filter) = "statistics.udp_queueing_pkt"; + //testing::GTEST_FLAG(filter) = "single_session.udp_borrow_same_priority_9"; return RUN_ALL_TESTS(); } \ No newline at end of file -- cgit v1.2.3