From ddd6873dab598216449de3aec2bf55c40cb6b619 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 4 Jun 2024 07:29:04 +0000 Subject: 每个session的缓存队列分为in和out两个方向队列 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- shaping/src/shaper.cpp | 290 +++++++++++++++++++++++++------------------------ 1 file changed, 150 insertions(+), 140 deletions(-) (limited to 'shaping/src/shaper.cpp') diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 4accf9c..b55807e 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -38,12 +38,12 @@ extern "C" { #define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9" struct shaper {//trees in one thread - struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority + struct avl_tree *priority_trees[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority }; struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree struct shaping_flow shaping_flow; - struct avl_node *avl_node[SHAPING_PRIORITY_NUM_MAX]; + struct avl_node *avl_node[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX]; }; struct shaping_profile_container { @@ -63,18 +63,18 @@ thread_local static int thread_swarmkv_cb_cnt = 0; struct shaper* shaper_new(unsigned int priority_queue_len_max) { struct shaper *sp = NULL; - int i; sp = (struct shaper*)calloc(1, sizeof(struct shaper)); if (!sp) { goto ERROR; } - - for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { - sp->priority_trees[i] = avl_tree_init(priority_queue_len_max); - if (!sp->priority_trees[i]) { - goto ERROR; + for (int i = 0; i < SHAPING_DIR_MAX; i++) { + for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) { + sp->priority_trees[i][j] = avl_tree_init(priority_queue_len_max); + if (!sp->priority_trees[i][j]) { + goto ERROR; + } } } @@ -87,12 +87,12 @@ ERROR: void shaper_free(struct shaper *sp) { - int i; - if (sp) { - for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { - if (sp->priority_trees[i]) { - avl_tree_destroy(sp->priority_trees[i]); + for (int i = 0; i < SHAPING_DIR_MAX; i++) { + for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) { + if (sp->priority_trees[i][j]) { + avl_tree_destroy(sp->priority_trees[i][j]); + } } } free(sp); @@ -103,12 +103,12 @@ void shaper_free(struct shaper *sp) static void shaping_node_free(struct shaping_node *s_node) { - int i; - if (s_node) { - for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { - if (s_node->avl_node[i]) { - avl_tree_node_free(s_node->avl_node[i]); + for (int i = 0; i < SHAPING_DIR_MAX; i++) { + for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) { + if (s_node->avl_node[i][j]) { + avl_tree_node_free(s_node->avl_node[i][j]); + } } } @@ -129,23 +129,25 @@ static void shaping_node_free(struct shaping_node *s_node) struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx) { struct shaping_node *s_node = NULL; - int i; s_node = (struct shaping_node*)calloc(1, sizeof(struct shaping_node)); if (!s_node) { goto ERROR; } - for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { - s_node->avl_node[i] = avl_tree_node_new(0, &s_node->shaping_flow, NULL); - if (!s_node->avl_node[i]) { - goto ERROR; + for (int i = 0; i < SHAPING_DIR_MAX; i++) { + for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) { + s_node->avl_node[i][j] = avl_tree_node_new(0, &s_node->shaping_flow, NULL); + if (!s_node->avl_node[i][j]) { + goto ERROR; + } } } - TAILQ_INIT(&s_node->shaping_flow.packet_queue); - s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1; + TAILQ_INIT(&s_node->shaping_flow.packet_queue[SHAPING_DIR_IN]); + TAILQ_INIT(&s_node->shaping_flow.packet_queue[SHAPING_DIR_OUT]); + s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1; s_node->shaping_flow.ref_cnt = 1; return &s_node->shaping_flow; @@ -195,34 +197,33 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ } s_pkt->pkt_buff = pkt_buff; - s_pkt->direction = meta->dir; s_pkt->length = meta->raw_len; - s_pkt->rule_anchor = sf->anchor; + s_pkt->rule_anchor = sf->anchor[meta->dir]; s_pkt->income_time_ns = curr_time->tv_sec * NANO_SECONDS_PER_SEC + curr_time->tv_nsec; s_pkt->enqueue_time_us = curr_time->tv_sec * MICRO_SECONDS_PER_SEC + curr_time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node); + TAILQ_INSERT_TAIL(&sf->packet_queue[meta->dir], s_pkt, node); sf->queue_len++; return 0; } -bool shaper_queue_empty(struct shaping_flow *sf) +bool shaper_queue_empty(struct shaping_flow *sf, enum shaping_packet_dir dir) { - return TAILQ_EMPTY(&sf->packet_queue); + return TAILQ_EMPTY(&sf->packet_queue[dir]); } -struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf) +static struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf, enum shaping_packet_dir dir) { - return TAILQ_FIRST(&sf->packet_queue); + return TAILQ_FIRST(&sf->packet_queue[dir]); } -void shaper_packet_dequeue(struct shaping_flow *sf) +static void shaper_packet_dequeue(struct shaping_flow *sf, enum shaping_packet_dir dir) { struct shaping_packet_wrapper *s_pkt; - s_pkt = TAILQ_FIRST(&sf->packet_queue); + s_pkt = TAILQ_FIRST(&sf->packet_queue[dir]); if (s_pkt) { - TAILQ_REMOVE(&sf->packet_queue, s_pkt, node); + TAILQ_REMOVE(&sf->packet_queue[dir], s_pkt, node); sf->queue_len--; free(s_pkt); } @@ -230,43 +231,43 @@ void shaper_packet_dequeue(struct shaping_flow *sf) return; } -void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) +void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx, enum shaping_packet_dir dir) { struct shaping_packet_wrapper *pkt_wrapper; struct shaping_rule_info *rule = &sf->matched_rule_infos[0]; - while (!shaper_queue_empty(sf)) { - pkt_wrapper = shaper_first_pkt_get(sf); + while (!shaper_queue_empty(sf, dir)) { + pkt_wrapper = shaper_first_pkt_get(sf, dir); - shaper_stat_queueing_pkt_dec(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); + shaper_stat_queueing_pkt_dec(&rule->primary.stat, dir, ctx->thread_index); shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index); shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); - shaper_packet_dequeue(sf); + shaper_packet_dequeue(sf, dir); } return; } //return success(0) while any avl tree insert success -static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time_us) +static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, unsigned long long enqueue_time_us) { struct shaping_node *s_node = (struct shaping_node*)sf; - struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; + struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor[dir]]; struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; int priority; int i; - pkt_wrapper = shaper_first_pkt_get(sf); + pkt_wrapper = shaper_first_pkt_get(sf, dir); assert(pkt_wrapper != NULL); priority = s_rule_info->primary.priority; - avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); - if (0 != avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {//primary profile failed means flow push failed, ignore borrow profile + avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns); + if (0 != avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {//primary profile failed means flow push failed, ignore borrow profile return -1; } @@ -276,44 +277,44 @@ static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow for (i = 0; i < s_rule_info->borrowing_num; i++) { priority = s_rule_info->borrowing[i].priority; - avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); - if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { - shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); + avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns); + if (0 == avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) { + shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index); } } END: - s_rule_info->primary.enqueue_time_us = enqueue_time_us; - shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); + s_rule_info->primary.enqueue_time_us[dir] = enqueue_time_us; + shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, dir, ctx->thread_index); return 0; } -static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time) +static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time, enum shaping_packet_dir dir) { - unsigned long long enqueue_time = profile->enqueue_time_us; + unsigned long long enqueue_time = profile->enqueue_time_us[dir]; unsigned long long curr_time = time->tv_sec * MICRO_SECONDS_PER_SEC + time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; assert(curr_time >= enqueue_time); return (curr_time - enqueue_time); } -static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct timespec *curr_time) +static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, struct timespec *curr_time) { struct shaping_node *s_node = (struct shaping_node*)sf; - struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; + struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor[dir]]; struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; unsigned long long latency; int priority; int i; - pkt_wrapper = shaper_first_pkt_get(sf); + pkt_wrapper = shaper_first_pkt_get(sf, dir); assert(pkt_wrapper != NULL); priority = s_rule_info->primary.priority; - if (avl_node_in_tree(s_node->avl_node[priority])) { - avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); + if (avl_node_in_tree(s_node->avl_node[dir][priority])) { + avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]); } if (s_rule_info->borrowing_num == 0) { @@ -322,35 +323,35 @@ static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow for (i = 0; i < s_rule_info->borrowing_num; i++) { priority = s_rule_info->borrowing[i].priority; - if (avl_node_in_tree(s_node->avl_node[priority])) { - avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); + if (avl_node_in_tree(s_node->avl_node[dir][priority])) { + avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]); + shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index); } } END: - latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, curr_time); - shaper_stat_max_latency_update(&s_rule_info->primary.stat, pkt_wrapper->direction, latency, ctx->thread_index); - shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); + latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, curr_time, dir); + shaper_stat_max_latency_update(&s_rule_info->primary.stat, dir, latency, ctx->thread_index); + shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, dir, ctx->thread_index); return; } -static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority) +static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, int priority) { struct shaping_node *s_node = (struct shaping_node*)sf; - struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; + struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor[dir]]; struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; - pkt_wrapper = shaper_first_pkt_get(sf); + pkt_wrapper = shaper_first_pkt_get(sf, dir); assert(pkt_wrapper != NULL); for (int i = 0; i < s_rule_info->borrowing_num; i++) { if (priority == s_rule_info->borrowing[i].priority) { - if (avl_node_in_tree(s_node->avl_node[priority])) { - avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); + if (avl_node_in_tree(s_node->avl_node[dir][priority])) { + avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]); + shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index); } } } @@ -358,7 +359,7 @@ static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx * return; } -int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num) +static int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], enum shaping_packet_dir dir, int priority, int max_sf_num) { struct avl_node *avl_node = NULL; int count = 0; @@ -367,7 +368,7 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i return 0; } - avl_node = avl_tree_minimum_node_get(sp->priority_trees[priority]); + avl_node = avl_tree_minimum_node_get(sp->priority_trees[dir][priority]); while(avl_node) { sf_ins[count].sf = (struct shaping_flow*)avl_tree_node_data_get(avl_node); sf_ins[count].priority = priority; @@ -601,7 +602,7 @@ static int shaper_deposit_token_get(struct shaping_profile_info *profile, int re return 0; } - if (*deposit_token < req_token_bits) { + if (*deposit_token <= req_token_bits) { *need_get_token = 1; } @@ -635,7 +636,7 @@ static void shaper_profile_hash_node_refresh(struct shaping_thread_ctx *ctx, str return; } -static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int req_token_bits, unsigned char direction, struct timespec *curr_timespec) +static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int req_token_bits, enum shaping_packet_dir dir, struct timespec *curr_timespec) { struct shaping_tconsume_cb_arg *arg = NULL; struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node; @@ -650,14 +651,14 @@ static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct if (pf_hash_node->limit_direction == PROFILE_LIMIT_DIRECTION_BIDIRECTION) { snprintf(key, sizeof(key), "tsg-shaping-%d-bidirectional", pf_info->id); } else { - snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming"); + snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, dir == SHAPING_DIR_OUT ? "outgoing" : "incoming"); } arg = (struct shaping_tconsume_cb_arg *)calloc(1, sizeof(struct shaping_tconsume_cb_arg)); arg->ctx = ctx; arg->profile = pf_info; arg->sf = sf; - arg->direction = direction; + arg->direction = dir; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); @@ -672,7 +673,7 @@ static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct break; case PROFILE_TYPE_HOST_FARINESS: case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: - swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, shaper_token_get_cb, arg); + swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor[dir]].fair_factor, req_token_bits, shaper_token_get_cb, arg); //TODO: ftconsume with flexiable //swarmkv_async_command(ctx->swarmkv_db, shaper_token_get_cb, arg, "FTCONSUME %s %s %d %d %s", key, sf->src_ip_str, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, "FLEXIBLE"); break; @@ -826,9 +827,9 @@ static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct } } -static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, unsigned char direction, struct timespec *curr_timespec) +static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, enum shaping_packet_dir direction, struct timespec *curr_timespec) { - struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor]; + struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor[direction]]; int need_get_token = 0; int ret = SHAPER_TOKEN_GET_FAILED; @@ -917,9 +918,9 @@ int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, stru return num; } -static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char direction) +static int shaper_next_anchor_get(struct shaping_flow *sf, enum shaping_packet_dir dir) { - int anchor = sf->anchor + 1; + int anchor = sf->anchor[dir] + 1; if (anchor > sf->rule_num - 1) { return 0; @@ -928,7 +929,7 @@ static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char directi return anchor; } -static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority) +static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, int priority) { struct shaping_rule_info *rule = NULL; struct shaping_profile_info *profile = NULL; @@ -941,18 +942,18 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi int get_token_success = 0; int profile_num; - rule = &sf->matched_rule_infos[sf->anchor]; + rule = &sf->matched_rule_infos[sf->anchor[dir]]; profile_num = shaper_profile_get(rule, priority, pf_container); assert(profile_num > 0); - pkt_wrapper = shaper_first_pkt_get(sf); + pkt_wrapper = shaper_first_pkt_get(sf, dir); assert(pkt_wrapper != NULL); clock_gettime(CLOCK_MONOTONIC, &curr_time); - latency_us = shaper_pkt_latency_us_calculate(&rule->primary, &curr_time); + latency_us = shaper_pkt_latency_us_calculate(&rule->primary, &curr_time, dir); if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) { if (latency_us > ctx->conf.pkt_max_delay_time_us) { - shaper_flow_pop(ctx, sf, &curr_time); + shaper_flow_pop(ctx, sf, dir, &curr_time); goto DROP; } } @@ -964,18 +965,18 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi /*AQM process, if aqm not pass, for primary profile drop packet, for borrow profile just don't give token to this packet*/ if (shaper_aqm_need_drop(profile, pkt_wrapper, &curr_time, latency_us)) { if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) { - shaper_flow_pop(ctx, sf, &curr_time); + shaper_flow_pop(ctx, sf, dir, &curr_time); goto DROP; } else { - shaper_flow_specific_borrow_priority_pop(ctx, sf, priority); + shaper_flow_specific_borrow_priority_pop(ctx, sf, dir, priority); continue; } } - int ret = shaper_token_consume(ctx, sf, profile, profile_type, pkt_wrapper->length, pkt_wrapper->direction, &curr_time); + int ret = shaper_token_consume(ctx, sf, profile, profile_type, pkt_wrapper->length, dir, &curr_time); if (ret >= SHAPER_TOKEN_GET_SUCCESS) { if (ret == SHAPER_TOKEN_GET_SUCCESS) { - shaper_stat_forward_inc(&profile->stat, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); + shaper_stat_forward_inc(&profile->stat, dir, pkt_wrapper->length, ctx->thread_index); } get_token_success = 1; break; @@ -986,24 +987,24 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi return SHAPING_QUEUED; } - shaper_flow_pop(ctx, sf, &curr_time); - sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction); - if (sf->anchor == 0) {//no next rule + shaper_flow_pop(ctx, sf, dir, &curr_time); + sf->anchor[dir] = shaper_next_anchor_get(sf, dir); + if (sf->anchor[dir] == 0) {//no next rule return SHAPING_FORWARD; } //push sf for next rule enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - if (0 == shaper_flow_push(ctx, sf, enqueue_time_us)) { + if (0 == shaper_flow_push(ctx, sf, dir, enqueue_time_us)) { return SHAPING_QUEUED; } else { goto DROP; } DROP: - rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); - sf->anchor = 0; + rule = &sf->matched_rule_infos[sf->anchor[dir]]; + shaper_stat_drop_inc(&rule->primary.stat, dir, ctx->thread_index); + sf->anchor[dir] = 0; return SHAPING_DROP; } @@ -1021,8 +1022,8 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi shaper_stat_forward_inc(&profile->stat, meta->dir, meta->raw_len, ctx->thread_index); } - sf->anchor = shaper_next_anchor_get(sf, meta->dir); - if (sf->anchor == 0) {//no next rule + sf->anchor[meta->dir] = shaper_next_anchor_get(sf, meta->dir); + if (sf->anchor[meta->dir] == 0) {//no next rule return SHAPING_FORWARD; } } @@ -1040,7 +1041,7 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi } enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - if (0 == shaper_flow_push(ctx, sf, enqueue_time_us)) { + if (0 == shaper_flow_push(ctx, sf, meta->dir, enqueue_time_us)) { return SHAPING_QUEUED; } else { goto DROP; @@ -1048,30 +1049,30 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi DROP: if (enqueue_success) { - shaper_packet_dequeue(sf); + shaper_packet_dequeue(sf, meta->dir); } - struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor].primary; + struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor[meta->dir]].primary; shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index); - sf->anchor = 0; + sf->anchor[meta->dir] = 0; return SHAPING_DROP; } -static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority, - struct shaping_stat *stat, struct shaping_thread_ctx *ctx) +static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, enum shaping_packet_dir dir, + int priority, struct shaping_stat *stat, struct shaping_thread_ctx *ctx) { struct shaping_packet_wrapper *pkt_wrapper; - int old_anchor = sf->anchor; + int old_anchor = sf->anchor[dir]; int shaping_ret; - pkt_wrapper = shaper_first_pkt_get(sf); + pkt_wrapper = shaper_first_pkt_get(sf, dir); assert(pkt_wrapper != NULL); - shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority); + shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, dir, priority); switch (shaping_ret) { case SHAPING_QUEUED: - if (old_anchor == sf->anchor) {//didn't get token + if (old_anchor == sf->anchor[dir]) {//didn't get token return -1; } else {//got token for one rule and waiting get token for next rule return 0; @@ -1083,7 +1084,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); - shaper_packet_dequeue(sf); + shaper_packet_dequeue(sf, dir); break; case SHAPING_FORWARD: shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); @@ -1091,7 +1092,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1); - shaper_packet_dequeue(sf); + shaper_packet_dequeue(sf, dir); break; default: assert(0);//impossible path @@ -1100,20 +1101,21 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ shaper_stat_refresh(ctx, sf, 0); - if (shaper_queue_empty(sf)) { - if (sf->flag & SESSION_CLOSE) { + enum shaping_packet_dir dir_opposite = (dir == SHAPING_DIR_IN) ? SHAPING_DIR_OUT : SHAPING_DIR_IN; + if (shaper_queue_empty(sf, dir)) { + if (shaper_queue_empty(sf, dir_opposite) && sf->flag & SESSION_CLOSE) { sf->flag &= (~SESSION_CLOSE); shaping_flow_free(ctx, sf); } return 0; } else { - pkt_wrapper = shaper_first_pkt_get(sf); - shaper_stat_queueing_pkt_dec_for_rule(&sf->matched_rule_infos[pkt_wrapper->rule_anchor], pkt_wrapper->direction, ctx->thread_index); + pkt_wrapper = shaper_first_pkt_get(sf, dir); + shaper_stat_queueing_pkt_dec_for_rule(&sf->matched_rule_infos[pkt_wrapper->rule_anchor], dir, ctx->thread_index); - sf->anchor = 0; - if (shaper_flow_push(ctx, sf, pkt_wrapper->enqueue_time_us) != 0) { - shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail - if (sf->flag & SESSION_CLOSE) { + sf->anchor[dir] = 0; + if (shaper_flow_push(ctx, sf, dir, pkt_wrapper->enqueue_time_us) != 0) { + shaper_queue_clear(sf, ctx, dir);//first packet fail, then every packet will fail + if (shaper_queue_empty(sf, dir_opposite) && sf->flag & SESSION_CLOSE) { sf->flag &= (~SESSION_CLOSE); shaping_flow_free(ctx, sf); } @@ -1162,11 +1164,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu goto END;//for tcp pure control pkt, transmit it directly } - if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly + if (!shaper_queue_empty(sf, meta->dir)) {//already have queueing pkt, enqueue directly struct timespec curr_time; clock_gettime(CLOCK_MONOTONIC, &curr_time); - s_rule = &sf->matched_rule_infos[sf->anchor]; + s_rule = &sf->matched_rule_infos[sf->anchor[meta->dir]]; if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta, &curr_time)) { shaper_stat_queueing_pkt_inc_for_rule(s_rule, meta->dir, ctx->thread_index); shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); @@ -1180,9 +1182,9 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu } } else {//no queueing pkt, decide action - sf->anchor = 0; + sf->anchor[meta->dir] = 0; - shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff); + shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor[meta->dir]].primary, rx_buff); switch (shaping_ret) { case SHAPING_QUEUED: shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); @@ -1207,7 +1209,7 @@ END: shaper_stat_refresh(ctx, sf, 0); if(sf->flag & SESSION_CLOSE) { - if (shaper_queue_empty(sf)) { + if (shaper_queue_empty(sf, SHAPING_DIR_IN) && shaper_queue_empty(sf, SHAPING_DIR_OUT)) { char *addr_str = addr_tuple4_to_str(&sf->tuple4); LOG_DEBUG("%s: shaping free a shaping_flow for session: %s", LOG_TAG_SHAPING, addr_str); @@ -1223,6 +1225,29 @@ END: return; } +static void shaper_polling_token_get(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx, enum shaping_packet_dir dir) +{ + struct shaper_flow_instance sf_ins[SHAPER_FLOW_POP_NUM_MAX]; + int sf_num; + int ret; + + for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { + sf_num = shaper_flow_in_order_get(sp, sf_ins, dir, i, ctx->conf.polling_node_num_max[i]); + if (sf_num == 0) { + continue; + } + + for (int j = 0; j < sf_num; j++) { + ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, dir, sf_ins[j].priority, stat, ctx); + if (ret == 0) { + return; + } + } + } + + return; +} + void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx) { static thread_local int swarmkv_caller_loop_divisor = SWARMKV_CALLER_LOOP_DIVISOR_MIN; @@ -1265,23 +1290,8 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_ return; } - struct shaper_flow_instance sf_ins[SHAPER_FLOW_POP_NUM_MAX]; - int sf_num; - int ret; - - for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { - sf_num = shaper_flow_in_order_get(sp, sf_ins, i, ctx->conf.polling_node_num_max[i]); - if (sf_num == 0) { - continue; - } - - for (int j = 0; j < sf_num; j++) { - ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, stat, ctx); - if (ret == 0) { - return; - } - } - } + shaper_polling_token_get(sp, stat, ctx, SHAPING_DIR_IN); + shaper_polling_token_get(sp, stat, ctx, SHAPING_DIR_OUT); return; } -- cgit v1.2.3