author    刘畅 <[email protected]>  2024-06-14 01:38:18 +0000
committer 刘畅 <[email protected]>  2024-06-14 01:38:18 +0000
commit    23ddf75eaad60fd42693dbf6b9558806247dc519 (patch)
tree      e3251f57fda271f7b1bfc1f4f36514591081999f /shaping/src/shaper.cpp
parent    f91407a5524365bb93dc6e8f96ef2b08ef3fe8a0 (diff)
parent    cfc13ad17d6dd65239b6acc85417fdd804d3d267 (diff)
Merge branch 'separate_swarmkv_priority_len_in_out' into 'rel' (tag: v3.1.38)
Separate in/out direction for queue_len stored in swarmkv. See merge request tango/shaping-engine!98
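
Before this change the profile hash tsg-shaping-<id> held a single queue length per priority (fields priority-0 … priority-9); each priority now gets separate priority-N-in / priority-N-out fields, and the per-profile bookkeeping gains a direction dimension. A minimal sketch of the reshaped structures, inferred from the callback and cb_arg usage in the diff below (the real definitions live in headers outside this file, so field order and any extra members are assumptions):

/* Sketch only: shapes inferred from shaper_queue_len_get_cb and
 * shaper_profile_is_priority_blocked below; not the verbatim definitions. */
struct shaping_profile_hash_node {
    long long queue_len[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];                /* was queue_len[priority] */
    long long last_hmget_ms[SHAPING_DIR_MAX];                                      /* was a single timestamp */
    long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX]; /* was [priority] */
    int       hmget_ref_cnt;
    /* ... other members unchanged ... */
};

struct shaping_hmget_cb_arg {
    struct shaping_thread_ctx        *ctx;
    struct shaping_profile_hash_node *pf_hash_node;
    enum shaping_packet_dir           dir;           /* new: direction this HMGET was issued for */
    long long                         start_time_us;
};
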
Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r--  shaping/src/shaper.cpp  77
1 file changed, 41 insertions(+), 36 deletions(-)
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index b55807e..89e5d1b 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -35,15 +35,16 @@ extern "C" {
#define SWARMKV_CALLER_LOOP_DIVISOR_MIN 1
#define SWARMKV_CALLER_LOOP_DIVISOR_MAX 10
-#define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9"
+#define SWARMKV_IN_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0-in priority-1-in priority-2-in priority-3-in priority-4-in priority-5-in priority-6-in priority-7-in priority-8-in priority-9-in"
+#define SWARMKV_OUT_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0-out priority-1-out priority-2-out priority-3-out priority-4-out priority-5-out priority-6-out priority-7-out priority-8-out priority-9-out"
struct shaper {//trees in one thread
- struct avl_tree *priority_trees[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority
+ struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];//represent 10 avl tree corresponding to 10 priority
};
struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree
struct shaping_flow shaping_flow;
- struct avl_node *avl_node[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];
+ struct avl_node *avl_node[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];
};
struct shaping_profile_container {
@@ -69,8 +70,8 @@ struct shaper* shaper_new(unsigned int priority_queue_len_max)
goto ERROR;
}
- for (int i = 0; i < SHAPING_DIR_MAX; i++) {
- for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ for (int j = 0; j < SHAPING_DIR_MAX; j++) {
sp->priority_trees[i][j] = avl_tree_init(priority_queue_len_max);
if (!sp->priority_trees[i][j]) {
goto ERROR;
@@ -88,8 +89,8 @@ ERROR:
void shaper_free(struct shaper *sp)
{
if (sp) {
- for (int i = 0; i < SHAPING_DIR_MAX; i++) {
- for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ for (int j = 0; j < SHAPING_DIR_MAX; j++) {
if (sp->priority_trees[i][j]) {
avl_tree_destroy(sp->priority_trees[i][j]);
}
@@ -104,8 +105,8 @@ void shaper_free(struct shaper *sp)
static void shaping_node_free(struct shaping_node *s_node)
{
if (s_node) {
- for (int i = 0; i < SHAPING_DIR_MAX; i++) {
- for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ for (int j = 0; j < SHAPING_DIR_MAX; j++) {
if (s_node->avl_node[i][j]) {
avl_tree_node_free(s_node->avl_node[i][j]);
}
@@ -135,8 +136,8 @@ struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
goto ERROR;
}
- for (int i = 0; i < SHAPING_DIR_MAX; i++) {
- for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ for (int j = 0; j < SHAPING_DIR_MAX; j++) {
s_node->avl_node[i][j] = avl_tree_node_new(0, &s_node->shaping_flow, NULL);
if (!s_node->avl_node[i][j]) {
goto ERROR;
@@ -266,8 +267,8 @@ static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow
assert(pkt_wrapper != NULL);
priority = s_rule_info->primary.priority;
- avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns);
- if (0 != avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {//primary profile failed means flow push failed, ignore borrow profile
+ avl_tree_node_key_set(s_node->avl_node[priority][dir], pkt_wrapper->income_time_ns);
+ if (0 != avl_tree_node_insert(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir])) {//primary profile failed means flow push failed, ignore borrow profile
return -1;
}
@@ -277,8 +278,8 @@ static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow
for (i = 0; i < s_rule_info->borrowing_num; i++) {
priority = s_rule_info->borrowing[i].priority;
- avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns);
- if (0 == avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {
+ avl_tree_node_key_set(s_node->avl_node[priority][dir], pkt_wrapper->income_time_ns);
+ if (0 == avl_tree_node_insert(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir])) {
shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
}
}
@@ -313,8 +314,8 @@ static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow
assert(pkt_wrapper != NULL);
priority = s_rule_info->primary.priority;
- if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
- avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+ if (avl_node_in_tree(s_node->avl_node[priority][dir])) {
+ avl_tree_node_remove(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir]);
}
if (s_rule_info->borrowing_num == 0) {
@@ -323,8 +324,8 @@ static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow
for (i = 0; i < s_rule_info->borrowing_num; i++) {
priority = s_rule_info->borrowing[i].priority;
- if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
- avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+ if (avl_node_in_tree(s_node->avl_node[priority][dir])) {
+ avl_tree_node_remove(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir]);
shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
}
}
@@ -349,8 +350,8 @@ static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *
for (int i = 0; i < s_rule_info->borrowing_num; i++) {
if (priority == s_rule_info->borrowing[i].priority) {
- if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
- avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+ if (avl_node_in_tree(s_node->avl_node[priority][dir])) {
+ avl_tree_node_remove(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir]);
shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
}
}
@@ -368,7 +369,7 @@ static int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instan
return 0;
}
- avl_node = avl_tree_minimum_node_get(sp->priority_trees[dir][priority]);
+ avl_node = avl_tree_minimum_node_get(sp->priority_trees[priority][dir]);
while(avl_node) {
sf_ins[count].sf = (struct shaping_flow*)avl_tree_node_data_get(avl_node);
sf_ins[count].priority = priority;
@@ -693,7 +694,7 @@ static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
struct shaping_hmget_cb_arg *arg = (struct shaping_hmget_cb_arg *)cb_arg;
- struct shaping_thread_ctx *ctx = arg->ctx;
+ struct shaping_thread_ctx *ctx = arg->ctx;
struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node;
struct timespec curr_time;
long long curr_time_us;
@@ -722,21 +723,21 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
if (reply->elements[i]->type == SWARMKV_REPLY_STRING) {
char tmp_str[32] = {0};
memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len);
- pf_hash_node->queue_len[i] = strtoll(tmp_str, NULL, 10);
+ pf_hash_node->queue_len[i][arg->dir] = strtoll(tmp_str, NULL, 10);
} else {
- pf_hash_node->queue_len[i] = 0;
+ pf_hash_node->queue_len[i][arg->dir] = 0;
}
}
END:
- pf_hash_node->last_hmget_ms = curr_time_ms;
+ pf_hash_node->last_hmget_ms[arg->dir] = curr_time_ms;
pf_hash_node->hmget_ref_cnt--;
free(cb_arg);
cb_arg = NULL;
}
-static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, struct timespec *curr_timespec, long long curr_time_ms)
+static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, enum shaping_packet_dir direction, struct timespec *curr_timespec, long long curr_time_ms)
{
struct shaping_hmget_cb_arg *arg;
int priority = profile->priority;
@@ -753,32 +754,36 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
goto END;
}
- if (curr_time_ms - profile->hash_node->last_hmget_ms < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms
+ if (curr_time_ms - profile->hash_node->last_hmget_ms[direction] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms
goto END;
}
arg = (struct shaping_hmget_cb_arg *)calloc(1, sizeof(struct shaping_hmget_cb_arg));
arg->ctx = ctx;
arg->pf_hash_node = profile->hash_node;
+ arg->dir = direction;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
profile->hash_node->hmget_ref_cnt++;
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat);
- swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_QUEUE_LEN_GET_CMD, profile->id);
+ if (direction == SHAPING_DIR_IN) {
+ swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_IN_QUEUE_LEN_GET_CMD, profile->id);
+ } else {
+ swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_OUT_QUEUE_LEN_GET_CMD, profile->id);
+ }
for (int i = 0; i < priority; i++) {
- if (profile->hash_node->queue_len[i] > 0) {
- profile->hash_node->priority_blocked_time_ms[priority] = curr_time_ms;
+ if (profile->hash_node->queue_len[i][direction] > 0) {
+ profile->hash_node->priority_blocked_time_ms[priority][direction] = curr_time_ms;
goto END;
}
}
END:
- if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority] < PRIORITY_BLOCK_MIN_TIME_MS) {
- shaper_profile_async_pass_set(profile, SHAPING_DIR_OUT, priority, 0);
- shaper_profile_async_pass_set(profile, SHAPING_DIR_IN, priority, 0);
+ if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority][direction] < PRIORITY_BLOCK_MIN_TIME_MS) {
+ shaper_profile_async_pass_set(profile, direction, priority, 0);
return 1;
} else {
return 0;
@@ -865,7 +870,7 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
return ret;
}
- if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_timespec, curr_time_ms)) {
+ if (shaper_profile_is_priority_blocked(ctx, sf, profile, direction, curr_timespec, curr_time_ms)) {
return ret;
}
@@ -963,7 +968,7 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
profile_type = pf_container[i].pf_type;
/*AQM process, if aqm not pass, for primary profile drop packet, for borrow profile just don't give token to this packet*/
- if (shaper_aqm_need_drop(profile, pkt_wrapper, &curr_time, latency_us)) {
+ if (shaper_aqm_need_drop(profile, pkt_wrapper, dir, &curr_time, latency_us)) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
shaper_flow_pop(ctx, sf, dir, &curr_time);
goto DROP;
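
Taken together, every queue-length lookup is now keyed by (priority, direction) instead of priority alone, and shaper_profile_is_priority_blocked only gates the direction it was asked about. A hedged usage sketch (the caller name and time handling here are hypothetical; the callee signature matches the diff above):

/* Hypothetical caller: probes one direction only. Before this change, a
 * block detected on either direction set the async-pass gate for both
 * SHAPING_DIR_IN and SHAPING_DIR_OUT. */
static int example_direction_blocked(struct shaping_thread_ctx *ctx,
                                     struct shaping_flow *sf,
                                     struct shaping_profile_info *profile,
                                     enum shaping_packet_dir dir)
{
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    long long now_ms = now.tv_sec * 1000LL + now.tv_nsec / 1000000LL;

    /* dir selects which HMGET command is sent (SWARMKV_IN_QUEUE_LEN_GET_CMD
     * vs SWARMKV_OUT_QUEUE_LEN_GET_CMD) and which queue_len / blocked-time
     * slots are consulted, so in and out no longer shadow each other. */
    return shaper_profile_is_priority_blocked(ctx, sf, profile, dir, &now, now_ms);
}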