Diffstat (limited to 'shaping/src')
-rw-r--r--  shaping/src/shaper.cpp      | 77
-rw-r--r--  shaping/src/shaper_aqm.cpp  |  4
-rw-r--r--  shaping/src/shaper_stat.cpp | 54

3 files changed, 81 insertions, 54 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index b55807e..89e5d1b 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -35,15 +35,16 @@ extern "C" {
 #define SWARMKV_CALLER_LOOP_DIVISOR_MIN 1
 #define SWARMKV_CALLER_LOOP_DIVISOR_MAX 10
 
-#define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9"
+#define SWARMKV_IN_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0-in priority-1-in priority-2-in priority-3-in priority-4-in priority-5-in priority-6-in priority-7-in priority-8-in priority-9-in"
+#define SWARMKV_OUT_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0-out priority-1-out priority-2-out priority-3-out priority-4-out priority-5-out priority-6-out priority-7-out priority-8-out priority-9-out"
 
 struct shaper {//trees in one thread
-    struct avl_tree *priority_trees[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority
+    struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];//represent 10 avl tree corresponding to 10 priority
 };
 
 struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree
     struct shaping_flow shaping_flow;
-    struct avl_node *avl_node[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];
+    struct avl_node *avl_node[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];
 };
 
 struct shaping_profile_container {
@@ -69,8 +70,8 @@ struct shaper* shaper_new(unsigned int priority_queue_len_max)
         goto ERROR;
     }
 
-    for (int i = 0; i < SHAPING_DIR_MAX; i++) {
-        for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+    for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+        for (int j = 0; j < SHAPING_DIR_MAX; j++) {
             sp->priority_trees[i][j] = avl_tree_init(priority_queue_len_max);
             if (!sp->priority_trees[i][j]) {
                 goto ERROR;
@@ -88,8 +89,8 @@ ERROR:
 void shaper_free(struct shaper *sp)
 {
     if (sp) {
-        for (int i = 0; i < SHAPING_DIR_MAX; i++) {
-            for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+        for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+            for (int j = 0; j < SHAPING_DIR_MAX; j++) {
                 if (sp->priority_trees[i][j]) {
                     avl_tree_destroy(sp->priority_trees[i][j]);
                 }
@@ -104,8 +105,8 @@ void shaper_free(struct shaper *sp)
 static void shaping_node_free(struct shaping_node *s_node)
 {
     if (s_node) {
-        for (int i = 0; i < SHAPING_DIR_MAX; i++) {
-            for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+        for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+            for (int j = 0; j < SHAPING_DIR_MAX; j++) {
                 if (s_node->avl_node[i][j]) {
                     avl_tree_node_free(s_node->avl_node[i][j]);
                 }
@@ -135,8 +136,8 @@ struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
         goto ERROR;
     }
 
-    for (int i = 0; i < SHAPING_DIR_MAX; i++) {
-        for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+    for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+        for (int j = 0; j < SHAPING_DIR_MAX; j++) {
             s_node->avl_node[i][j] = avl_tree_node_new(0, &s_node->shaping_flow, NULL);
             if (!s_node->avl_node[i][j]) {
                 goto ERROR;
@@ -266,8 +267,8 @@ static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow
     assert(pkt_wrapper != NULL);
 
     priority = s_rule_info->primary.priority;
-    avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns);
-    if (0 != avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {//primary profile failed means flow push failed, ignore borrow profile
+    avl_tree_node_key_set(s_node->avl_node[priority][dir], pkt_wrapper->income_time_ns);
+    if (0 != avl_tree_node_insert(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir])) {//primary profile failed means flow push failed, ignore borrow profile
         return -1;
     }
@@ -277,8 +278,8 @@
     for (i = 0; i < s_rule_info->borrowing_num; i++) {
         priority = s_rule_info->borrowing[i].priority;
-        avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns);
-        if (0 == avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {
+        avl_tree_node_key_set(s_node->avl_node[priority][dir], pkt_wrapper->income_time_ns);
+        if (0 == avl_tree_node_insert(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir])) {
             shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
         }
     }
@@ -313,8 +314,8 @@ static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow
     assert(pkt_wrapper != NULL);
 
     priority = s_rule_info->primary.priority;
-    if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
-        avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+    if (avl_node_in_tree(s_node->avl_node[priority][dir])) {
+        avl_tree_node_remove(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir]);
     }
 
     if (s_rule_info->borrowing_num == 0) {
@@ -323,8 +324,8 @@
     for (i = 0; i < s_rule_info->borrowing_num; i++) {
         priority = s_rule_info->borrowing[i].priority;
-        if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
-            avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+        if (avl_node_in_tree(s_node->avl_node[priority][dir])) {
+            avl_tree_node_remove(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir]);
             shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
         }
     }
@@ -349,8 +350,8 @@ static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *
     for (int i = 0; i < s_rule_info->borrowing_num; i++) {
         if (priority == s_rule_info->borrowing[i].priority) {
-            if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
-                avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+            if (avl_node_in_tree(s_node->avl_node[priority][dir])) {
+                avl_tree_node_remove(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir]);
                 shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
             }
         }
@@ -368,7 +369,7 @@ static int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instan
         return 0;
     }
 
-    avl_node = avl_tree_minimum_node_get(sp->priority_trees[dir][priority]);
+    avl_node = avl_tree_minimum_node_get(sp->priority_trees[priority][dir]);
     while(avl_node) {
         sf_ins[count].sf = (struct shaping_flow*)avl_tree_node_data_get(avl_node);
         sf_ins[count].priority = priority;
@@ -693,7 +694,7 @@ static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
 static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
 {
     struct shaping_hmget_cb_arg *arg = (struct shaping_hmget_cb_arg *)cb_arg;
-    struct shaping_thread_ctx *ctx = arg->ctx; 
+    struct shaping_thread_ctx *ctx = arg->ctx;
     struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node;
     struct timespec curr_time;
     long long curr_time_us;
@@ -722,21 +723,21 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
         if (reply->elements[i]->type == SWARMKV_REPLY_STRING) {
             char tmp_str[32] = {0};
             memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len);
-            pf_hash_node->queue_len[i] = strtoll(tmp_str, NULL, 10);
+            pf_hash_node->queue_len[i][arg->dir] = strtoll(tmp_str, NULL, 10);
         } else {
-            pf_hash_node->queue_len[i] = 0;
+            pf_hash_node->queue_len[i][arg->dir] = 0;
         }
     }
 
 END:
-    pf_hash_node->last_hmget_ms = curr_time_ms;
+    pf_hash_node->last_hmget_ms[arg->dir] = curr_time_ms;
     pf_hash_node->hmget_ref_cnt--;
     free(cb_arg);
     cb_arg = NULL;
 }
 
-static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, struct timespec *curr_timespec, long long curr_time_ms)
+static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, enum shaping_packet_dir direction, struct timespec *curr_timespec, long long curr_time_ms)
 {
     struct shaping_hmget_cb_arg *arg;
     int priority = profile->priority;
@@ -753,32 +754,36 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
         goto END;
     }
 
-    if (curr_time_ms - profile->hash_node->last_hmget_ms < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms
+    if (curr_time_ms - profile->hash_node->last_hmget_ms[direction] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms
         goto END;
     }
 
     arg = (struct shaping_hmget_cb_arg *)calloc(1, sizeof(struct shaping_hmget_cb_arg));
     arg->ctx = ctx;
     arg->pf_hash_node = profile->hash_node;
+    arg->dir = direction;
     arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
     profile->hash_node->hmget_ref_cnt++;
 
     shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
     shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat);
-    swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_QUEUE_LEN_GET_CMD, profile->id);
+    if (direction == SHAPING_DIR_IN) {
+        swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_IN_QUEUE_LEN_GET_CMD, profile->id);
+    } else {
+        swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_OUT_QUEUE_LEN_GET_CMD, profile->id);
+    }
 
     for (int i = 0; i < priority; i++) {
-        if (profile->hash_node->queue_len[i] > 0) {
-            profile->hash_node->priority_blocked_time_ms[priority] = curr_time_ms;
+        if (profile->hash_node->queue_len[i][direction] > 0) {
+            profile->hash_node->priority_blocked_time_ms[priority][direction] = curr_time_ms;
             goto END;
         }
     }
 
 END:
-    if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority] < PRIORITY_BLOCK_MIN_TIME_MS) {
-        shaper_profile_async_pass_set(profile, SHAPING_DIR_OUT, priority, 0);
-        shaper_profile_async_pass_set(profile, SHAPING_DIR_IN, priority, 0);
+    if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority][direction] < PRIORITY_BLOCK_MIN_TIME_MS) {
+        shaper_profile_async_pass_set(profile, direction, priority, 0);
         return 1;
     } else {
         return 0;
@@ -865,7 +870,7 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
         return ret;
     }
 
-    if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_timespec, curr_time_ms)) {
+    if (shaper_profile_is_priority_blocked(ctx, sf, profile, direction, curr_timespec, curr_time_ms)) {
        return ret;
     }
@@ -963,7 +968,7 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
         profile_type = pf_container[i].pf_type;
 
         /*AQM process, if aqm not pass, for primary profile drop packet, for borrow profile just don't give token to this packet*/
-        if (shaper_aqm_need_drop(profile, pkt_wrapper, &curr_time, latency_us)) {
+        if (shaper_aqm_need_drop(profile, pkt_wrapper, dir, &curr_time, latency_us)) {
             if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
                 shaper_flow_pop(ctx, sf, dir, &curr_time);
                 goto DROP;
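Note: the shaper.cpp changes swap the two dimensions of priority_trees and avl_node from [SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX] to [SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX], so every lookup becomes [priority][dir]. A minimal sketch of the new layout, assuming 10 priorities and 2 directions as the diff implies (the opaque avl_tree type and the service loop here are illustrative, not the project's real API):

    // Illustrative only: priority is now the leading dimension, so the
    // in/out trees for one priority sit side by side, and a strict-priority
    // walk iterates the outer dimension first.
    #include <cstdio>

    enum { SHAPING_PRIORITY_NUM_MAX = 10, SHAPING_DIR_MAX = 2 };

    struct avl_tree;    // opaque stand-in for the real AVL tree type

    struct shaper {
        struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];
    };

    static void service_all(struct shaper *sp)
    {
        for (int p = 0; p < SHAPING_PRIORITY_NUM_MAX; p++) {      // priority outer
            for (int d = 0; d < SHAPING_DIR_MAX; d++) {           // direction inner
                if (sp->priority_trees[p][d]) {
                    printf("service priority %d dir %d\n", p, d);
                }
            }
        }
    }

The other behavioral change, in shaper_profile_is_priority_blocked, is that blocking state is now tracked and applied per direction: a backlog of higher-priority traffic inbound no longer gates the outbound side (the old code called shaper_profile_async_pass_set for both directions). A sketch of the per-direction gate, with hypothetical struct and constant names (the real shaping_profile_hash_node is not shown in the diff):

    // Hypothetical types; only the new [priority][direction] indexing of
    // these fields is visible in the diff.
    enum shaping_packet_dir { SHAPING_DIR_IN = 0, SHAPING_DIR_OUT = 1, SHAPING_DIR_MAX = 2 };
    enum { PRIORITY_NUM = 10 };

    struct hash_node_state {
        long long queue_len[PRIORITY_NUM][SHAPING_DIR_MAX];                // from HMGET
        long long priority_blocked_time_ms[PRIORITY_NUM][SHAPING_DIR_MAX];
    };

    // Returns 1 while this priority should stay blocked in this direction:
    // seeing any backlogged higher-priority queue refreshes the timestamp,
    // and the block lingers for min_block_ms afterwards.
    static int priority_blocked(struct hash_node_state *st, int priority,
                                enum shaping_packet_dir dir,
                                long long now_ms, long long min_block_ms)
    {
        for (int i = 0; i < priority; i++) {
            if (st->queue_len[i][dir] > 0) {
                st->priority_blocked_time_ms[priority][dir] = now_ms;
                break;
            }
        }
        return (now_ms - st->priority_blocked_time_ms[priority][dir]) < min_block_ms;
    }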
diff --git a/shaping/src/shaper_aqm.cpp b/shaping/src/shaper_aqm.cpp
index 5f2a860..1de33e9 100644
--- a/shaping/src/shaper_aqm.cpp
+++ b/shaping/src/shaper_aqm.cpp
@@ -96,7 +96,7 @@ static void shaper_aqm_mark_processed(struct shaping_packet_wrapper *pkt_wrapper
     }
 }
 
-int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper, struct timespec *curr_time, unsigned long long latency_us)
+int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper, enum shaping_packet_dir dir, struct timespec *curr_time, unsigned long long latency_us)
 {
     int ret = 0;
     unsigned long long curr_time_ms;
@@ -111,7 +111,7 @@ int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_pa
     switch (profile->hash_node->aqm_type) {
     case AQM_TYPE_BLUE:
-        ret = shaper_aqm_blue_need_drop(profile->id, &profile->hash_node->aqm_blue_para, profile->hash_node->queue_len[profile->priority]);
+        ret = shaper_aqm_blue_need_drop(profile->id, &profile->hash_node->aqm_blue_para, profile->hash_node->queue_len[profile->priority][dir]);
         break;
     case AQM_TYPE_CODEL:
         curr_time_ms = curr_time->tv_sec * MILLI_SECONDS_PER_SEC + curr_time->tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
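Note: shaper_aqm_need_drop now receives the packet direction so the BLUE decision can key on the per-direction queue depth. For orientation, a minimal BLUE-style controller over such a queue length might look like the sketch below; the field names are hypothetical (the diff does not show struct aqm_blue_para), and real BLUE also rate-limits probability updates with a freeze time, omitted here:

    // Hypothetical BLUE sketch: raise the drop probability when the observed
    // queue exceeds a limit, decay it when the queue drains, and drop each
    // packet with the current probability.
    #include <cstdlib>

    struct blue_para {
        double drop_prob;        // current drop probability
        long long queue_limit;   // backlog that signals congestion
        double increment;        // step up on congestion
        double decrement;        // step down when idle
    };

    static int blue_need_drop(struct blue_para *bp, long long queue_len)
    {
        if (queue_len > bp->queue_limit) {
            bp->drop_prob += bp->increment;
            if (bp->drop_prob > 1.0) bp->drop_prob = 1.0;
        } else if (queue_len == 0) {
            bp->drop_prob -= bp->decrement;
            if (bp->drop_prob < 0.0) bp->drop_prob = 0.0;
        }
        return ((double)rand() / (double)RAND_MAX) < bp->drop_prob;
    }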
"in" : "out", arg->queue_len); + + if (arg->dir == SHAPING_DIR_IN) { + swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d-in %lld", arg->profile_id, arg->priority, arg->queue_len); + } else { + swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d-out %lld", arg->profile_id, arg->priority, arg->queue_len); + } return; } @@ -172,13 +178,9 @@ END: return; } -static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node, int priority, long long curr_time_us) +static void shaper_stat_priority_queue_len_refresh_dir(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node, int priority, enum shaping_packet_dir direction, long long curr_time_us) { - if (profile_hash_node->local_queue_len[priority] == 0) { - return; - } - - if (curr_time_us - profile_hash_node->local_queue_len_update_time_us[priority] < SHAPER_STAT_REFRESH_TIME_US) { + if (profile_hash_node->local_queue_len[priority][direction] == 0) { return; } @@ -188,13 +190,31 @@ static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ct arg->start_time_us = curr_time_us; arg->profile_id = profile_hash_node->id; arg->priority = priority; - arg->queue_len = profile_hash_node->local_queue_len[priority]; + arg->dir = direction; + arg->queue_len = profile_hash_node->local_queue_len[priority][direction]; shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat); - swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len); + if (direction == SHAPING_DIR_IN) { + swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d-in %lld", arg->profile_id, arg->priority, arg->queue_len); + } else { + swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d-out %lld", arg->profile_id, arg->priority, arg->queue_len); + } + + profile_hash_node->local_queue_len[priority][direction] = 0; + + return; +} + +static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node, int priority, long long curr_time_us) +{ + if (curr_time_us - profile_hash_node->local_queue_len_update_time_us[priority] < SHAPER_STAT_REFRESH_TIME_US) { + return; + } + + shaper_stat_priority_queue_len_refresh_dir(ctx, profile_hash_node, priority, SHAPING_DIR_IN, curr_time_us); + shaper_stat_priority_queue_len_refresh_dir(ctx, profile_hash_node, priority, SHAPING_DIR_OUT, curr_time_us); profile_hash_node->local_queue_len_update_time_us[priority] = curr_time_us; - profile_hash_node->local_queue_len[priority] = 0; return; } @@ -223,8 +243,10 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s unsigned long long old_latency; if (need_update_guage) { - profile->hash_node->local_queue_len[priority] += profile_stat->priority_queue_len; - profile_stat->priority_queue_len = 0; + profile->hash_node->local_queue_len[priority][SHAPING_DIR_IN] += profile_stat->priority_queue_len[SHAPING_DIR_IN]; + profile->hash_node->local_queue_len[priority][SHAPING_DIR_OUT] += profile_stat->priority_queue_len[SHAPING_DIR_OUT]; + profile_stat->priority_queue_len[SHAPING_DIR_IN] = 0; + 
profile_stat->priority_queue_len[SHAPING_DIR_OUT] = 0; shaper_stat_priority_queue_len_refresh(ctx, profile->hash_node, priority, curr_time_us); } @@ -358,7 +380,7 @@ void shaper_stat_queueing_pkt_inc(struct shaping_stat_for_profile *profile_stat, profile_stat->out.queue_len++; } - profile_stat->priority_queue_len++; + profile_stat->priority_queue_len[direction]++; return; } @@ -371,7 +393,7 @@ void shaper_stat_queueing_pkt_dec(struct shaping_stat_for_profile *profile_stat, profile_stat->out.queue_len--; } - profile_stat->priority_queue_len--; + profile_stat->priority_queue_len[direction]--; return; } |
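Note: the stat path mirrors the naming change: each priority now has separate in/out fields in the shared tsg-shaping-<profile id> hash, and the HINCRBY retry carries arg->dir so a retried increment lands on the same field. A small sketch of the field-naming scheme (format_hincrby is a hypothetical helper; the diff itself passes the format string straight to swarmkv_async_command):

    // Builds the same command text the diff sends, e.g.
    // "HINCRBY tsg-shaping-7 priority-3-out 5".
    #include <cstddef>
    #include <cstdio>

    enum shaping_packet_dir { SHAPING_DIR_IN = 0, SHAPING_DIR_OUT = 1 };

    static void format_hincrby(char *buf, size_t len, int profile_id,
                               int priority, enum shaping_packet_dir dir,
                               long long delta)
    {
        snprintf(buf, len, "HINCRBY tsg-shaping-%d priority-%d-%s %lld",
                 profile_id, priority,
                 dir == SHAPING_DIR_IN ? "in" : "out", delta);
    }

    int main(void)
    {
        char cmd[128];
        format_hincrby(cmd, sizeof cmd, 7, 3, SHAPING_DIR_OUT, 5);
        printf("%s\n", cmd);    // HINCRBY tsg-shaping-7 priority-3-out 5
        return 0;
    }

Keeping the direction in the field name, rather than in separate hashes, lets a single HMGET per direction fetch all ten priorities at once, matching the SWARMKV_IN/OUT_QUEUE_LEN_GET_CMD macros introduced in shaper.cpp.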
