summaryrefslogtreecommitdiff
path: root/shaping/src/shaper.cpp
diff options
context:
space:
mode:
authorroot <[email protected]>2023-11-13 08:07:52 +0000
committerroot <[email protected]>2023-11-13 08:07:52 +0000
commita1609051ec2e8a11cdf13377efb133c2f2b603e0 (patch)
tree8fef799b5d8cc04d2ef54a72a2bb1eaae66d81a3 /shaping/src/shaper.cpp
parentf1c9565d486fe9a278996a3cfd8be6f77cf7a977 (diff)
TSG-17653: tcp pure ctrl packet force consume token and forward directly
Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r--shaping/src/shaper.cpp108
1 files changed, 65 insertions, 43 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 1013d0f..bef1b56 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -78,6 +78,8 @@ enum shaper_token_get_result {
struct shaping_profile_hash_node {
int id;
+ int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
+ int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
long long last_failed_get_token_ms;
long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX];
unsigned char is_priority_blocked[SHAPING_PRIORITY_NUM_MAX];
@@ -85,7 +87,7 @@ struct shaping_profile_hash_node {
UT_hash_handle hh;
};
-struct shaping_profile_hash_node *g_shaping_profile_table = NULL;
+thread_local struct shaping_profile_hash_node *thread_sp_hashtbl = NULL;
struct shaper* shaper_new(unsigned int priority_queue_len_max)
{
@@ -196,6 +198,17 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
return;
}
+void shaper_thread_resource_clear()
+{
+ struct shaping_profile_hash_node *thread_sp_hashtbl_tmp = NULL;
+ struct shaping_profile_hash_node *node = NULL;
+
+ HASH_ITER(hh, thread_sp_hashtbl, node, thread_sp_hashtbl_tmp) {
+ HASH_DEL(thread_sp_hashtbl, node);
+ free(node);
+ }
+}
+
static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta)
{
struct shaping_packet_wrapper *s_pkt = NULL;
@@ -215,7 +228,6 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
s_pkt->pkt_buff = pkt_buff;
s_pkt->direction = meta->dir;
s_pkt->length = meta->raw_len;
- s_pkt->tcp_pure_contorl = meta->is_tcp_pure_ctrl;
s_pkt->income_time_ns = curr_time.tv_sec * NANO_SECONDS_PER_SEC + curr_time.tv_nsec;
s_pkt->enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node);
@@ -386,12 +398,12 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i
return count;
}
-static void shaper_deposit_token_add(struct shaping_profile_info *pf_info, int req_token, unsigned char direction)
+static void shaper_deposit_token_add(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority)
{
if (direction == SHAPING_DIR_IN) {
- __atomic_add_fetch(&pf_info->in_deposit_token, req_token, __ATOMIC_SEQ_CST);
+ pf_hash_node->in_deposit_token_bits[priority] += req_token_bits;
} else {
- __atomic_add_fetch(&pf_info->out_deposit_token, req_token, __ATOMIC_SEQ_CST);
+ pf_hash_node->out_deposit_token_bits[priority] += req_token_bits;
}
}
@@ -403,7 +415,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
- LOG_INFO("Swarmkv reply type =%d, integer =%llu",reply->type, reply->integer);
+ LOG_INFO("Swarmkv reply type =%d, direction =%d, integer =%llu",reply->type, arg->direction, reply->integer);
if (reply->type != SWARMKV_REPLY_INTEGER) {
shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat);
@@ -411,14 +423,14 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
}
if (reply->integer < 0) {//profile not exist
- s_pf_info->is_invalid = 1;
+ s_pf_info->hash_node->is_invalid = 1;
goto END;
} else {
- s_pf_info->is_invalid = 0;
+ s_pf_info->hash_node->is_invalid = 0;
}
if (reply->integer > 0) {
- shaper_deposit_token_add(s_pf_info, reply->integer, arg->direction);//deposit tokens to profile
+ shaper_deposit_token_add(s_pf_info->hash_node, reply->integer, arg->direction, s_pf_info->priority);//deposit tokens to profile
}
END:
@@ -435,25 +447,25 @@ END:
return;
}
-static void shaper_deposit_token_sub(struct shaping_profile_info *pf_info, int req_token, unsigned char direction)
+static void shaper_deposit_token_sub(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority)
{
if (direction == SHAPING_DIR_IN) {
- __atomic_sub_fetch(&pf_info->in_deposit_token, req_token, __ATOMIC_SEQ_CST);
+ pf_hash_node->in_deposit_token_bits[priority] -= req_token_bits;
} else {
- __atomic_sub_fetch(&pf_info->out_deposit_token, req_token, __ATOMIC_SEQ_CST);
+ pf_hash_node->out_deposit_token_bits[priority] -= req_token_bits;
}
}
-static int shaper_deposit_token_is_enough(struct shaping_profile_info *pf_info, int req_token, unsigned char direction)
+static int shaper_deposit_token_is_enough(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority)
{
if (direction == SHAPING_DIR_IN) {
- if (__atomic_load_n(&pf_info->in_deposit_token, __ATOMIC_SEQ_CST) >= req_token) {
+ if (pf_hash_node->in_deposit_token_bits[priority] >= req_token_bits) {
return 1;
} else {
return 0;
}
} else {
- if (__atomic_load_n(&pf_info->out_deposit_token, __ATOMIC_SEQ_CST) >= req_token) {
+ if (pf_hash_node->out_deposit_token_bits[priority] >= req_token_bits) {
return 1;
} else {
return 0;
@@ -494,7 +506,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
break;
}
- if (pf_info->is_invalid) {
+ if (pf_info->hash_node->is_invalid) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token
return SHAPER_TOKEN_GET_SUCCESS;
} else {//for borrowing, means this profile has no token to borrow
@@ -502,8 +514,8 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
}
}
- if (shaper_deposit_token_is_enough(pf_info, req_token_bits, direction)) {
- shaper_deposit_token_sub(pf_info, req_token_bits, direction);
+ if (shaper_deposit_token_is_enough(pf_info->hash_node, req_token_bits, direction, pf_info->priority)) {
+ shaper_deposit_token_sub(pf_info->hash_node, req_token_bits, direction, pf_info->priority);
return SHAPER_TOKEN_GET_SUCCESS;
}
@@ -583,6 +595,23 @@ END:
}
}
+static void shaper_profile_hash_node_update(struct shaping_profile_info *profile)
+{
+ if (profile->hash_node == NULL) {
+ struct shaping_profile_hash_node *hash_node = NULL;
+ HASH_FIND_INT(thread_sp_hashtbl, &profile->id, hash_node);
+ if (hash_node) {
+ profile->hash_node = hash_node;
+ } else {
+ profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node));
+ profile->hash_node->id = profile->id;
+ HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node);
+ }
+ }
+
+ return;
+}
+
static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int req_token_bytes,
struct shaping_profile_info *profile, int profile_type, unsigned char direction)
{
@@ -603,21 +632,11 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet
}
- if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction)) {
- shaper_deposit_token_sub(profile, req_token_bytes * 8, direction);
- return SHAPER_TOKEN_GET_SUCCESS;
- }
+ shaper_profile_hash_node_update(profile);
- if (profile->hash_node == NULL) {
- struct shaping_profile_hash_node *hash_node = NULL;
- HASH_FIND_INT(g_shaping_profile_table, &profile->id, hash_node);
- if (hash_node) {
- profile->hash_node = hash_node;
- } else {
- profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node));
- profile->hash_node->id = profile->id;
- HASH_ADD_INT(g_shaping_profile_table, id, profile->hash_node);
- }
+ if (shaper_deposit_token_is_enough(profile->hash_node, req_token_bytes * 8, direction, profile->priority)) {
+ shaper_deposit_token_sub(profile->hash_node, req_token_bytes * 8, direction, profile->priority);
+ return SHAPER_TOKEN_GET_SUCCESS;
}
struct timespec curr_timespec;
@@ -708,12 +727,6 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
- if (pkt_wrapper->tcp_pure_contorl) {
- shaper_flow_pop(ctx, sf);
- shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
- return SHAPING_FORWARD;
- }
-
if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
clock_gettime(CLOCK_MONOTONIC, &curr_time);
if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > ctx->conf.pkt_max_delay_time_us) {
@@ -770,11 +783,6 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi
unsigned long long enqueue_time;
int enqueue_success = 0;
- if (meta->is_tcp_pure_ctrl) {
- shaper_stat_forward_all_rule_inc(ctx->stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
- return SHAPING_FORWARD;
- }
-
int ret = shaper_token_consume(ctx, sf, meta->raw_len, profile, profile_type, meta->dir);
if (ret >= SHAPER_TOKEN_GET_SUCCESS) {
if (ret == SHAPER_TOKEN_GET_SUCCESS) {
@@ -886,6 +894,19 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
}
}
+static void shaper_token_consume_force(struct shaping_flow *sf, struct metadata *meta)
+{
+ struct shaping_rule_info *rule;
+
+ for (int i = 0; i < sf->rule_num; i++) {
+ rule = &sf->matched_rule_infos[i];
+ shaper_profile_hash_node_update(&rule->primary);
+ shaper_deposit_token_sub(rule->primary.hash_node, meta->raw_len * 8, meta->dir, rule->primary.priority);
+ }
+
+ return;
+}
+
void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf)
{
int shaping_ret;
@@ -896,6 +917,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
sf->processed_pkts++;
if (meta->is_tcp_pure_ctrl) {
+ shaper_token_consume_force(sf, meta);
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);