diff options
| -rw-r--r-- | README.md | 2 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 112 |
2 files changed, 51 insertions, 63 deletions
@@ -1,4 +1,4 @@ -# shaping_master +# shaping_engine ## introduction TSG provides Quality of Service (QoS) by applying traffic shaping and DSCP marking. You can define shaping rules to allocate resources for different traffic types, align network performance to business priorities, and ensure that your network serves the business first. diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index f8dbe6e..ca2893b 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -27,6 +27,8 @@ extern "C" { #define MICRO_SECONDS_PER_SEC 1000000 #define NANO_SECONDS_PER_SEC 1000000000 +#define SHAPING_LATENCY_THRESHOLD 2000000 //2s + #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0" #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1" #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2" @@ -169,9 +171,10 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) return; } -static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct timespec *income_time, struct metadata *meta) +static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta) { struct shaping_packet_wrapper *s_pkt = NULL; + struct timespec curr_time; if (sf->queue_len == ctx->session_queue_len_max) { return -1; @@ -182,12 +185,14 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ return -1; } + clock_gettime(CLOCK_MONOTONIC, &curr_time); + s_pkt->pkt_buff = pkt_buff; s_pkt->direction = meta->dir; s_pkt->length = meta->raw_len; s_pkt->tcp_pure_contorl = meta->is_tcp_pure_ctrl; - s_pkt->income_time_ns = income_time->tv_sec * NANO_SECONDS_PER_SEC + income_time->tv_nsec; - s_pkt->enqueue_time_us = income_time->tv_sec * MICRO_SECONDS_PER_SEC + income_time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + s_pkt->income_time_ns = curr_time.tv_sec * NANO_SECONDS_PER_SEC + curr_time.tv_nsec; + s_pkt->enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node); sf->queue_len++; @@ -307,7 +312,7 @@ END: return ret; } -static unsigned long long shaper_pkt_latency_calculate(struct shaping_profile_info *profile, struct timespec *time) +static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time) { unsigned long long enqueue_time = profile->enqueue_time_us; unsigned long long curr_time = time->tv_sec * MICRO_SECONDS_PER_SEC + time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; @@ -359,7 +364,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) } END: - latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time); + latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, &curr_time); shaper_stat_max_latency_update(&s_rule_info->primary.stat, pkt_wrapper->direction, latency, ctx->thread_index); return; @@ -665,6 +670,13 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi return SHAPING_FORWARD; } + clock_gettime(CLOCK_MONOTONIC, &curr_time); + if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > SHAPING_LATENCY_THRESHOLD) { + shaper_flow_pop(ctx, sf); + goto DROP; + } + /*todo: AQM*/ + for (int i = 0; i < profile_num; i++) { profile = pf_container[i].pf_info; profile_type = pf_container[i].pf_type; @@ -691,55 +703,60 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi if (0 == shaper_flow_push(ctx, sf, enqueue_time)) { return SHAPING_QUEUED; } else { - rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); - sf->anchor = 0; - return SHAPING_DROP; + goto DROP; } + +DROP: + rule = &sf->matched_rule_infos[sf->anchor]; + shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); + sf->anchor = 0; + return SHAPING_DROP; } -static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile) +static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct metadata *meta, struct shaping_profile_info *profile, marsio_buff_t *rx_buff) { int profile_type = PROFILE_IN_RULE_TYPE_PRIMARY; - struct shaping_rule_info *rule = NULL; - struct shaping_packet_wrapper *pkt_wrapper = NULL; struct timespec curr_time; unsigned long long enqueue_time; - rule = &sf->matched_rule_infos[sf->anchor]; - pkt_wrapper = shaper_first_pkt_get(sf); - assert(pkt_wrapper != NULL); - - if (pkt_wrapper->tcp_pure_contorl) { - shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); + if (meta->is_tcp_pure_ctrl) { + shaper_stat_forward_all_rule_inc(ctx->stat, sf, meta->dir, meta->raw_len, ctx->thread_index); return SHAPING_FORWARD; } - if (0 == shaper_token_consume(ctx, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) { - shaper_stat_forward_inc(&profile->stat, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); + if (0 == shaper_token_consume(ctx, sf, meta->raw_len, profile, profile_type, meta->dir)) { + shaper_stat_forward_inc(&profile->stat, meta->dir, meta->raw_len, ctx->thread_index); - sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction); + sf->anchor = shaper_next_anchor_get(sf, meta->dir); if (sf->anchor == 0) {//no next rule return SHAPING_FORWARD; } else { - clock_gettime(CLOCK_MONOTONIC, &curr_time); - enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; goto FLOW_PUSH; } - } else { - enqueue_time = pkt_wrapper->enqueue_time_us; - goto FLOW_PUSH; } FLOW_PUSH: + if (shaper_packet_enqueue(ctx, sf, rx_buff, meta) != 0) { + char *addr_str = addr_tuple4_to_str(&sf->tuple4); + LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_str); + if (addr_str) { + free(addr_str); + } + goto DROP; + } + + clock_gettime(CLOCK_MONOTONIC, &curr_time); + enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; if (0 == shaper_flow_push(ctx, sf, enqueue_time)) { return SHAPING_QUEUED; } else { - rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); - sf->anchor = 0; - return SHAPING_DROP; + goto DROP; } + +DROP: + shaper_stat_drop_inc(&sf->matched_rule_infos[sf->anchor].primary.stat, meta->dir, ctx->thread_index); + sf->anchor = 0; + return SHAPING_DROP; } static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority, @@ -753,17 +770,6 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); -#if 0 - //AQM not implement yet - if (stub_AQM_drop_packet(sf->queue_len, pkt_wrapper->enqueue_time_us)) { - rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length); - - shaping_ret = SHAPING_DROP; - } else { - shaping_ret = shaper_pkt_action_decide(g_swarmkv_db, sf, sp, priority, stat_hashtbl, 1); - } -#endif shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority); switch (shaping_ret) { @@ -831,7 +837,6 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu struct shaping_rule_info *s_rule; struct shaping_stat *stat = ctx->stat; struct shaping_marsio_info *marsio_info = ctx->marsio_info; - struct timespec curr_time; sf->processed_pkts++; @@ -843,10 +848,9 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu goto END;//for tcp pure control pkt, transmit it directly } - clock_gettime(CLOCK_MONOTONIC, &curr_time); if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly s_rule = &sf->matched_rule_infos[0]; - if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) { + if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta)) { shaper_stat_queueing_pkt_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len); } else { @@ -856,37 +860,21 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); } - } else { - if (0 != shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) { - marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); - shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); - shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); - - char *addr_str = addr_tuple4_to_str(&sf->tuple4); - LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_str); - if (addr_str) { - free(addr_str); - } - - goto END; - } - + } else {//no queueing pkt, decide action sf->anchor = 0; - shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, &sf->matched_rule_infos[sf->anchor].primary); + shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff); switch (shaping_ret) { case SHAPING_QUEUED: shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len); break; case SHAPING_DROP: marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); - shaper_packet_dequeue(sf); shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); break; case SHAPING_FORWARD: marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); - shaper_packet_dequeue(sf); shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); break; |
