diff options
| author | 刘畅 <[email protected]> | 2023-11-13 09:50:12 +0000 |
|---|---|---|
| committer | 刘畅 <[email protected]> | 2023-11-13 09:50:12 +0000 |
| commit | 5b20b8ffa36252a676caefe731886f3c6898905a (patch) | |
| tree | 8fef799b5d8cc04d2ef54a72a2bb1eaae66d81a3 | |
| parent | f1c9565d486fe9a278996a3cfd8be6f77cf7a977 (diff) | |
| parent | a1609051ec2e8a11cdf13377efb133c2f2b603e0 (diff) | |
Merge branch 'tcp_pure_ctrl_consume_token_force' into 'rel'v1.3.8
tcp pure ctrl packet force consume token and forward directly
See merge request tango/shaping-engine!51
| -rw-r--r-- | shaping/include/shaper.h | 6 | ||||
| -rw-r--r-- | shaping/src/main.cpp | 2 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 108 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper.cpp | 38 |
4 files changed, 96 insertions, 58 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index d130222..f61d573 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -93,11 +93,7 @@ struct shaping_profile_info { int id;//profile_id enum shaping_profile_type type; int priority; - int in_deposit_token; - int out_deposit_token; unsigned long long enqueue_time_us;//to calculate max latency - unsigned char is_priority_blocked; - unsigned char is_invalid; struct shaping_stat_for_profile stat; struct shaping_profile_hash_node *hash_node; }; @@ -118,7 +114,6 @@ struct shaping_packet_wrapper { unsigned long long enqueue_time_us;//first enqueue time unsigned int length; unsigned char direction; - unsigned char tcp_pure_contorl; TAILQ_ENTRY(shaping_packet_wrapper) node; }; TAILQ_HEAD(delay_queue, shaping_packet_wrapper); @@ -187,3 +182,4 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu struct shaping_ctx *shaping_engine_init(); void shaping_engine_destroy(struct shaping_ctx *ctx); +void shaper_thread_resource_clear(); diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp index b7807f7..89cb176 100644 --- a/shaping/src/main.cpp +++ b/shaping/src/main.cpp @@ -43,6 +43,8 @@ static void *shaper_thread_loop(void *data) marsio_poll_wait(ctx->marsio_info->instance, &ctx->marsio_info->mr_dev, 1, ctx->thread_index, ctx->global_stat->output_interval_s * 1000); } + shaper_thread_resource_clear(); + return NULL; } diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 1013d0f..bef1b56 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -78,6 +78,8 @@ enum shaper_token_get_result { struct shaping_profile_hash_node { int id; + int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; + int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; long long last_failed_get_token_ms; long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX]; unsigned char is_priority_blocked[SHAPING_PRIORITY_NUM_MAX]; @@ -85,7 +87,7 @@ struct shaping_profile_hash_node { UT_hash_handle hh; }; -struct shaping_profile_hash_node *g_shaping_profile_table = NULL; +thread_local struct shaping_profile_hash_node *thread_sp_hashtbl = NULL; struct shaper* shaper_new(unsigned int priority_queue_len_max) { @@ -196,6 +198,17 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) return; } +void shaper_thread_resource_clear() +{ + struct shaping_profile_hash_node *thread_sp_hashtbl_tmp = NULL; + struct shaping_profile_hash_node *node = NULL; + + HASH_ITER(hh, thread_sp_hashtbl, node, thread_sp_hashtbl_tmp) { + HASH_DEL(thread_sp_hashtbl, node); + free(node); + } +} + static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta) { struct shaping_packet_wrapper *s_pkt = NULL; @@ -215,7 +228,6 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ s_pkt->pkt_buff = pkt_buff; s_pkt->direction = meta->dir; s_pkt->length = meta->raw_len; - s_pkt->tcp_pure_contorl = meta->is_tcp_pure_ctrl; s_pkt->income_time_ns = curr_time.tv_sec * NANO_SECONDS_PER_SEC + curr_time.tv_nsec; s_pkt->enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node); @@ -386,12 +398,12 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i return count; } -static void shaper_deposit_token_add(struct shaping_profile_info *pf_info, int req_token, unsigned char direction) +static void shaper_deposit_token_add(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority) { if (direction == SHAPING_DIR_IN) { - __atomic_add_fetch(&pf_info->in_deposit_token, req_token, __ATOMIC_SEQ_CST); + pf_hash_node->in_deposit_token_bits[priority] += req_token_bits; } else { - __atomic_add_fetch(&pf_info->out_deposit_token, req_token, __ATOMIC_SEQ_CST); + pf_hash_node->out_deposit_token_bits[priority] += req_token_bits; } } @@ -403,7 +415,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg shaper_global_stat_async_callback_inc(arg->ctx->global_stat); - LOG_INFO("Swarmkv reply type =%d, integer =%llu",reply->type, reply->integer); + LOG_INFO("Swarmkv reply type =%d, direction =%d, integer =%llu",reply->type, arg->direction, reply->integer); if (reply->type != SWARMKV_REPLY_INTEGER) { shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat); @@ -411,14 +423,14 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg } if (reply->integer < 0) {//profile not exist - s_pf_info->is_invalid = 1; + s_pf_info->hash_node->is_invalid = 1; goto END; } else { - s_pf_info->is_invalid = 0; + s_pf_info->hash_node->is_invalid = 0; } if (reply->integer > 0) { - shaper_deposit_token_add(s_pf_info, reply->integer, arg->direction);//deposit tokens to profile + shaper_deposit_token_add(s_pf_info->hash_node, reply->integer, arg->direction, s_pf_info->priority);//deposit tokens to profile } END: @@ -435,25 +447,25 @@ END: return; } -static void shaper_deposit_token_sub(struct shaping_profile_info *pf_info, int req_token, unsigned char direction) +static void shaper_deposit_token_sub(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority) { if (direction == SHAPING_DIR_IN) { - __atomic_sub_fetch(&pf_info->in_deposit_token, req_token, __ATOMIC_SEQ_CST); + pf_hash_node->in_deposit_token_bits[priority] -= req_token_bits; } else { - __atomic_sub_fetch(&pf_info->out_deposit_token, req_token, __ATOMIC_SEQ_CST); + pf_hash_node->out_deposit_token_bits[priority] -= req_token_bits; } } -static int shaper_deposit_token_is_enough(struct shaping_profile_info *pf_info, int req_token, unsigned char direction) +static int shaper_deposit_token_is_enough(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority) { if (direction == SHAPING_DIR_IN) { - if (__atomic_load_n(&pf_info->in_deposit_token, __ATOMIC_SEQ_CST) >= req_token) { + if (pf_hash_node->in_deposit_token_bits[priority] >= req_token_bits) { return 1; } else { return 0; } } else { - if (__atomic_load_n(&pf_info->out_deposit_token, __ATOMIC_SEQ_CST) >= req_token) { + if (pf_hash_node->out_deposit_token_bits[priority] >= req_token_bits) { return 1; } else { return 0; @@ -494,7 +506,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct break; } - if (pf_info->is_invalid) { + if (pf_info->hash_node->is_invalid) { if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token return SHAPER_TOKEN_GET_SUCCESS; } else {//for borrowing, means this profile has no token to borrow @@ -502,8 +514,8 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct } } - if (shaper_deposit_token_is_enough(pf_info, req_token_bits, direction)) { - shaper_deposit_token_sub(pf_info, req_token_bits, direction); + if (shaper_deposit_token_is_enough(pf_info->hash_node, req_token_bits, direction, pf_info->priority)) { + shaper_deposit_token_sub(pf_info->hash_node, req_token_bits, direction, pf_info->priority); return SHAPER_TOKEN_GET_SUCCESS; } @@ -583,6 +595,23 @@ END: } } +static void shaper_profile_hash_node_update(struct shaping_profile_info *profile) +{ + if (profile->hash_node == NULL) { + struct shaping_profile_hash_node *hash_node = NULL; + HASH_FIND_INT(thread_sp_hashtbl, &profile->id, hash_node); + if (hash_node) { + profile->hash_node = hash_node; + } else { + profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node)); + profile->hash_node->id = profile->id; + HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node); + } + } + + return; +} + static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int req_token_bytes, struct shaping_profile_info *profile, int profile_type, unsigned char direction) { @@ -603,21 +632,11 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet } - if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction)) { - shaper_deposit_token_sub(profile, req_token_bytes * 8, direction); - return SHAPER_TOKEN_GET_SUCCESS; - } + shaper_profile_hash_node_update(profile); - if (profile->hash_node == NULL) { - struct shaping_profile_hash_node *hash_node = NULL; - HASH_FIND_INT(g_shaping_profile_table, &profile->id, hash_node); - if (hash_node) { - profile->hash_node = hash_node; - } else { - profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node)); - profile->hash_node->id = profile->id; - HASH_ADD_INT(g_shaping_profile_table, id, profile->hash_node); - } + if (shaper_deposit_token_is_enough(profile->hash_node, req_token_bytes * 8, direction, profile->priority)) { + shaper_deposit_token_sub(profile->hash_node, req_token_bytes * 8, direction, profile->priority); + return SHAPER_TOKEN_GET_SUCCESS; } struct timespec curr_timespec; @@ -708,12 +727,6 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); - if (pkt_wrapper->tcp_pure_contorl) { - shaper_flow_pop(ctx, sf); - shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); - return SHAPING_FORWARD; - } - if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) { clock_gettime(CLOCK_MONOTONIC, &curr_time); if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > ctx->conf.pkt_max_delay_time_us) { @@ -770,11 +783,6 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi unsigned long long enqueue_time; int enqueue_success = 0; - if (meta->is_tcp_pure_ctrl) { - shaper_stat_forward_all_rule_inc(ctx->stat, sf, meta->dir, meta->raw_len, ctx->thread_index); - return SHAPING_FORWARD; - } - int ret = shaper_token_consume(ctx, sf, meta->raw_len, profile, profile_type, meta->dir); if (ret >= SHAPER_TOKEN_GET_SUCCESS) { if (ret == SHAPER_TOKEN_GET_SUCCESS) { @@ -886,6 +894,19 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ } } +static void shaper_token_consume_force(struct shaping_flow *sf, struct metadata *meta) +{ + struct shaping_rule_info *rule; + + for (int i = 0; i < sf->rule_num; i++) { + rule = &sf->matched_rule_infos[i]; + shaper_profile_hash_node_update(&rule->primary); + shaper_deposit_token_sub(rule->primary.hash_node, meta->raw_len * 8, meta->dir, rule->primary.priority); + } + + return; +} + void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf) { int shaping_ret; @@ -896,6 +917,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu sf->processed_pkts++; if (meta->is_tcp_pure_ctrl) { + shaper_token_consume_force(sf, meta); marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index a625068..8196ed2 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -274,7 +274,7 @@ TEST(single_session, udp_tx_in_order) fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaper_global_stat_refresh(ctx->global_stat); - + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -330,12 +330,12 @@ TEST(single_session, tcp_tx_in_order) /*******send packets***********/ send_packets(&ctx->thread_ctx[0], sf, 20, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0); - send_packets(&ctx->thread_ctx[0], sf, 20, 100, SHAPING_DIR_OUT, &expec_pure_ctl_tx_queue, 1, 1); + send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_pure_ctl_tx_queue, 1, 1); //first 10 packets ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); - //20 pure ctrl pkts - ASSERT_EQ(0, judge_packet_eq(&expec_pure_ctl_tx_queue, actual_tx_queue, 20)); + //10 pure ctrl pkts + ASSERT_EQ(0, judge_packet_eq(&expec_pure_ctl_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -346,6 +346,13 @@ TEST(single_session, tcp_tx_in_order) stub_refresh_token_bucket(0); + for (int i = 0; i < 10; i++) { + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); + } + ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));//pure ctrl pkts force consume 1000 tokens, current token: -1000--->0, so no pkt can be sent + + stub_refresh_token_bucket(0); for (int i = 0; i < 10; i++) {//10 pkts which is not pure control polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); @@ -358,7 +365,7 @@ TEST(single_session, tcp_tx_in_order) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -370,7 +377,7 @@ TEST(single_session, tcp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 30, 3000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 30000, SHAPING_DIR_OUT, profile_type_primary); @@ -436,7 +443,7 @@ TEST(single_session, udp_diff_direction) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -515,6 +522,7 @@ TEST(single_session, udp_multi_rules) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -589,6 +597,7 @@ TEST(single_session, udp_borrow) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -663,6 +672,7 @@ TEST(single_session, udp_borrow_same_priority_9) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -771,7 +781,7 @@ TEST(two_session_diff_priority, udp_in_order) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -890,7 +900,7 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -977,6 +987,7 @@ TEST(single_session_async, udp_tx_in_order) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1038,6 +1049,7 @@ TEST(single_session_async, udp_close_before_async_exec) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); fieldstat_global_disable_prometheus_endpoint(); + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); } @@ -1127,6 +1139,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1213,6 +1226,7 @@ TEST(two_session_same_rule, udp_tx_in_order) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1308,6 +1322,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) shaping_flow_free(&ctx->thread_ctx[0], sf1); shaping_flow_free(&ctx->thread_ctx[1], sf2); fieldstat_global_disable_prometheus_endpoint(); + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); } @@ -1407,6 +1422,7 @@ TEST(two_session_diff_priority_same_profile, session_timer_test) shaping_flow_free(&ctx->thread_ctx[0], sf1); shaping_flow_free(&ctx->thread_ctx[1], sf2); fieldstat_global_disable_prometheus_endpoint(); + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); } @@ -1459,6 +1475,7 @@ TEST(statistics, udp_drop_pkt) fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaper_global_stat_refresh(ctx->global_stat); + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1546,6 +1563,7 @@ TEST(statistics, udp_queueing_pkt) fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaper_global_stat_refresh(ctx->global_stat); + shaper_thread_resource_clear(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); @@ -1577,6 +1595,6 @@ TEST(statistics, udp_queueing_pkt) int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); - //testing::GTEST_FLAG(filter) = "single_session_async.udp_close_before_async_exec"; + //testing::GTEST_FLAG(filter) = "single_session.tcp_tx_in_order"; return RUN_ALL_TESTS(); }
\ No newline at end of file |
