diff options
Diffstat (limited to 'shaping/src/shaper.cpp')
| -rw-r--r-- | shaping/src/shaper.cpp | 108 |
1 files changed, 41 insertions, 67 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 2d88b01..aef5a65 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -22,10 +22,7 @@ extern "C" { #include "shaper_swarmkv.h" #include "shaper_maat.h" #include "shaper_global_stat.h" - -#define NANO_SECONDS_PER_MICRO_SEC 1000 -#define MICRO_SECONDS_PER_SEC 1000000 -#define NANO_SECONDS_PER_SEC 1000000000 +#include "shaper_aqm.h" #define SHAPING_LATENCY_THRESHOLD 2000000 //2s @@ -65,7 +62,23 @@ struct shaping_profile_container { int pf_type; }; -struct shaper* shaper_new(unsigned int priority_queue_len_max) +static void shaper_free(struct shaper *sp) +{ + int i; + + if (sp) { + for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { + if (sp->priority_trees[i]) { + avl_tree_destroy(sp->priority_trees[i]); + } + } + free(sp); + } + + return; +} + +static struct shaper* shaper_new(unsigned int priority_queue_len_max) { struct shaper *sp = NULL; int i; @@ -90,22 +103,6 @@ ERROR: return NULL; } -void shaper_free(struct shaper *sp) -{ - int i; - - if (sp) { - for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { - if (sp->priority_trees[i]) { - avl_tree_destroy(sp->priority_trees[i]); - } - } - free(sp); - } - - return; -} - static void shaping_node_free(struct shaping_node *s_node) { int i; @@ -164,7 +161,7 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) struct shaping_node *s_node = (struct shaping_node*)sf; if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) { - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); shaping_node_free(s_node); } @@ -180,6 +177,10 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ return -1; } + if (shaper_aqm_enqueue(&sf->matched_rule_infos[sf->anchor].primary) == AQM_ACTION_DROP) { + return -1; + } + s_pkt = (struct shaping_packet_wrapper*)calloc(1, sizeof(struct shaping_packet_wrapper)); if (!s_pkt) { return -1; @@ -204,12 +205,12 @@ bool shaper_queue_empty(struct shaping_flow *sf) return TAILQ_EMPTY(&sf->packet_queue); } -struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf) +static struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf) { return TAILQ_FIRST(&sf->packet_queue); } -void shaper_packet_dequeue(struct shaping_flow *sf) +static void shaper_packet_dequeue(struct shaping_flow *sf) { struct shaping_packet_wrapper *s_pkt; @@ -244,21 +245,8 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) return; } -static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg) -{ - struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg; - - shaper_global_stat_async_callback_inc(global_stat); - - if (reply->type != SWARMKV_REPLY_INTEGER) { - shaper_global_stat_async_hincrby_failed_inc(global_stat); - } - - return; -} - //return success(0) while any avl tree insert success -int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) +static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; @@ -271,20 +259,10 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); - if ((sf->flag & SESSION_UPDATE_PF_PRIO_LEN) == 0) { - if (sf->processed_pkts > CONFIRM_PRIORITY_PKTS) { - sf->flag |= SESSION_UPDATE_PF_PRIO_LEN; - } - } - priority = s_rule_info->primary.priority; avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority); - } } if (s_rule_info->borrowing_num == 0) {// no borrow profile @@ -296,10 +274,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority); - } + //TODO: calculate queue_len for borrow profile and add judge when refresh stat???? + //shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); } } @@ -320,7 +296,7 @@ static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile return (curr_time - enqueue_time); } -void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) +static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; @@ -339,10 +315,6 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) priority = s_rule_info->primary.priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority); - } } if (s_rule_info->borrowing_num == 0) { @@ -353,10 +325,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) priority = s_rule_info->borrowing[i].priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority); - } + //TODO: calculate queue_len for borrow profile and add judge when refresh stat } } @@ -368,7 +337,7 @@ END: return; } -int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num) +static int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num) { struct avl_node *avl_node = NULL; int count = 0; @@ -691,8 +660,13 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi shaper_flow_pop(ctx, sf); goto DROP; } - } - /*todo: AQM, just for primary profile*/ + + if (shaper_aqm_need_drop(pf_container[0].pf_info, pkt_wrapper)) { + shaper_flow_pop(ctx, sf); + goto DROP; + } + } + /*todo: AQM*/ for (int i = 0; i < profile_num; i++) { profile = pf_container[i].pf_info; @@ -819,7 +793,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ break; } - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); if (shaper_queue_empty(sf)) { if (sf->flag & SESSION_CLOSE) { @@ -899,7 +873,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu } END: - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); if(sf->flag & SESSION_CLOSE) { if (shaper_queue_empty(sf)) { char *addr_str = addr_tuple4_to_str(&sf->tuple4); @@ -1046,7 +1020,7 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) return; } -int shaper_global_conf_init(struct shaping_system_conf *conf) +static int shaper_global_conf_init(struct shaping_system_conf *conf) { int ret; int array_num; |
