summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorroot <[email protected]>2024-06-13 09:59:24 +0000
committerroot <[email protected]>2024-06-13 09:59:24 +0000
commitcfc13ad17d6dd65239b6acc85417fdd804d3d267 (patch)
treee3251f57fda271f7b1bfc1f4f36514591081999f
parentf91407a5524365bb93dc6e8f96ef2b08ef3fe8a0 (diff)
separate in out direction for queue_len stored in swarmkv
-rw-r--r--shaping/include/shaper.h16
-rw-r--r--shaping/include/shaper_aqm.h4
-rw-r--r--shaping/include/shaper_stat.h8
-rw-r--r--shaping/src/shaper.cpp77
-rw-r--r--shaping/src/shaper_aqm.cpp4
-rw-r--r--shaping/src/shaper_stat.cpp54
-rw-r--r--shaping/test/gtest_shaper.cpp88
-rw-r--r--shaping/test/stub.cpp30
8 files changed, 209 insertions, 72 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 9a1dca6..5e2f7c0 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -76,12 +76,6 @@ struct shaping_ctx {
struct shaping_thread_ctx *thread_ctx;
};
-enum shaping_packet_dir {
- SHAPING_DIR_IN = 0,
- SHAPING_DIR_OUT,
- SHAPING_DIR_MAX
-};
-
enum shaping_packet_action {
SHAPING_FORWARD = 0,
SHAPING_QUEUED,
@@ -120,11 +114,11 @@ struct shaping_profile_hash_node {
long long out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
long long bidirection_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
long long last_failed_get_token_ms[SHAPING_DIR_MAX];
- long long last_hmget_ms;
- long long queue_len[SHAPING_PRIORITY_NUM_MAX];
- long long local_queue_len[SHAPING_PRIORITY_NUM_MAX];
+ long long last_hmget_ms[SHAPING_DIR_MAX];
+ long long queue_len[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];
+ long long local_queue_len[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];
long long local_queue_len_update_time_us[SHAPING_PRIORITY_NUM_MAX];
- long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX];
+ long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];
int hmget_ref_cnt;
int tconsume_ref_cnt;
unsigned long long last_refresh_time_ms;
@@ -221,6 +215,7 @@ struct shaping_tconsume_cb_arg {
struct shaping_hmget_cb_arg {
struct shaping_thread_ctx *ctx;
struct shaping_profile_hash_node *pf_hash_node;
+ enum shaping_packet_dir dir;
long long start_time_us;
};
@@ -228,6 +223,7 @@ struct shaping_hincrby_cb_arg {
struct shaping_thread_ctx *ctx;
long long start_time_us;
long long queue_len;
+ enum shaping_packet_dir dir;
int profile_id;
int priority;
int retry_cnt;
diff --git a/shaping/include/shaper_aqm.h b/shaping/include/shaper_aqm.h
index 8575076..2367d92 100644
--- a/shaping/include/shaper_aqm.h
+++ b/shaping/include/shaper_aqm.h
@@ -1,6 +1,8 @@
#pragma once
#include <time.h>
+#include "shaper_stat.h"
+
#define BLUE_PROBABILITY_MAX 100
#define BLUE_INCREMENT 10
#define BLUE_DECREMENT 1
@@ -34,6 +36,6 @@ struct shaper_aqm_codel_para {
unsigned int drop_count;
};
-int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper, struct timespec *curr_time, unsigned long long latency_us);
+int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper, enum shaping_packet_dir dir, struct timespec *curr_time, unsigned long long latency_us);
int shaper_aqm_blue_need_drop(int profile_id, struct shaper_aqm_blue_para *para, int curr_queue_len);
int shaper_aqm_codel_need_drop(int profile_id, struct shaper_aqm_codel_para *para, unsigned long long curr_time_ms, unsigned long long latency_ms); \ No newline at end of file
diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h
index 5c720a3..e6feeb3 100644
--- a/shaping/include/shaper_stat.h
+++ b/shaping/include/shaper_stat.h
@@ -4,6 +4,12 @@
#include "uthash.h"
#include <fieldstat.h>
+enum shaping_packet_dir {
+ SHAPING_DIR_IN = 0,
+ SHAPING_DIR_OUT,
+ SHAPING_DIR_MAX
+};
+
enum shaping_stat_tags_index {
TAG_VSYS_ID_IDX = 0,
TAG_RULE_ID_IDX,
@@ -37,7 +43,7 @@ struct shaping_stat_for_profile_dir {
struct shaping_stat_for_profile {
struct shaping_stat_for_profile_dir in;
struct shaping_stat_for_profile_dir out;
- long long priority_queue_len;
+ long long priority_queue_len[SHAPING_DIR_MAX];
};
struct shaping_stat {
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index b55807e..89e5d1b 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -35,15 +35,16 @@ extern "C" {
#define SWARMKV_CALLER_LOOP_DIVISOR_MIN 1
#define SWARMKV_CALLER_LOOP_DIVISOR_MAX 10
-#define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9"
+#define SWARMKV_IN_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0-in priority-1-in priority-2-in priority-3-in priority-4-in priority-5-in priority-6-in priority-7-in priority-8-in priority-9-in"
+#define SWARMKV_OUT_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0-out priority-1-out priority-2-out priority-3-out priority-4-out priority-5-out priority-6-out priority-7-out priority-8-out priority-9-out"
struct shaper {//trees in one thread
- struct avl_tree *priority_trees[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority
+ struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];//represent 10 avl tree corresponding to 10 priority
};
struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree
struct shaping_flow shaping_flow;
- struct avl_node *avl_node[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];
+ struct avl_node *avl_node[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];
};
struct shaping_profile_container {
@@ -69,8 +70,8 @@ struct shaper* shaper_new(unsigned int priority_queue_len_max)
goto ERROR;
}
- for (int i = 0; i < SHAPING_DIR_MAX; i++) {
- for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ for (int j = 0; j < SHAPING_DIR_MAX; j++) {
sp->priority_trees[i][j] = avl_tree_init(priority_queue_len_max);
if (!sp->priority_trees[i][j]) {
goto ERROR;
@@ -88,8 +89,8 @@ ERROR:
void shaper_free(struct shaper *sp)
{
if (sp) {
- for (int i = 0; i < SHAPING_DIR_MAX; i++) {
- for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ for (int j = 0; j < SHAPING_DIR_MAX; j++) {
if (sp->priority_trees[i][j]) {
avl_tree_destroy(sp->priority_trees[i][j]);
}
@@ -104,8 +105,8 @@ void shaper_free(struct shaper *sp)
static void shaping_node_free(struct shaping_node *s_node)
{
if (s_node) {
- for (int i = 0; i < SHAPING_DIR_MAX; i++) {
- for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ for (int j = 0; j < SHAPING_DIR_MAX; j++) {
if (s_node->avl_node[i][j]) {
avl_tree_node_free(s_node->avl_node[i][j]);
}
@@ -135,8 +136,8 @@ struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
goto ERROR;
}
- for (int i = 0; i < SHAPING_DIR_MAX; i++) {
- for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ for (int j = 0; j < SHAPING_DIR_MAX; j++) {
s_node->avl_node[i][j] = avl_tree_node_new(0, &s_node->shaping_flow, NULL);
if (!s_node->avl_node[i][j]) {
goto ERROR;
@@ -266,8 +267,8 @@ static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow
assert(pkt_wrapper != NULL);
priority = s_rule_info->primary.priority;
- avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns);
- if (0 != avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {//primary profile failed means flow push failed, ignore borrow profile
+ avl_tree_node_key_set(s_node->avl_node[priority][dir], pkt_wrapper->income_time_ns);
+ if (0 != avl_tree_node_insert(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir])) {//primary profile failed means flow push failed, ignore borrow profile
return -1;
}
@@ -277,8 +278,8 @@ static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow
for (i = 0; i < s_rule_info->borrowing_num; i++) {
priority = s_rule_info->borrowing[i].priority;
- avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns);
- if (0 == avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {
+ avl_tree_node_key_set(s_node->avl_node[priority][dir], pkt_wrapper->income_time_ns);
+ if (0 == avl_tree_node_insert(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir])) {
shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
}
}
@@ -313,8 +314,8 @@ static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow
assert(pkt_wrapper != NULL);
priority = s_rule_info->primary.priority;
- if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
- avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+ if (avl_node_in_tree(s_node->avl_node[priority][dir])) {
+ avl_tree_node_remove(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir]);
}
if (s_rule_info->borrowing_num == 0) {
@@ -323,8 +324,8 @@ static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow
for (i = 0; i < s_rule_info->borrowing_num; i++) {
priority = s_rule_info->borrowing[i].priority;
- if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
- avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+ if (avl_node_in_tree(s_node->avl_node[priority][dir])) {
+ avl_tree_node_remove(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir]);
shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
}
}
@@ -349,8 +350,8 @@ static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *
for (int i = 0; i < s_rule_info->borrowing_num; i++) {
if (priority == s_rule_info->borrowing[i].priority) {
- if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
- avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+ if (avl_node_in_tree(s_node->avl_node[priority][dir])) {
+ avl_tree_node_remove(sp->priority_trees[priority][dir], s_node->avl_node[priority][dir]);
shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
}
}
@@ -368,7 +369,7 @@ static int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instan
return 0;
}
- avl_node = avl_tree_minimum_node_get(sp->priority_trees[dir][priority]);
+ avl_node = avl_tree_minimum_node_get(sp->priority_trees[priority][dir]);
while(avl_node) {
sf_ins[count].sf = (struct shaping_flow*)avl_tree_node_data_get(avl_node);
sf_ins[count].priority = priority;
@@ -693,7 +694,7 @@ static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
struct shaping_hmget_cb_arg *arg = (struct shaping_hmget_cb_arg *)cb_arg;
- struct shaping_thread_ctx *ctx = arg->ctx;
+ struct shaping_thread_ctx *ctx = arg->ctx;
struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node;
struct timespec curr_time;
long long curr_time_us;
@@ -722,21 +723,21 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
if (reply->elements[i]->type == SWARMKV_REPLY_STRING) {
char tmp_str[32] = {0};
memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len);
- pf_hash_node->queue_len[i] = strtoll(tmp_str, NULL, 10);
+ pf_hash_node->queue_len[i][arg->dir] = strtoll(tmp_str, NULL, 10);
} else {
- pf_hash_node->queue_len[i] = 0;
+ pf_hash_node->queue_len[i][arg->dir] = 0;
}
}
END:
- pf_hash_node->last_hmget_ms = curr_time_ms;
+ pf_hash_node->last_hmget_ms[arg->dir] = curr_time_ms;
pf_hash_node->hmget_ref_cnt--;
free(cb_arg);
cb_arg = NULL;
}
-static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, struct timespec *curr_timespec, long long curr_time_ms)
+static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, enum shaping_packet_dir direction, struct timespec *curr_timespec, long long curr_time_ms)
{
struct shaping_hmget_cb_arg *arg;
int priority = profile->priority;
@@ -753,32 +754,36 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
goto END;
}
- if (curr_time_ms - profile->hash_node->last_hmget_ms < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms
+ if (curr_time_ms - profile->hash_node->last_hmget_ms[direction] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms
goto END;
}
arg = (struct shaping_hmget_cb_arg *)calloc(1, sizeof(struct shaping_hmget_cb_arg));
arg->ctx = ctx;
arg->pf_hash_node = profile->hash_node;
+ arg->dir = direction;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
profile->hash_node->hmget_ref_cnt++;
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat);
- swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_QUEUE_LEN_GET_CMD, profile->id);
+ if (direction == SHAPING_DIR_IN) {
+ swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_IN_QUEUE_LEN_GET_CMD, profile->id);
+ } else {
+ swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_OUT_QUEUE_LEN_GET_CMD, profile->id);
+ }
for (int i = 0; i < priority; i++) {
- if (profile->hash_node->queue_len[i] > 0) {
- profile->hash_node->priority_blocked_time_ms[priority] = curr_time_ms;
+ if (profile->hash_node->queue_len[i][direction] > 0) {
+ profile->hash_node->priority_blocked_time_ms[priority][direction] = curr_time_ms;
goto END;
}
}
END:
- if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority] < PRIORITY_BLOCK_MIN_TIME_MS) {
- shaper_profile_async_pass_set(profile, SHAPING_DIR_OUT, priority, 0);
- shaper_profile_async_pass_set(profile, SHAPING_DIR_IN, priority, 0);
+ if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority][direction] < PRIORITY_BLOCK_MIN_TIME_MS) {
+ shaper_profile_async_pass_set(profile, direction, priority, 0);
return 1;
} else {
return 0;
@@ -865,7 +870,7 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
return ret;
}
- if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_timespec, curr_time_ms)) {
+ if (shaper_profile_is_priority_blocked(ctx, sf, profile, direction, curr_timespec, curr_time_ms)) {
return ret;
}
@@ -963,7 +968,7 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
profile_type = pf_container[i].pf_type;
/*AQM process, if aqm not pass, for primary profile drop packet, for borrow profile just don't give token to this packet*/
- if (shaper_aqm_need_drop(profile, pkt_wrapper, &curr_time, latency_us)) {
+ if (shaper_aqm_need_drop(profile, pkt_wrapper, dir, &curr_time, latency_us)) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
shaper_flow_pop(ctx, sf, dir, &curr_time);
goto DROP;
diff --git a/shaping/src/shaper_aqm.cpp b/shaping/src/shaper_aqm.cpp
index 5f2a860..1de33e9 100644
--- a/shaping/src/shaper_aqm.cpp
+++ b/shaping/src/shaper_aqm.cpp
@@ -96,7 +96,7 @@ static void shaper_aqm_mark_processed(struct shaping_packet_wrapper *pkt_wrapper
}
}
-int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper, struct timespec *curr_time, unsigned long long latency_us)
+int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper, enum shaping_packet_dir dir, struct timespec *curr_time, unsigned long long latency_us)
{
int ret = 0;
unsigned long long curr_time_ms;
@@ -111,7 +111,7 @@ int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_pa
switch (profile->hash_node->aqm_type) {
case AQM_TYPE_BLUE:
- ret = shaper_aqm_blue_need_drop(profile->id, &profile->hash_node->aqm_blue_para, profile->hash_node->queue_len[profile->priority]);
+ ret = shaper_aqm_blue_need_drop(profile->id, &profile->hash_node->aqm_blue_para, profile->hash_node->queue_len[profile->priority][dir]);
break;
case AQM_TYPE_CODEL:
curr_time_ms = curr_time->tv_sec * MILLI_SECONDS_PER_SEC + curr_time->tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index ea385f2..e55463e 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -152,7 +152,8 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo
shaper_global_stat_async_hincrby_failed_inc(&ctx->thread_global_stat);
if (arg->retry_cnt >= HINCRBY_RETRY_MAX) {
- LOG_ERROR("%s: shaping stat hincrby failed after retry %d times for profile id %d priority %d, operate queue_len %lld", LOG_TAG_STAT, arg->retry_cnt, arg->profile_id, arg->priority, arg->queue_len);
+ LOG_ERROR("%s: shaping stat hincrby failed after retry %d times for profile id %d priority %d, operate %s queue_len %lld",
+ LOG_TAG_STAT, arg->retry_cnt, arg->profile_id, arg->priority, arg->dir == SHAPING_DIR_IN ? "in" : "out", arg->queue_len);
goto END;
}
@@ -161,8 +162,13 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);//hincrby failed, retry
shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat);
- LOG_DEBUG("%s: shaping stat hincrby failed, retry for profile id %d priority %d, operate queue_len %lld", LOG_TAG_STAT, arg->profile_id, arg->priority, arg->queue_len);
- swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len);
+ LOG_DEBUG("%s: shaping stat hincrby failed, retry for profile id %d priority %d, operate %s queue_len %lld", LOG_TAG_STAT, arg->profile_id, arg->priority, arg->dir == SHAPING_DIR_IN ? "in" : "out", arg->queue_len);
+
+ if (arg->dir == SHAPING_DIR_IN) {
+ swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d-in %lld", arg->profile_id, arg->priority, arg->queue_len);
+ } else {
+ swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d-out %lld", arg->profile_id, arg->priority, arg->queue_len);
+ }
return;
}
@@ -172,13 +178,9 @@ END:
return;
}
-static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node, int priority, long long curr_time_us)
+static void shaper_stat_priority_queue_len_refresh_dir(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node, int priority, enum shaping_packet_dir direction, long long curr_time_us)
{
- if (profile_hash_node->local_queue_len[priority] == 0) {
- return;
- }
-
- if (curr_time_us - profile_hash_node->local_queue_len_update_time_us[priority] < SHAPER_STAT_REFRESH_TIME_US) {
+ if (profile_hash_node->local_queue_len[priority][direction] == 0) {
return;
}
@@ -188,13 +190,31 @@ static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ct
arg->start_time_us = curr_time_us;
arg->profile_id = profile_hash_node->id;
arg->priority = priority;
- arg->queue_len = profile_hash_node->local_queue_len[priority];
+ arg->dir = direction;
+ arg->queue_len = profile_hash_node->local_queue_len[priority][direction];
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat);
- swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len);
+ if (direction == SHAPING_DIR_IN) {
+ swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d-in %lld", arg->profile_id, arg->priority, arg->queue_len);
+ } else {
+ swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d-out %lld", arg->profile_id, arg->priority, arg->queue_len);
+ }
+
+ profile_hash_node->local_queue_len[priority][direction] = 0;
+
+ return;
+}
+
+static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node, int priority, long long curr_time_us)
+{
+ if (curr_time_us - profile_hash_node->local_queue_len_update_time_us[priority] < SHAPER_STAT_REFRESH_TIME_US) {
+ return;
+ }
+
+ shaper_stat_priority_queue_len_refresh_dir(ctx, profile_hash_node, priority, SHAPING_DIR_IN, curr_time_us);
+ shaper_stat_priority_queue_len_refresh_dir(ctx, profile_hash_node, priority, SHAPING_DIR_OUT, curr_time_us);
profile_hash_node->local_queue_len_update_time_us[priority] = curr_time_us;
- profile_hash_node->local_queue_len[priority] = 0;
return;
}
@@ -223,8 +243,10 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s
unsigned long long old_latency;
if (need_update_guage) {
- profile->hash_node->local_queue_len[priority] += profile_stat->priority_queue_len;
- profile_stat->priority_queue_len = 0;
+ profile->hash_node->local_queue_len[priority][SHAPING_DIR_IN] += profile_stat->priority_queue_len[SHAPING_DIR_IN];
+ profile->hash_node->local_queue_len[priority][SHAPING_DIR_OUT] += profile_stat->priority_queue_len[SHAPING_DIR_OUT];
+ profile_stat->priority_queue_len[SHAPING_DIR_IN] = 0;
+ profile_stat->priority_queue_len[SHAPING_DIR_OUT] = 0;
shaper_stat_priority_queue_len_refresh(ctx, profile->hash_node, priority, curr_time_us);
}
@@ -358,7 +380,7 @@ void shaper_stat_queueing_pkt_inc(struct shaping_stat_for_profile *profile_stat,
profile_stat->out.queue_len++;
}
- profile_stat->priority_queue_len++;
+ profile_stat->priority_queue_len[direction]++;
return;
}
@@ -371,7 +393,7 @@ void shaper_stat_queueing_pkt_dec(struct shaping_stat_for_profile *profile_stat,
profile_stat->out.queue_len--;
}
- profile_stat->priority_queue_len--;
+ profile_stat->priority_queue_len[direction]--;
return;
}
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index 78b8247..b342eda 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -1199,6 +1199,94 @@ TEST(two_session_diff_priority_same_profile, profile_timer_test)
stub_clear_matched_shaping_rules();
}
+/*session1 match rule1; session2 match rule2
+ rule1:
+ priority:1
+ primary profile_a: (priority 1)
+ rule2:
+ priority:2
+ primary profile_a: (priority 2)
+
+profile_a: in limit 1000, out limit 1000
+*/
+TEST(two_session_diff_priority_same_profile, one_direction_dont_block_another)
+{
+ struct stub_pkt_queue expec_tx_queue1;
+ struct stub_pkt_queue expec_tx_queue2;
+ struct stub_pkt_queue *actual_tx_queue;
+ struct shaping_ctx *ctx = NULL;
+ struct shaping_flow *sf1 = NULL;
+ struct shaping_flow *sf2 = NULL;
+ long long rule_ids[] = {1, 2};
+ long long rule_id1[] = {1};
+ long long rule_id2[] = {2};
+ int profile_nums[] = {1, 1};
+ int prioritys[] = {1, 2};
+ int profile_id[][MAX_REF_PROFILE] = {{0}, {0}};
+
+
+ TAILQ_INIT(&expec_tx_queue1);
+ TAILQ_INIT(&expec_tx_queue2);
+ stub_init();
+
+ ctx = shaping_engine_init();
+ ASSERT_TRUE(ctx != NULL);
+ sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
+ ASSERT_TRUE(sf1 != NULL);
+ sf2 = shaping_flow_new(&ctx->thread_ctx[1]);
+ ASSERT_TRUE(sf2 != NULL);
+
+ stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_id);
+
+ stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT, PROFILE_LIMIT_DIRECTION_INCOMING_OUTGOING);
+ stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_IN, PROFILE_LIMIT_DIRECTION_INCOMING_OUTGOING);
+ actual_tx_queue = stub_get_tx_queue();
+ shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1);
+ shaper_rules_update(&ctx->thread_ctx[1], sf2, rule_id2, 1);
+
+ /*******send packets***********/
+ send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
+ send_packets(&ctx->thread_ctx[1], sf2, 100, 100, SHAPING_DIR_IN, &expec_tx_queue2, 1, 0);
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));
+ ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
+
+ sleep(3);//wait profile timer to expire, to refresh priority queue_len to swarmkv
+ for (int i = 0; i < 500; i++) {
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer
+ }
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//timer triggered in polling
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
+
+ stub_curr_time_s_inc(1);//inc time to refresh hmget interval
+ while (!TAILQ_EMPTY(&expec_tx_queue2)) {//the priority-1 session on thread 0 blocks the OUT direction; the session on thread 1 has priority 2, but its IN direction is unaffected
+ stub_refresh_token_bucket(0);
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);//first polling request token
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
+
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));
+ }
+
+ while (!TAILQ_EMPTY(&expec_tx_queue1)) {
+ stub_refresh_token_bucket(0);
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//first polling request token
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//then send pkt
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
+
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1
+ }
+
+ ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
+
+ shaping_flow_free(&ctx->thread_ctx[0], sf1);
+ shaping_flow_free(&ctx->thread_ctx[1], sf2);
+ fieldstat_global_disable_prometheus_endpoint();
+ shaper_thread_resource_clear();
+ shaping_engine_destroy(ctx);
+ stub_clear_matched_shaping_rules();
+}
+
/*session1 match rule1 & rule2; session2 match rule3
rule1:
priority:1
diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp
index 9285120..be3f29d 100644
--- a/shaping/test/stub.cpp
+++ b/shaping/test/stub.cpp
@@ -15,6 +15,7 @@
#include <assert.h>
#include <pthread.h>
#include "shaper.h"
+#include "shaper_stat.h"
#include "stub.h"
#include "shaper_maat.h"
#include "log.h"
@@ -51,7 +52,7 @@ static int pf_async_times[MAX_STUB_PROFILE_NUM];
vector<struct stub_token_thread_arg> pf_async_thread[MAX_STUB_PROFILE_NUM];
struct stub_matched_rules matched_rules;
struct shaping_profile pf_array[MAX_STUB_PROFILE_NUM];
-static int profile_priority_len[MAX_STUB_PROFILE_NUM][10] = {{0}};
+static int profile_priority_len[MAX_STUB_PROFILE_NUM][SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX];
static unsigned long long curr_time_ns = 2000000000;//2s
static unsigned int curr_time_s = 0;
@@ -204,6 +205,7 @@ void stub_init()
TAILQ_INIT(&tx_queue);
memset(&matched_rules, 0, sizeof(struct stub_matched_rules));
memset(&pf_array, 0, MAX_STUB_PROFILE_NUM * sizeof(struct shaping_profile));
+ memset(&profile_priority_len, 0, MAX_STUB_PROFILE_NUM * SHAPING_PRIORITY_NUM_MAX * SHAPING_DIR_MAX * sizeof(int));
for (i = 0; i < MAX_STUB_PROFILE_NUM; i++) {
pf_curr_avl_token[i].in_limit_bandwidth = DEFAULT_AVALIABLE_TOKEN_PER_SEC;
@@ -316,10 +318,18 @@ static void swarmkv_hincrby_cmd_func(char *cmd_str, swarmkv_on_reply_callback_t
int profile_id;
int priority;
int value;
+ char direction[5] = {0};
+ enum shaping_packet_dir dir;
struct swarmkv_reply *reply = (struct swarmkv_reply*)calloc(1, sizeof(struct swarmkv_reply));
- sscanf(cmd_str, "HINCRBY tsg-shaping-%d priority-%d %d", &profile_id, &priority, &value);
- profile_priority_len[profile_id][priority] += value;
+ sscanf(cmd_str, "HINCRBY tsg-shaping-%d priority-%d-%s %d", &profile_id, &priority, direction, &value);
+ if (strncmp(direction, "in", 2) == 0) {
+ dir = SHAPING_DIR_IN;
+ } else {
+ dir = SHAPING_DIR_OUT;
+ }
+
+ profile_priority_len[profile_id][priority][dir] += value;
reply->type = SWARMKV_REPLY_INTEGER;
cb(reply, cb_arg);
@@ -335,12 +345,20 @@ static void swarmkv_hmget_cmd_func(char *cmd_str, swarmkv_on_reply_callback_t *
int priority[10];
int ret;
int priority_num;
+ char direction[5] = {0};
+ enum shaping_packet_dir dir;
struct swarmkv_reply *reply = (struct swarmkv_reply*)calloc(1, sizeof(struct swarmkv_reply));
- ret = sscanf(cmd_str, "HMGET tsg-shaping-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d",
- &profile_id, &priority[0], &priority[1], &priority[2], &priority[3], &priority[4], &priority[5], &priority[6], &priority[7], &priority[8]);
+ ret = sscanf(cmd_str, "HMGET tsg-shaping-%d priority-%d-%s priority-%d-%*s priority-%d-%*s priority-%d-%*s priority-%d-%*s priority-%d-%*s priority-%d-%*s priority-%d-%*s priority-%d-%*s",
+ &profile_id, &priority[0], direction, &priority[1], &priority[2], &priority[3], &priority[4], &priority[5], &priority[6], &priority[7], &priority[8]);
priority_num = ret - 1;
+ if (strncmp(direction, "in", 2) == 0) {
+ dir = SHAPING_DIR_IN;
+ } else {
+ dir = SHAPING_DIR_OUT;
+ }
+
reply->type = SWARMKV_REPLY_ARRAY;
reply->n_element = priority_num;
reply->elements = (struct swarmkv_reply**)calloc(priority_num, sizeof(struct swarmkv_reply*));
@@ -349,7 +367,7 @@ static void swarmkv_hmget_cmd_func(char *cmd_str, swarmkv_on_reply_callback_t *
reply->elements[i]->type = SWARMKV_REPLY_STRING;
char tmp_str[128] = {0};
- sprintf(tmp_str, "%d", profile_priority_len[profile_id][priority[i]]);
+ sprintf(tmp_str, "%d", profile_priority_len[profile_id][priority[i]][dir]);
reply->elements[i]->str = (char *)calloc(1, strlen(tmp_str));
memcpy(reply->elements[i]->str, tmp_str, strlen(tmp_str));
reply->elements[i]->len = strlen(tmp_str);