author     刘畅 <[email protected]>  2024-06-05 09:36:46 +0000
committer  刘畅 <[email protected]>  2024-06-05 09:36:46 +0000
commit     f91407a5524365bb93dc6e8f96ef2b08ef3fe8a0 (patch)
tree       ea321ee2ec356db1ae52cdb562b90a45dfbf17a5 /shaping/src
parent     dbb0f42537aee97e8f393f9f8a84856fb8aec5af (diff)
parent     da691ef02580893d6539f39e7543443f427ef2a6 (diff)

Merge branch 'session_in_out_pkts_independent_queue' into 'rel' (tag: v3.1.37)

TSG-19453: Session in out pkts independent queue

See merge request tango/shaping-engine!97
Diffstat (limited to 'shaping/src')

 shaping/src/shaper.cpp         | 290
 shaping/src/shaper_session.cpp |   5
 shaping/src/shaper_swarmkv.cpp |   4
 3 files changed, 156 insertions(+), 143 deletions(-)
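
Review note: this merge splits every piece of per-session shaping state by packet direction — the packet queue, the per-priority AVL trees, the rule anchor, and the enqueue timestamp all gain a [dir] index. The hunks below use SHAPING_DIR_IN, SHAPING_DIR_OUT and SHAPING_DIR_MAX without showing their definition; a minimal sketch of the enum they imply (names taken from the diff, values assumed):

    enum shaping_packet_dir {
        SHAPING_DIR_IN  = 0,
        SHAPING_DIR_OUT = 1,
        SHAPING_DIR_MAX = 2,   /* array dimension for all per-direction state */
    };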
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 4accf9c..b55807e 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -38,12 +38,12 @@ extern "C" {
#define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9"
struct shaper {//trees in one thread
- struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority
+ struct avl_tree *priority_trees[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];//per direction: 10 avl trees, one per priority
};
struct shaping_node {//a session holds one avl node per (direction, priority) tree
struct shaping_flow shaping_flow;
- struct avl_node *avl_node[SHAPING_PRIORITY_NUM_MAX];
+ struct avl_node *avl_node[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];
};
struct shaping_profile_container {
@@ -63,18 +63,18 @@ thread_local static int thread_swarmkv_cb_cnt = 0;
struct shaper* shaper_new(unsigned int priority_queue_len_max)
{
struct shaper *sp = NULL;
- int i;
sp = (struct shaper*)calloc(1, sizeof(struct shaper));
if (!sp) {
goto ERROR;
}
-
- for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
- sp->priority_trees[i] = avl_tree_init(priority_queue_len_max);
- if (!sp->priority_trees[i]) {
- goto ERROR;
+ for (int i = 0; i < SHAPING_DIR_MAX; i++) {
+ for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ sp->priority_trees[i][j] = avl_tree_init(priority_queue_len_max);
+ if (!sp->priority_trees[i][j]) {
+ goto ERROR;
+ }
}
}
@@ -87,12 +87,12 @@ ERROR:
void shaper_free(struct shaper *sp)
{
- int i;
-
if (sp) {
- for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
- if (sp->priority_trees[i]) {
- avl_tree_destroy(sp->priority_trees[i]);
+ for (int i = 0; i < SHAPING_DIR_MAX; i++) {
+ for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ if (sp->priority_trees[i][j]) {
+ avl_tree_destroy(sp->priority_trees[i][j]);
+ }
}
}
free(sp);
@@ -103,12 +103,12 @@ void shaper_free(struct shaper *sp)
static void shaping_node_free(struct shaping_node *s_node)
{
- int i;
-
if (s_node) {
- for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
- if (s_node->avl_node[i]) {
- avl_tree_node_free(s_node->avl_node[i]);
+ for (int i = 0; i < SHAPING_DIR_MAX; i++) {
+ for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ if (s_node->avl_node[i][j]) {
+ avl_tree_node_free(s_node->avl_node[i][j]);
+ }
}
}
@@ -129,23 +129,25 @@ static void shaping_node_free(struct shaping_node *s_node)
struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
{
struct shaping_node *s_node = NULL;
- int i;
s_node = (struct shaping_node*)calloc(1, sizeof(struct shaping_node));
if (!s_node) {
goto ERROR;
}
- for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
- s_node->avl_node[i] = avl_tree_node_new(0, &s_node->shaping_flow, NULL);
- if (!s_node->avl_node[i]) {
- goto ERROR;
+ for (int i = 0; i < SHAPING_DIR_MAX; i++) {
+ for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+ s_node->avl_node[i][j] = avl_tree_node_new(0, &s_node->shaping_flow, NULL);
+ if (!s_node->avl_node[i][j]) {
+ goto ERROR;
+ }
}
}
- TAILQ_INIT(&s_node->shaping_flow.packet_queue);
- s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
+ TAILQ_INIT(&s_node->shaping_flow.packet_queue[SHAPING_DIR_IN]);
+ TAILQ_INIT(&s_node->shaping_flow.packet_queue[SHAPING_DIR_OUT]);
+ s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
s_node->shaping_flow.ref_cnt = 1;
return &s_node->shaping_flow;
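
shaping_flow_new() now initializes one TAILQ per direction and one AVL node per (direction, priority) pair. A hypothetical reconstruction of the struct shaping_flow members this hunk touches (the struct is declared elsewhere in the tree; field types are guessed from usage):

    struct shaping_flow {
        /* one FIFO of queued packets per direction */
        TAILQ_HEAD(, shaping_packet_wrapper) packet_queue[SHAPING_DIR_MAX];
        int anchor[SHAPING_DIR_MAX];   /* current rule index, per direction */
        unsigned int queue_len;        /* note: still shared by both directions */
        int priority;
        int ref_cnt;
        unsigned int flag;             /* SESSION_CLOSE lives here */
        /* ... matched_rule_infos, rule_num, tuple4, src_ip_str, ... */
    };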
@@ -195,34 +197,33 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
}
s_pkt->pkt_buff = pkt_buff;
- s_pkt->direction = meta->dir;
s_pkt->length = meta->raw_len;
- s_pkt->rule_anchor = sf->anchor;
+ s_pkt->rule_anchor = sf->anchor[meta->dir];
s_pkt->income_time_ns = curr_time->tv_sec * NANO_SECONDS_PER_SEC + curr_time->tv_nsec;
s_pkt->enqueue_time_us = curr_time->tv_sec * MICRO_SECONDS_PER_SEC + curr_time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node);
+ TAILQ_INSERT_TAIL(&sf->packet_queue[meta->dir], s_pkt, node);
sf->queue_len++;
return 0;
}
-bool shaper_queue_empty(struct shaping_flow *sf)
+bool shaper_queue_empty(struct shaping_flow *sf, enum shaping_packet_dir dir)
{
- return TAILQ_EMPTY(&sf->packet_queue);
+ return TAILQ_EMPTY(&sf->packet_queue[dir]);
}
-struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf)
+static struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf, enum shaping_packet_dir dir)
{
- return TAILQ_FIRST(&sf->packet_queue);
+ return TAILQ_FIRST(&sf->packet_queue[dir]);
}
-void shaper_packet_dequeue(struct shaping_flow *sf)
+static void shaper_packet_dequeue(struct shaping_flow *sf, enum shaping_packet_dir dir)
{
struct shaping_packet_wrapper *s_pkt;
- s_pkt = TAILQ_FIRST(&sf->packet_queue);
+ s_pkt = TAILQ_FIRST(&sf->packet_queue[dir]);
if (s_pkt) {
- TAILQ_REMOVE(&sf->packet_queue, s_pkt, node);
+ TAILQ_REMOVE(&sf->packet_queue[dir], s_pkt, node);
sf->queue_len--;
free(s_pkt);
}
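
All three queue helpers grow a dir parameter, and shaper_first_pkt_get()/shaper_packet_dequeue() lose external linkage here, so outside this file only shaper_queue_empty() and shaper_queue_clear() remain callable. A usage sketch against the new signatures (drain_one_dir is a hypothetical name; shaper_queue_clear() below is the real, stats-aware version):

    static void drain_one_dir(struct shaping_flow *sf, enum shaping_packet_dir dir)
    {
        while (!shaper_queue_empty(sf, dir)) {
            shaper_packet_dequeue(sf, dir);   /* pops the head and frees its wrapper */
        }
    }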
@@ -230,43 +231,43 @@ void shaper_packet_dequeue(struct shaping_flow *sf)
return;
}
-void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
+void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx, enum shaping_packet_dir dir)
{
struct shaping_packet_wrapper *pkt_wrapper;
struct shaping_rule_info *rule = &sf->matched_rule_infos[0];
- while (!shaper_queue_empty(sf)) {
- pkt_wrapper = shaper_first_pkt_get(sf);
+ while (!shaper_queue_empty(sf, dir)) {
+ pkt_wrapper = shaper_first_pkt_get(sf, dir);
- shaper_stat_queueing_pkt_dec(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index);
+ shaper_stat_queueing_pkt_dec(&rule->primary.stat, dir, ctx->thread_index);
shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index);
shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length);
shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
- shaper_packet_dequeue(sf);
+ shaper_packet_dequeue(sf, dir);
}
return;
}
//returns 0 when the primary avl tree insert succeeds; borrow-profile inserts are best-effort
-static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time_us)
+static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, unsigned long long enqueue_time_us)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
- struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+ struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor[dir]];
struct shaper *sp = ctx->sp;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
int priority;
int i;
- pkt_wrapper = shaper_first_pkt_get(sf);
+ pkt_wrapper = shaper_first_pkt_get(sf, dir);
assert(pkt_wrapper != NULL);
priority = s_rule_info->primary.priority;
- avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
- if (0 != avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {//primary profile failed means flow push failed, ignore borrow profile
+ avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns);
+ if (0 != avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {//primary profile insert failing fails the whole push; borrow profiles are skipped
return -1;
}
@@ -276,44 +277,44 @@ static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow
for (i = 0; i < s_rule_info->borrowing_num; i++) {
priority = s_rule_info->borrowing[i].priority;
- avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
- if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
- shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
+ avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns);
+ if (0 == avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {
+ shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
}
}
END:
- s_rule_info->primary.enqueue_time_us = enqueue_time_us;
- shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
+ s_rule_info->primary.enqueue_time_us[dir] = enqueue_time_us;
+ shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, dir, ctx->thread_index);
return 0;
}
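
shaper_flow_push() now targets the direction-specific forest: each AVL node is keyed by the arrival time of the flow's head packet in that direction, so within a priority tree the flow with the oldest waiting packet sorts first. The primary-profile insert is mandatory, borrow-profile inserts best-effort. Condensed sketch of the mandatory step (push_primary is a hypothetical name; the avl calls are taken from the hunk above):

    static int push_primary(struct shaper *sp, struct shaping_node *n,
                            enum shaping_packet_dir dir, int priority,
                            unsigned long long head_income_ns)
    {
        /* oldest head packet == smallest key == served first */
        avl_tree_node_key_set(n->avl_node[dir][priority], head_income_ns);
        return avl_tree_node_insert(sp->priority_trees[dir][priority],
                                    n->avl_node[dir][priority]);   /* 0 on success */
    }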
-static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time)
+static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time, enum shaping_packet_dir dir)
{
- unsigned long long enqueue_time = profile->enqueue_time_us;
+ unsigned long long enqueue_time = profile->enqueue_time_us[dir];
unsigned long long curr_time = time->tv_sec * MICRO_SECONDS_PER_SEC + time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
assert(curr_time >= enqueue_time);
return (curr_time - enqueue_time);
}
-static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct timespec *curr_time)
+static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, struct timespec *curr_time)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
- struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+ struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor[dir]];
struct shaper *sp = ctx->sp;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
unsigned long long latency;
int priority;
int i;
- pkt_wrapper = shaper_first_pkt_get(sf);
+ pkt_wrapper = shaper_first_pkt_get(sf, dir);
assert(pkt_wrapper != NULL);
priority = s_rule_info->primary.priority;
- if (avl_node_in_tree(s_node->avl_node[priority])) {
- avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
+ if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
+ avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
}
if (s_rule_info->borrowing_num == 0) {
@@ -322,35 +323,35 @@ static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow
for (i = 0; i < s_rule_info->borrowing_num; i++) {
priority = s_rule_info->borrowing[i].priority;
- if (avl_node_in_tree(s_node->avl_node[priority])) {
- avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
+ if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
+ avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+ shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
}
}
END:
- latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, curr_time);
- shaper_stat_max_latency_update(&s_rule_info->primary.stat, pkt_wrapper->direction, latency, ctx->thread_index);
- shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
+ latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, curr_time, dir);
+ shaper_stat_max_latency_update(&s_rule_info->primary.stat, dir, latency, ctx->thread_index);
+ shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, dir, ctx->thread_index);
return;
}
-static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority)
+static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, int priority)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
- struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+ struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor[dir]];
struct shaper *sp = ctx->sp;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
- pkt_wrapper = shaper_first_pkt_get(sf);
+ pkt_wrapper = shaper_first_pkt_get(sf, dir);
assert(pkt_wrapper != NULL);
for (int i = 0; i < s_rule_info->borrowing_num; i++) {
if (priority == s_rule_info->borrowing[i].priority) {
- if (avl_node_in_tree(s_node->avl_node[priority])) {
- avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
+ if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
+ avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+ shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
}
}
}
@@ -358,7 +359,7 @@ static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *
return;
}
-int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num)
+static int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], enum shaping_packet_dir dir, int priority, int max_sf_num)
{
struct avl_node *avl_node = NULL;
int count = 0;
@@ -367,7 +368,7 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i
return 0;
}
- avl_node = avl_tree_minimum_node_get(sp->priority_trees[priority]);
+ avl_node = avl_tree_minimum_node_get(sp->priority_trees[dir][priority]);
while(avl_node) {
sf_ins[count].sf = (struct shaping_flow*)avl_tree_node_data_get(avl_node);
sf_ins[count].priority = priority;
@@ -601,7 +602,7 @@ static int shaper_deposit_token_get(struct shaping_profile_info *profile, int re
return 0;
}
- if (*deposit_token < req_token_bits) {
+ if (*deposit_token <= req_token_bits) {
*need_get_token = 1;
}
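
Side fix riding along in this merge: the deposit-token check flips from < to <=. A worked boundary case (numbers hypothetical):

    /* *deposit_token = 1500, req_token_bits = 1500
     * before: 1500 <  1500 -> false -> need_get_token stays 0
     * after:  1500 <= 1500 -> true  -> need_get_token = 1, i.e. a refill is
     *         requested while the deposit still exactly covers this packet */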
@@ -635,7 +636,7 @@ static void shaper_profile_hash_node_refresh(struct shaping_thread_ctx *ctx, str
return;
}
-static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int req_token_bits, unsigned char direction, struct timespec *curr_timespec)
+static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int req_token_bits, enum shaping_packet_dir dir, struct timespec *curr_timespec)
{
struct shaping_tconsume_cb_arg *arg = NULL;
struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node;
@@ -650,14 +651,14 @@ static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
if (pf_hash_node->limit_direction == PROFILE_LIMIT_DIRECTION_BIDIRECTION) {
snprintf(key, sizeof(key), "tsg-shaping-%d-bidirectional", pf_info->id);
} else {
- snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
+ snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, dir == SHAPING_DIR_OUT ? "outgoing" : "incoming");
}
arg = (struct shaping_tconsume_cb_arg *)calloc(1, sizeof(struct shaping_tconsume_cb_arg));
arg->ctx = ctx;
arg->profile = pf_info;
arg->sf = sf;
- arg->direction = direction;
+ arg->direction = dir;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
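
The swarmkv bucket key embeds the profile id and, unless the profile limits both directions through one bucket, the direction name. Example keys the two snprintf branches produce (profile id 7 is arbitrary):

    "tsg-shaping-7-bidirectional"   /* PROFILE_LIMIT_DIRECTION_BIDIRECTION */
    "tsg-shaping-7-outgoing"        /* dir == SHAPING_DIR_OUT              */
    "tsg-shaping-7-incoming"        /* dir == SHAPING_DIR_IN               */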
@@ -672,7 +673,7 @@ static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
break;
case PROFILE_TYPE_HOST_FARINESS:
case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
- swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, shaper_token_get_cb, arg);
+ swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor[dir]].fair_factor, req_token_bits, shaper_token_get_cb, arg);
//TODO: ftconsume with flexible
//swarmkv_async_command(ctx->swarmkv_db, shaper_token_get_cb, arg, "FTCONSUME %s %s %d %d %s", key, sf->src_ip_str, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, "FLEXIBLE");
break;
@@ -826,9 +827,9 @@ static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct
}
}
-static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, unsigned char direction, struct timespec *curr_timespec)
+static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, enum shaping_packet_dir direction, struct timespec *curr_timespec)
{
- struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor];
+ struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor[direction]];
int need_get_token = 0;
int ret = SHAPER_TOKEN_GET_FAILED;
@@ -917,9 +918,9 @@ int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, stru
return num;
}
-static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char direction)
+static int shaper_next_anchor_get(struct shaping_flow *sf, enum shaping_packet_dir dir)
{
- int anchor = sf->anchor + 1;
+ int anchor = sf->anchor[dir] + 1;
if (anchor > sf->rule_num - 1) {
return 0;
@@ -928,7 +929,7 @@ static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char directi
return anchor;
}
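
The rule anchor now advances independently per direction, wrapping to 0 after the last matched rule. A worked walk, assuming sf->rule_num == 3 (hypothetical):

    /* anchor[dir] == 0 -> returns 1  (advance to rule 1)
     * anchor[dir] == 1 -> returns 2  (advance to rule 2)
     * anchor[dir] == 2 -> 3 > rule_num - 1, returns 0
     *                     callers read 0 as "no next rule, forward the packet" */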
-static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority)
+static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, int priority)
{
struct shaping_rule_info *rule = NULL;
struct shaping_profile_info *profile = NULL;
@@ -941,18 +942,18 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
int get_token_success = 0;
int profile_num;
- rule = &sf->matched_rule_infos[sf->anchor];
+ rule = &sf->matched_rule_infos[sf->anchor[dir]];
profile_num = shaper_profile_get(rule, priority, pf_container);
assert(profile_num > 0);
- pkt_wrapper = shaper_first_pkt_get(sf);
+ pkt_wrapper = shaper_first_pkt_get(sf, dir);
assert(pkt_wrapper != NULL);
clock_gettime(CLOCK_MONOTONIC, &curr_time);
- latency_us = shaper_pkt_latency_us_calculate(&rule->primary, &curr_time);
+ latency_us = shaper_pkt_latency_us_calculate(&rule->primary, &curr_time, dir);
if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
if (latency_us > ctx->conf.pkt_max_delay_time_us) {
- shaper_flow_pop(ctx, sf, &curr_time);
+ shaper_flow_pop(ctx, sf, dir, &curr_time);
goto DROP;
}
}
@@ -964,18 +965,18 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
/*AQM: if the check fails, drop the packet for the primary profile; for a borrow profile just withhold the token*/
if (shaper_aqm_need_drop(profile, pkt_wrapper, &curr_time, latency_us)) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
- shaper_flow_pop(ctx, sf, &curr_time);
+ shaper_flow_pop(ctx, sf, dir, &curr_time);
goto DROP;
} else {
- shaper_flow_specific_borrow_priority_pop(ctx, sf, priority);
+ shaper_flow_specific_borrow_priority_pop(ctx, sf, dir, priority);
continue;
}
}
- int ret = shaper_token_consume(ctx, sf, profile, profile_type, pkt_wrapper->length, pkt_wrapper->direction, &curr_time);
+ int ret = shaper_token_consume(ctx, sf, profile, profile_type, pkt_wrapper->length, dir, &curr_time);
if (ret >= SHAPER_TOKEN_GET_SUCCESS) {
if (ret == SHAPER_TOKEN_GET_SUCCESS) {
- shaper_stat_forward_inc(&profile->stat, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
+ shaper_stat_forward_inc(&profile->stat, dir, pkt_wrapper->length, ctx->thread_index);
}
get_token_success = 1;
break;
@@ -986,24 +987,24 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
return SHAPING_QUEUED;
}
- shaper_flow_pop(ctx, sf, &curr_time);
- sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction);
- if (sf->anchor == 0) {//no next rule
+ shaper_flow_pop(ctx, sf, dir, &curr_time);
+ sf->anchor[dir] = shaper_next_anchor_get(sf, dir);
+ if (sf->anchor[dir] == 0) {//no next rule
return SHAPING_FORWARD;
}
//push sf for next rule
enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- if (0 == shaper_flow_push(ctx, sf, enqueue_time_us)) {
+ if (0 == shaper_flow_push(ctx, sf, dir, enqueue_time_us)) {
return SHAPING_QUEUED;
} else {
goto DROP;
}
DROP:
- rule = &sf->matched_rule_infos[sf->anchor];
- shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index);
- sf->anchor = 0;
+ rule = &sf->matched_rule_infos[sf->anchor[dir]];
+ shaper_stat_drop_inc(&rule->primary.stat, dir, ctx->thread_index);
+ sf->anchor[dir] = 0;
return SHAPING_DROP;
}
@@ -1021,8 +1022,8 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi
shaper_stat_forward_inc(&profile->stat, meta->dir, meta->raw_len, ctx->thread_index);
}
- sf->anchor = shaper_next_anchor_get(sf, meta->dir);
- if (sf->anchor == 0) {//no next rule
+ sf->anchor[meta->dir] = shaper_next_anchor_get(sf, meta->dir);
+ if (sf->anchor[meta->dir] == 0) {//no next rule
return SHAPING_FORWARD;
}
}
@@ -1040,7 +1041,7 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi
}
enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- if (0 == shaper_flow_push(ctx, sf, enqueue_time_us)) {
+ if (0 == shaper_flow_push(ctx, sf, meta->dir, enqueue_time_us)) {
return SHAPING_QUEUED;
} else {
goto DROP;
@@ -1048,30 +1049,30 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi
DROP:
if (enqueue_success) {
- shaper_packet_dequeue(sf);
+ shaper_packet_dequeue(sf, meta->dir);
}
- struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor].primary;
+ struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor[meta->dir]].primary;
shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index);
- sf->anchor = 0;
+ sf->anchor[meta->dir] = 0;
return SHAPING_DROP;
}
-static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority,
- struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
+static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, enum shaping_packet_dir dir,
+ int priority, struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
{
struct shaping_packet_wrapper *pkt_wrapper;
- int old_anchor = sf->anchor;
+ int old_anchor = sf->anchor[dir];
int shaping_ret;
- pkt_wrapper = shaper_first_pkt_get(sf);
+ pkt_wrapper = shaper_first_pkt_get(sf, dir);
assert(pkt_wrapper != NULL);
- shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority);
+ shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, dir, priority);
switch (shaping_ret) {
case SHAPING_QUEUED:
- if (old_anchor == sf->anchor) {//didn't get token
+ if (old_anchor == sf->anchor[dir]) {//didn't get token
return -1;
} else {//got a token for one rule; now waiting on a token for the next rule
return 0;
@@ -1083,7 +1084,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
- shaper_packet_dequeue(sf);
+ shaper_packet_dequeue(sf, dir);
break;
case SHAPING_FORWARD:
shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length);
@@ -1091,7 +1092,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length);
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1);
- shaper_packet_dequeue(sf);
+ shaper_packet_dequeue(sf, dir);
break;
default:
assert(0);//impossible path
@@ -1100,20 +1101,21 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
shaper_stat_refresh(ctx, sf, 0);
- if (shaper_queue_empty(sf)) {
- if (sf->flag & SESSION_CLOSE) {
+ enum shaping_packet_dir dir_opposite = (dir == SHAPING_DIR_IN) ? SHAPING_DIR_OUT : SHAPING_DIR_IN;
+ if (shaper_queue_empty(sf, dir)) {
+ if (shaper_queue_empty(sf, dir_opposite) && sf->flag & SESSION_CLOSE) {
sf->flag &= (~SESSION_CLOSE);
shaping_flow_free(ctx, sf);
}
return 0;
} else {
- pkt_wrapper = shaper_first_pkt_get(sf);
- shaper_stat_queueing_pkt_dec_for_rule(&sf->matched_rule_infos[pkt_wrapper->rule_anchor], pkt_wrapper->direction, ctx->thread_index);
+ pkt_wrapper = shaper_first_pkt_get(sf, dir);
+ shaper_stat_queueing_pkt_dec_for_rule(&sf->matched_rule_infos[pkt_wrapper->rule_anchor], dir, ctx->thread_index);
- sf->anchor = 0;
- if (shaper_flow_push(ctx, sf, pkt_wrapper->enqueue_time_us) != 0) {
- shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail
- if (sf->flag & SESSION_CLOSE) {
+ sf->anchor[dir] = 0;
+ if (shaper_flow_push(ctx, sf, dir, pkt_wrapper->enqueue_time_us) != 0) {
+ shaper_queue_clear(sf, ctx, dir);//if the first packet fails to push, every later packet would too
+ if (shaper_queue_empty(sf, dir_opposite) && sf->flag & SESSION_CLOSE) {
sf->flag &= (~SESSION_CLOSE);
shaping_flow_free(ctx, sf);
}
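
This hunk is the heart of the fix: a drained queue used to be sufficient to free a SESSION_CLOSE'd flow, but with split queues the flow may only be released once both directions are empty, otherwise the opposite direction's queued packets would be freed out from under the poller. The guard, isolated (& binds tighter than &&, so the flag test parses as intended, though parentheses would read better):

    enum shaping_packet_dir opp =
        (dir == SHAPING_DIR_IN) ? SHAPING_DIR_OUT : SHAPING_DIR_IN;

    if (shaper_queue_empty(sf, dir) &&
        shaper_queue_empty(sf, opp) &&
        (sf->flag & SESSION_CLOSE)) {
        sf->flag &= ~SESSION_CLOSE;
        shaping_flow_free(ctx, sf);   /* safe: nothing queued in either direction */
    }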
@@ -1162,11 +1164,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
goto END;//for tcp pure control pkt, transmit it directly
}
- if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly
+ if (!shaper_queue_empty(sf, meta->dir)) {//already have queueing pkt, enqueue directly
struct timespec curr_time;
clock_gettime(CLOCK_MONOTONIC, &curr_time);
- s_rule = &sf->matched_rule_infos[sf->anchor];
+ s_rule = &sf->matched_rule_infos[sf->anchor[meta->dir]];
if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta, &curr_time)) {
shaper_stat_queueing_pkt_inc_for_rule(s_rule, meta->dir, ctx->thread_index);
shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
@@ -1180,9 +1182,9 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
}
} else {//no queueing pkt, decide action
- sf->anchor = 0;
+ sf->anchor[meta->dir] = 0;
- shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff);
+ shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor[meta->dir]].primary, rx_buff);
switch (shaping_ret) {
case SHAPING_QUEUED:
shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
@@ -1207,7 +1209,7 @@ END:
shaper_stat_refresh(ctx, sf, 0);
if(sf->flag & SESSION_CLOSE) {
- if (shaper_queue_empty(sf)) {
+ if (shaper_queue_empty(sf, SHAPING_DIR_IN) && shaper_queue_empty(sf, SHAPING_DIR_OUT)) {
char *addr_str = addr_tuple4_to_str(&sf->tuple4);
LOG_DEBUG("%s: shaping free a shaping_flow for session: %s", LOG_TAG_SHAPING, addr_str);
@@ -1223,6 +1225,29 @@ END:
return;
}
+static void shaper_polling_token_get(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx, enum shaping_packet_dir dir)
+{
+ struct shaper_flow_instance sf_ins[SHAPER_FLOW_POP_NUM_MAX];
+ int sf_num;
+ int ret;
+
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ sf_num = shaper_flow_in_order_get(sp, sf_ins, dir, i, ctx->conf.polling_node_num_max[i]);
+ if (sf_num == 0) {
+ continue;
+ }
+
+ for (int j = 0; j < sf_num; j++) {
+ ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, dir, sf_ins[j].priority, stat, ctx);
+ if (ret == 0) {
+ return;
+ }
+ }
+ }
+
+ return;
+}
+
void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
{
static thread_local int swarmkv_caller_loop_divisor = SWARMKV_CALLER_LOOP_DIVISOR_MIN;
@@ -1265,23 +1290,8 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
return;
}
- struct shaper_flow_instance sf_ins[SHAPER_FLOW_POP_NUM_MAX];
- int sf_num;
- int ret;
-
- for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
- sf_num = shaper_flow_in_order_get(sp, sf_ins, i, ctx->conf.polling_node_num_max[i]);
- if (sf_num == 0) {
- continue;
- }
-
- for (int j = 0; j < sf_num; j++) {
- ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, stat, ctx);
- if (ret == 0) {
- return;
- }
- }
- }
+ shaper_polling_token_get(sp, stat, ctx, SHAPING_DIR_IN);
+ shaper_polling_token_get(sp, stat, ctx, SHAPING_DIR_OUT);
return;
}
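
The per-priority polling loop moves verbatim into shaper_polling_token_get() and runs once per direction, IN before OUT on every tick. One behavioral nuance worth flagging in review: the early return on ret == 0 used to abort the entire polling pass; it now only ends the current direction's scan, so OUT is still polled even when IN stops early:

    /* per tick, after the swarmkv housekeeping above: */
    shaper_polling_token_get(sp, stat, ctx, SHAPING_DIR_IN);   /* may stop early ... */
    shaper_polling_token_get(sp, stat, ctx, SHAPING_DIR_OUT);  /* ... OUT still runs */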
diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp
index 494950b..f58d55f 100644
--- a/shaping/src/shaper_session.cpp
+++ b/shaping/src/shaper_session.cpp
@@ -195,7 +195,7 @@ struct shaping_flow* shaper_session_close(struct shaping_thread_ctx *ctx, struct
sf = (struct shaping_flow *)session_node->val_data;
- if (shaper_queue_empty(sf)) {
+ if (shaper_queue_empty(sf, SHAPING_DIR_IN) && shaper_queue_empty(sf, SHAPING_DIR_OUT)) {
shaping_flow_free(ctx, sf);
} else {
sf->flag |= SESSION_CLOSE;
@@ -245,7 +245,8 @@ void shaper_session_data_free_cb(void *session_data, void *data)
struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data;
if (sf) {
- shaper_queue_clear(sf, ctx);
+ shaper_queue_clear(sf, ctx, SHAPING_DIR_IN);
+ shaper_queue_clear(sf, ctx, SHAPING_DIR_OUT);
shaping_flow_free(ctx, sf);
}
diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp
index 6ac1db5..d7fcd77 100644
--- a/shaping/src/shaper_swarmkv.cpp
+++ b/shaping/src/shaper_swarmkv.cpp
@@ -129,7 +129,7 @@ struct swarmkv* shaper_swarmkv_init(int caller_thread_num)
swarmkv_options_set_health_check_announce_port(swarmkv_opts, conf.swarmkv_health_check_announce_port);
swarmkv_options_set_log_path(swarmkv_opts, "log");
swarmkv_options_set_log_level(swarmkv_opts, conf.swarmkv_log_level);
- swarmkv_options_set_caller_thread_number(swarmkv_opts, caller_thread_num);
+ swarmkv_options_set_caller_thread_number(swarmkv_opts, caller_thread_num + 1);
swarmkv_options_set_worker_thread_number(swarmkv_opts, conf.swarmkv_worker_thread_num);
swarmkv_db = swarmkv_open(swarmkv_opts, conf.swarmkv_cluster_name, &err);
@@ -139,6 +139,8 @@ struct swarmkv* shaper_swarmkv_init(int caller_thread_num)
return NULL;
}
+ swarmkv_register_thread(swarmkv_db);
+
LOG_DEBUG("%s: shaping open swarmkv: %s", LOG_TAG_SWARMKV, conf.swarmkv_cluster_name);
char cmd[256] = {0};//heal is run once automatically after a restart
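
The swarmkv caller-thread budget grows by one and the opening thread registers itself. A plausible reading (an assumption — only the calls visible in this diff are taken to exist): the thread running shaper_swarmkv_init() now issues commands directly, e.g. the post-restart heal above, so it needs a caller slot of its own on top of the caller_thread_num packet threads:

    /* init order implied by this hunk (calls verbatim from the diff): */
    swarmkv_options_set_caller_thread_number(swarmkv_opts, caller_thread_num + 1);
    swarmkv_db = swarmkv_open(swarmkv_opts, conf.swarmkv_cluster_name, &err);
    if (swarmkv_db) {
        swarmkv_register_thread(swarmkv_db);   /* claim the extra caller slot */
    }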