diff options
Diffstat (limited to 'shaping/src/shaper.cpp')
| -rw-r--r-- | shaping/src/shaper.cpp | 112 |
1 files changed, 50 insertions, 62 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index f8dbe6e..ca2893b 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -27,6 +27,8 @@ extern "C" { #define MICRO_SECONDS_PER_SEC 1000000 #define NANO_SECONDS_PER_SEC 1000000000 +#define SHAPING_LATENCY_THRESHOLD 2000000 //2s + #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0" #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1" #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2" @@ -169,9 +171,10 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) return; } -static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct timespec *income_time, struct metadata *meta) +static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta) { struct shaping_packet_wrapper *s_pkt = NULL; + struct timespec curr_time; if (sf->queue_len == ctx->session_queue_len_max) { return -1; @@ -182,12 +185,14 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ return -1; } + clock_gettime(CLOCK_MONOTONIC, &curr_time); + s_pkt->pkt_buff = pkt_buff; s_pkt->direction = meta->dir; s_pkt->length = meta->raw_len; s_pkt->tcp_pure_contorl = meta->is_tcp_pure_ctrl; - s_pkt->income_time_ns = income_time->tv_sec * NANO_SECONDS_PER_SEC + income_time->tv_nsec; - s_pkt->enqueue_time_us = income_time->tv_sec * MICRO_SECONDS_PER_SEC + income_time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + s_pkt->income_time_ns = curr_time.tv_sec * NANO_SECONDS_PER_SEC + curr_time.tv_nsec; + s_pkt->enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node); sf->queue_len++; @@ -307,7 +312,7 @@ END: return ret; } -static unsigned long long shaper_pkt_latency_calculate(struct shaping_profile_info *profile, struct timespec *time) +static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time) { unsigned long long enqueue_time = profile->enqueue_time_us; unsigned long long curr_time = time->tv_sec * MICRO_SECONDS_PER_SEC + time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; @@ -359,7 +364,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) } END: - latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time); + latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, &curr_time); shaper_stat_max_latency_update(&s_rule_info->primary.stat, pkt_wrapper->direction, latency, ctx->thread_index); return; @@ -665,6 +670,13 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi return SHAPING_FORWARD; } + clock_gettime(CLOCK_MONOTONIC, &curr_time); + if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > SHAPING_LATENCY_THRESHOLD) { + shaper_flow_pop(ctx, sf); + goto DROP; + } + /*todo: AQM*/ + for (int i = 0; i < profile_num; i++) { profile = pf_container[i].pf_info; profile_type = pf_container[i].pf_type; @@ -691,55 +703,60 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi if (0 == shaper_flow_push(ctx, sf, enqueue_time)) { return SHAPING_QUEUED; } else { - rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); - sf->anchor = 0; - return SHAPING_DROP; + goto DROP; } + +DROP: + rule = &sf->matched_rule_infos[sf->anchor]; + shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); + sf->anchor = 0; + return SHAPING_DROP; } -static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile) +static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct metadata *meta, struct shaping_profile_info *profile, marsio_buff_t *rx_buff) { int profile_type = PROFILE_IN_RULE_TYPE_PRIMARY; - struct shaping_rule_info *rule = NULL; - struct shaping_packet_wrapper *pkt_wrapper = NULL; struct timespec curr_time; unsigned long long enqueue_time; - rule = &sf->matched_rule_infos[sf->anchor]; - pkt_wrapper = shaper_first_pkt_get(sf); - assert(pkt_wrapper != NULL); - - if (pkt_wrapper->tcp_pure_contorl) { - shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); + if (meta->is_tcp_pure_ctrl) { + shaper_stat_forward_all_rule_inc(ctx->stat, sf, meta->dir, meta->raw_len, ctx->thread_index); return SHAPING_FORWARD; } - if (0 == shaper_token_consume(ctx, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) { - shaper_stat_forward_inc(&profile->stat, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); + if (0 == shaper_token_consume(ctx, sf, meta->raw_len, profile, profile_type, meta->dir)) { + shaper_stat_forward_inc(&profile->stat, meta->dir, meta->raw_len, ctx->thread_index); - sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction); + sf->anchor = shaper_next_anchor_get(sf, meta->dir); if (sf->anchor == 0) {//no next rule return SHAPING_FORWARD; } else { - clock_gettime(CLOCK_MONOTONIC, &curr_time); - enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; goto FLOW_PUSH; } - } else { - enqueue_time = pkt_wrapper->enqueue_time_us; - goto FLOW_PUSH; } FLOW_PUSH: + if (shaper_packet_enqueue(ctx, sf, rx_buff, meta) != 0) { + char *addr_str = addr_tuple4_to_str(&sf->tuple4); + LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_str); + if (addr_str) { + free(addr_str); + } + goto DROP; + } + + clock_gettime(CLOCK_MONOTONIC, &curr_time); + enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; if (0 == shaper_flow_push(ctx, sf, enqueue_time)) { return SHAPING_QUEUED; } else { - rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); - sf->anchor = 0; - return SHAPING_DROP; + goto DROP; } + +DROP: + shaper_stat_drop_inc(&sf->matched_rule_infos[sf->anchor].primary.stat, meta->dir, ctx->thread_index); + sf->anchor = 0; + return SHAPING_DROP; } static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority, @@ -753,17 +770,6 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); -#if 0 - //AQM not implement yet - if (stub_AQM_drop_packet(sf->queue_len, pkt_wrapper->enqueue_time_us)) { - rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length); - - shaping_ret = SHAPING_DROP; - } else { - shaping_ret = shaper_pkt_action_decide(g_swarmkv_db, sf, sp, priority, stat_hashtbl, 1); - } -#endif shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority); switch (shaping_ret) { @@ -831,7 +837,6 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu struct shaping_rule_info *s_rule; struct shaping_stat *stat = ctx->stat; struct shaping_marsio_info *marsio_info = ctx->marsio_info; - struct timespec curr_time; sf->processed_pkts++; @@ -843,10 +848,9 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu goto END;//for tcp pure control pkt, transmit it directly } - clock_gettime(CLOCK_MONOTONIC, &curr_time); if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly s_rule = &sf->matched_rule_infos[0]; - if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) { + if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta)) { shaper_stat_queueing_pkt_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len); } else { @@ -856,37 +860,21 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); } - } else { - if (0 != shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) { - marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); - shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); - shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); - - char *addr_str = addr_tuple4_to_str(&sf->tuple4); - LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_str); - if (addr_str) { - free(addr_str); - } - - goto END; - } - + } else {//no queueing pkt, decide action sf->anchor = 0; - shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, &sf->matched_rule_infos[sf->anchor].primary); + shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff); switch (shaping_ret) { case SHAPING_QUEUED: shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len); break; case SHAPING_DROP: marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); - shaper_packet_dequeue(sf); shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); break; case SHAPING_FORWARD: marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); - shaper_packet_dequeue(sf); shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); break; |
