summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author刘畅 <[email protected]>2023-11-13 09:50:12 +0000
committer刘畅 <[email protected]>2023-11-13 09:50:12 +0000
commit5b20b8ffa36252a676caefe731886f3c6898905a (patch)
tree8fef799b5d8cc04d2ef54a72a2bb1eaae66d81a3
parentf1c9565d486fe9a278996a3cfd8be6f77cf7a977 (diff)
parenta1609051ec2e8a11cdf13377efb133c2f2b603e0 (diff)
Merge branch 'tcp_pure_ctrl_consume_token_force' into 'rel'v1.3.8
tcp pure ctrl packet force consume token and forward directly See merge request tango/shaping-engine!51
-rw-r--r--shaping/include/shaper.h6
-rw-r--r--shaping/src/main.cpp2
-rw-r--r--shaping/src/shaper.cpp108
-rw-r--r--shaping/test/gtest_shaper.cpp38
4 files changed, 96 insertions, 58 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index d130222..f61d573 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -93,11 +93,7 @@ struct shaping_profile_info {
int id;//profile_id
enum shaping_profile_type type;
int priority;
- int in_deposit_token;
- int out_deposit_token;
unsigned long long enqueue_time_us;//to calculate max latency
- unsigned char is_priority_blocked;
- unsigned char is_invalid;
struct shaping_stat_for_profile stat;
struct shaping_profile_hash_node *hash_node;
};
@@ -118,7 +114,6 @@ struct shaping_packet_wrapper {
unsigned long long enqueue_time_us;//first enqueue time
unsigned int length;
unsigned char direction;
- unsigned char tcp_pure_contorl;
TAILQ_ENTRY(shaping_packet_wrapper) node;
};
TAILQ_HEAD(delay_queue, shaping_packet_wrapper);
@@ -187,3 +182,4 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
struct shaping_ctx *shaping_engine_init();
void shaping_engine_destroy(struct shaping_ctx *ctx);
+void shaper_thread_resource_clear();
diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp
index b7807f7..89cb176 100644
--- a/shaping/src/main.cpp
+++ b/shaping/src/main.cpp
@@ -43,6 +43,8 @@ static void *shaper_thread_loop(void *data)
marsio_poll_wait(ctx->marsio_info->instance, &ctx->marsio_info->mr_dev, 1, ctx->thread_index, ctx->global_stat->output_interval_s * 1000);
}
+ shaper_thread_resource_clear();
+
return NULL;
}
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 1013d0f..bef1b56 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -78,6 +78,8 @@ enum shaper_token_get_result {
struct shaping_profile_hash_node {
int id;
+ int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
+ int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
long long last_failed_get_token_ms;
long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX];
unsigned char is_priority_blocked[SHAPING_PRIORITY_NUM_MAX];
@@ -85,7 +87,7 @@ struct shaping_profile_hash_node {
UT_hash_handle hh;
};
-struct shaping_profile_hash_node *g_shaping_profile_table = NULL;
+thread_local struct shaping_profile_hash_node *thread_sp_hashtbl = NULL;
struct shaper* shaper_new(unsigned int priority_queue_len_max)
{
@@ -196,6 +198,17 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
return;
}
+void shaper_thread_resource_clear()
+{
+ struct shaping_profile_hash_node *thread_sp_hashtbl_tmp = NULL;
+ struct shaping_profile_hash_node *node = NULL;
+
+ HASH_ITER(hh, thread_sp_hashtbl, node, thread_sp_hashtbl_tmp) {
+ HASH_DEL(thread_sp_hashtbl, node);
+ free(node);
+ }
+}
+
static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta)
{
struct shaping_packet_wrapper *s_pkt = NULL;
@@ -215,7 +228,6 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
s_pkt->pkt_buff = pkt_buff;
s_pkt->direction = meta->dir;
s_pkt->length = meta->raw_len;
- s_pkt->tcp_pure_contorl = meta->is_tcp_pure_ctrl;
s_pkt->income_time_ns = curr_time.tv_sec * NANO_SECONDS_PER_SEC + curr_time.tv_nsec;
s_pkt->enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node);
@@ -386,12 +398,12 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i
return count;
}
-static void shaper_deposit_token_add(struct shaping_profile_info *pf_info, int req_token, unsigned char direction)
+static void shaper_deposit_token_add(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority)
{
if (direction == SHAPING_DIR_IN) {
- __atomic_add_fetch(&pf_info->in_deposit_token, req_token, __ATOMIC_SEQ_CST);
+ pf_hash_node->in_deposit_token_bits[priority] += req_token_bits;
} else {
- __atomic_add_fetch(&pf_info->out_deposit_token, req_token, __ATOMIC_SEQ_CST);
+ pf_hash_node->out_deposit_token_bits[priority] += req_token_bits;
}
}
@@ -403,7 +415,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
- LOG_INFO("Swarmkv reply type =%d, integer =%llu",reply->type, reply->integer);
+ LOG_INFO("Swarmkv reply type =%d, direction =%d, integer =%llu",reply->type, arg->direction, reply->integer);
if (reply->type != SWARMKV_REPLY_INTEGER) {
shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat);
@@ -411,14 +423,14 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
}
if (reply->integer < 0) {//profile not exist
- s_pf_info->is_invalid = 1;
+ s_pf_info->hash_node->is_invalid = 1;
goto END;
} else {
- s_pf_info->is_invalid = 0;
+ s_pf_info->hash_node->is_invalid = 0;
}
if (reply->integer > 0) {
- shaper_deposit_token_add(s_pf_info, reply->integer, arg->direction);//deposit tokens to profile
+ shaper_deposit_token_add(s_pf_info->hash_node, reply->integer, arg->direction, s_pf_info->priority);//deposit tokens to profile
}
END:
@@ -435,25 +447,25 @@ END:
return;
}
-static void shaper_deposit_token_sub(struct shaping_profile_info *pf_info, int req_token, unsigned char direction)
+static void shaper_deposit_token_sub(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority)
{
if (direction == SHAPING_DIR_IN) {
- __atomic_sub_fetch(&pf_info->in_deposit_token, req_token, __ATOMIC_SEQ_CST);
+ pf_hash_node->in_deposit_token_bits[priority] -= req_token_bits;
} else {
- __atomic_sub_fetch(&pf_info->out_deposit_token, req_token, __ATOMIC_SEQ_CST);
+ pf_hash_node->out_deposit_token_bits[priority] -= req_token_bits;
}
}
-static int shaper_deposit_token_is_enough(struct shaping_profile_info *pf_info, int req_token, unsigned char direction)
+static int shaper_deposit_token_is_enough(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority)
{
if (direction == SHAPING_DIR_IN) {
- if (__atomic_load_n(&pf_info->in_deposit_token, __ATOMIC_SEQ_CST) >= req_token) {
+ if (pf_hash_node->in_deposit_token_bits[priority] >= req_token_bits) {
return 1;
} else {
return 0;
}
} else {
- if (__atomic_load_n(&pf_info->out_deposit_token, __ATOMIC_SEQ_CST) >= req_token) {
+ if (pf_hash_node->out_deposit_token_bits[priority] >= req_token_bits) {
return 1;
} else {
return 0;
@@ -494,7 +506,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
break;
}
- if (pf_info->is_invalid) {
+ if (pf_info->hash_node->is_invalid) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token
return SHAPER_TOKEN_GET_SUCCESS;
} else {//for borrowing, means this profile has no token to borrow
@@ -502,8 +514,8 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
}
}
- if (shaper_deposit_token_is_enough(pf_info, req_token_bits, direction)) {
- shaper_deposit_token_sub(pf_info, req_token_bits, direction);
+ if (shaper_deposit_token_is_enough(pf_info->hash_node, req_token_bits, direction, pf_info->priority)) {
+ shaper_deposit_token_sub(pf_info->hash_node, req_token_bits, direction, pf_info->priority);
return SHAPER_TOKEN_GET_SUCCESS;
}
@@ -583,6 +595,23 @@ END:
}
}
+static void shaper_profile_hash_node_update(struct shaping_profile_info *profile)
+{
+ if (profile->hash_node == NULL) {
+ struct shaping_profile_hash_node *hash_node = NULL;
+ HASH_FIND_INT(thread_sp_hashtbl, &profile->id, hash_node);
+ if (hash_node) {
+ profile->hash_node = hash_node;
+ } else {
+ profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node));
+ profile->hash_node->id = profile->id;
+ HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node);
+ }
+ }
+
+ return;
+}
+
static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int req_token_bytes,
struct shaping_profile_info *profile, int profile_type, unsigned char direction)
{
@@ -603,21 +632,11 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet
}
- if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction)) {
- shaper_deposit_token_sub(profile, req_token_bytes * 8, direction);
- return SHAPER_TOKEN_GET_SUCCESS;
- }
+ shaper_profile_hash_node_update(profile);
- if (profile->hash_node == NULL) {
- struct shaping_profile_hash_node *hash_node = NULL;
- HASH_FIND_INT(g_shaping_profile_table, &profile->id, hash_node);
- if (hash_node) {
- profile->hash_node = hash_node;
- } else {
- profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node));
- profile->hash_node->id = profile->id;
- HASH_ADD_INT(g_shaping_profile_table, id, profile->hash_node);
- }
+ if (shaper_deposit_token_is_enough(profile->hash_node, req_token_bytes * 8, direction, profile->priority)) {
+ shaper_deposit_token_sub(profile->hash_node, req_token_bytes * 8, direction, profile->priority);
+ return SHAPER_TOKEN_GET_SUCCESS;
}
struct timespec curr_timespec;
@@ -708,12 +727,6 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
- if (pkt_wrapper->tcp_pure_contorl) {
- shaper_flow_pop(ctx, sf);
- shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
- return SHAPING_FORWARD;
- }
-
if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
clock_gettime(CLOCK_MONOTONIC, &curr_time);
if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > ctx->conf.pkt_max_delay_time_us) {
@@ -770,11 +783,6 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi
unsigned long long enqueue_time;
int enqueue_success = 0;
- if (meta->is_tcp_pure_ctrl) {
- shaper_stat_forward_all_rule_inc(ctx->stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
- return SHAPING_FORWARD;
- }
-
int ret = shaper_token_consume(ctx, sf, meta->raw_len, profile, profile_type, meta->dir);
if (ret >= SHAPER_TOKEN_GET_SUCCESS) {
if (ret == SHAPER_TOKEN_GET_SUCCESS) {
@@ -886,6 +894,19 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
}
}
+static void shaper_token_consume_force(struct shaping_flow *sf, struct metadata *meta)
+{
+ struct shaping_rule_info *rule;
+
+ for (int i = 0; i < sf->rule_num; i++) {
+ rule = &sf->matched_rule_infos[i];
+ shaper_profile_hash_node_update(&rule->primary);
+ shaper_deposit_token_sub(rule->primary.hash_node, meta->raw_len * 8, meta->dir, rule->primary.priority);
+ }
+
+ return;
+}
+
void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf)
{
int shaping_ret;
@@ -896,6 +917,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
sf->processed_pkts++;
if (meta->is_tcp_pure_ctrl) {
+ shaper_token_consume_force(sf, meta);
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index a625068..8196ed2 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -274,7 +274,7 @@ TEST(single_session, udp_tx_in_order)
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaper_global_stat_refresh(ctx->global_stat);
-
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -330,12 +330,12 @@ TEST(single_session, tcp_tx_in_order)
/*******send packets***********/
send_packets(&ctx->thread_ctx[0], sf, 20, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
- send_packets(&ctx->thread_ctx[0], sf, 20, 100, SHAPING_DIR_OUT, &expec_pure_ctl_tx_queue, 1, 1);
+ send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_pure_ctl_tx_queue, 1, 1);
//first 10 packets
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
- //20 pure ctrl pkts
- ASSERT_EQ(0, judge_packet_eq(&expec_pure_ctl_tx_queue, actual_tx_queue, 20));
+ //10 pure ctrl pkts
+ ASSERT_EQ(0, judge_packet_eq(&expec_pure_ctl_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -346,6 +346,13 @@ TEST(single_session, tcp_tx_in_order)
stub_refresh_token_bucket(0);
+ for (int i = 0; i < 10; i++) {
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
+ }
+ ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));//pure ctrl pkts force consume 1000 tokens, current token: -1000--->0, so no pkt can be sent
+
+ stub_refresh_token_bucket(0);
for (int i = 0; i < 10; i++) {//10 pkts which is not pure control
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
@@ -358,7 +365,7 @@ TEST(single_session, tcp_tx_in_order)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
-
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -370,7 +377,7 @@ TEST(single_session, tcp_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 30, 3000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 30000, SHAPING_DIR_OUT, profile_type_primary);
@@ -436,7 +443,7 @@ TEST(single_session, udp_diff_direction)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
-
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -515,6 +522,7 @@ TEST(single_session, udp_multi_rules)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -589,6 +597,7 @@ TEST(single_session, udp_borrow)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -663,6 +672,7 @@ TEST(single_session, udp_borrow_same_priority_9)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -771,7 +781,7 @@ TEST(two_session_diff_priority, udp_in_order)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
-
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -890,7 +900,7 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
-
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -977,6 +987,7 @@ TEST(single_session_async, udp_tx_in_order)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1038,6 +1049,7 @@ TEST(single_session_async, udp_close_before_async_exec)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
fieldstat_global_disable_prometheus_endpoint();
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
}
@@ -1127,6 +1139,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1213,6 +1226,7 @@ TEST(two_session_same_rule, udp_tx_in_order)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1308,6 +1322,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
shaping_flow_free(&ctx->thread_ctx[0], sf1);
shaping_flow_free(&ctx->thread_ctx[1], sf2);
fieldstat_global_disable_prometheus_endpoint();
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
}
@@ -1407,6 +1422,7 @@ TEST(two_session_diff_priority_same_profile, session_timer_test)
shaping_flow_free(&ctx->thread_ctx[0], sf1);
shaping_flow_free(&ctx->thread_ctx[1], sf2);
fieldstat_global_disable_prometheus_endpoint();
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
}
@@ -1459,6 +1475,7 @@ TEST(statistics, udp_drop_pkt)
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaper_global_stat_refresh(ctx->global_stat);
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1546,6 +1563,7 @@ TEST(statistics, udp_queueing_pkt)
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaper_global_stat_refresh(ctx->global_stat);
+ shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
@@ -1577,6 +1595,6 @@ TEST(statistics, udp_queueing_pkt)
int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
- //testing::GTEST_FLAG(filter) = "single_session_async.udp_close_before_async_exec";
+ //testing::GTEST_FLAG(filter) = "single_session.tcp_tx_in_order";
return RUN_ALL_TESTS();
} \ No newline at end of file