diff options
| -rw-r--r-- | shaping/include/shaper.h | 15 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 96 | ||||
| -rw-r--r-- | shaping/src/shaper_maat.cpp | 2 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper.cpp | 6 |
4 files changed, 77 insertions, 42 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 46bc07f..62f0eb6 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -12,20 +12,16 @@ extern "C" { #include "timeout.h" } -#define SHAPING_DIR_IN 0x1 -#define SHAPING_DIR_OUT 0x2 - #define SHAPING_RULE_NUM_MAX 8 #define SHAPING_REF_PROFILE_NUM_MAX 8 #define SHAPING_PRIORITY_NUM_MAX 10 #define SHAPER_FLOW_POP_NUM_MAX 10 #define SESSION_CLOSE 0x1 -#define SESSION_BORROW 0x2 #define CONFIRM_PRIORITY_PKTS 20 -#define SHAPING_WROK_THREAD_NUM_MAX 128 +#define SHAPING_WROK_THREAD_NUM_MAX 256 #define SHAPING_STAT_REFRESH_INTERVAL_SEC 2 #define SHAPING_STAT_REFRESH_MAX_PER_POLLING 5 @@ -80,6 +76,12 @@ struct shaping_ctx { struct shaping_thread_ctx *thread_ctx; }; +enum shaping_packet_dir { + SHAPING_DIR_IN = 0, + SHAPING_DIR_OUT, + SHAPING_DIR_MAX +}; + enum shaping_packet_action { SHAPING_FORWARD = 0, SHAPING_QUEUED, @@ -130,6 +132,7 @@ struct shaping_profile_hash_node { struct shaper_aqm_blue_para aqm_blue_para; struct shaper_aqm_codel_para aqm_codel_para; unsigned char is_invalid; + unsigned char async_pass[SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX]; struct timeout timeout_handle; UT_hash_handle hh; }; @@ -138,6 +141,7 @@ struct shaping_profile_info { int id;//profile_id enum shaping_profile_type type; int priority; + unsigned char async_pass[SHAPING_DIR_MAX]; long long in_deposit_token_bits; long long out_deposit_token_bits; long long bidirection_deposit_token_bits; @@ -212,7 +216,6 @@ struct shaping_tconsume_cb_arg { struct shaping_profile_info *profile; struct shaping_flow *sf; unsigned char direction; - unsigned char is_primary_pf; long long start_time_us; }; diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 3ef35c1..60e7acd 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -28,7 +28,7 @@ extern "C" { #define TOKEN_MULTIPLE_DEFAULT 10 #define TOKEN_GET_FAILED_INTERVAL_MS 1 #define HMGET_REQUEST_INTERVAL_MS 10 -#define PRIORITY_BLOCK_MIN_TIME_MS 500 +#define PRIORITY_BLOCK_MIN_TIME_MS 50 #define PROFILE_HASH_NODE_REFRESH_MS 500 @@ -383,6 +383,35 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i return count; } +static void shaper_profile_async_pass_set(struct shaping_profile_info *profile, unsigned char direction, int priority, int async_pass_enabled) +{ + struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; + unsigned char *async_pass = NULL; + + if (profile->type == PROFILE_TYPE_GENERIC) { + async_pass = &pf_hash_node->async_pass[priority][direction]; + } else { + async_pass = &profile->async_pass[direction]; + } + + if (*async_pass != async_pass_enabled) { + *async_pass = async_pass_enabled; + } + + return; +} + +static int shaper_profile_async_pass_get(struct shaping_profile_info *profile, unsigned char direction, int priority) +{ + struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; + + if (profile->type == PROFILE_TYPE_GENERIC) { + return pf_hash_node->async_pass[priority][direction]; + } else { + return profile->async_pass[direction]; + } +} + static void shaper_deposit_token_add(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority) { long long *deposit_token; @@ -415,6 +444,9 @@ static void shaper_deposit_token_add(struct shaping_profile_info *profile, int r } *deposit_token += req_token_bits; + if (*deposit_token > 0) { + shaper_profile_async_pass_set(profile, direction, priority, 1); + } } static void shaper_token_multiple_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile) @@ -490,17 +522,13 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg shaper_deposit_token_add(profile, reply->integer, arg->direction, profile->priority);//deposit tokens to profile } - if (arg->is_primary_pf) { - if (reply->integer > 0) { - sf->flag &= (~SESSION_BORROW); - } else { - sf->flag |= SESSION_BORROW; - } - } + if (reply->integer == 0) { + shaper_profile_async_pass_set(profile, arg->direction, profile->priority, 0); - if (reply->integer == 0 && profile->type == PROFILE_TYPE_GENERIC) { + if (profile->type == PROFILE_TYPE_GENERIC) { pf_hash_node->token_multiple.has_failed_get_token = 1; shaper_token_multiple_update(ctx, profile); + } } END: @@ -527,10 +555,11 @@ END: return; } -static int shaper_deposit_token_get(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority, int force) +static int shaper_deposit_token_get(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority, int force, int *need_get_token) { long long *deposit_token; struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; + int ret = -1; switch (profile->type) { case PROFILE_TYPE_GENERIC: @@ -558,17 +587,16 @@ static int shaper_deposit_token_get(struct shaping_profile_info *profile, int re return 0; } - if (force) { + if (force || *deposit_token >= req_token_bits) { *deposit_token -= req_token_bits; - return 0; + ret = 0; } - if (*deposit_token >= req_token_bits) { - *deposit_token -= req_token_bits; - return 0; + if (*deposit_token <= 0) { + *need_get_token = 1; } - return -1; + return ret; } static void shaper_profile_hash_node_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *pf_hash_node, struct timespec *curr_timespec) @@ -589,7 +617,7 @@ static void shaper_profile_hash_node_refresh(struct shaping_thread_ctx *ctx, str return; } -static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec) +static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int req_token_bits, unsigned char direction, struct timespec *curr_timespec) { struct shaping_tconsume_cb_arg *arg = NULL; struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node; @@ -613,9 +641,6 @@ static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct arg->sf = sf; arg->direction = direction; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) { - arg->is_primary_pf = 1; - } shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat); @@ -733,7 +758,8 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st END: if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority] < PRIORITY_BLOCK_MIN_TIME_MS) { - sf->flag |= SESSION_BORROW; + shaper_profile_async_pass_set(profile, SHAPING_DIR_OUT, priority, 0); + shaper_profile_async_pass_set(profile, SHAPING_DIR_IN, priority, 0); return 1; } else { return 0; @@ -784,11 +810,9 @@ static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, unsigned char direction, struct timespec *curr_timespec) { - if (profile_type == PROFILE_IN_RULE_TYPE_BORROW && !(sf->flag & SESSION_BORROW)) {//TODO: 会减慢swarmkv请求速度 - return SHAPER_TOKEN_GET_FAILED; - } - struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor]; + int need_get_token = 0; + int ret = SHAPER_TOKEN_GET_FAILED; time_t curr_time = time(NULL); if (curr_time - sf->check_rule_time >= ctx->conf.check_rule_enable_interval_sec) { @@ -805,27 +829,34 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet } - if (shaper_deposit_token_get(profile, req_token_bytes * 8, direction, profile->priority, 0) == 0) { - return SHAPER_TOKEN_GET_SUCCESS; + if (shaper_profile_async_pass_get(profile, direction, profile->priority) == 1) { + shaper_deposit_token_get(profile, req_token_bytes * 8, direction, profile->priority, 1, &need_get_token); + ret = SHAPER_TOKEN_GET_SUCCESS; + } else if (shaper_deposit_token_get(profile, req_token_bytes * 8, direction, profile->priority, 0, &need_get_token) == 0) { + ret = SHAPER_TOKEN_GET_SUCCESS; + } + + if (!need_get_token) { + return ret; } long long curr_time_ms = curr_timespec->tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MILLI_SEC; if (shaping_swarmkv_is_too_short_interval(curr_time_ms, profile)) { - return SHAPER_TOKEN_GET_FAILED; + return ret; } if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_timespec, curr_time_ms)) { - return SHAPER_TOKEN_GET_FAILED; + return ret; } int req_token_bits = req_token_bytes * 8; - shaper_token_get_from_profile(ctx, sf, profile, profile_type, req_token_bits, direction, curr_timespec); + shaper_token_get_from_profile(ctx, sf, profile, req_token_bits, direction, curr_timespec); if (profile->hash_node->is_invalid && profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token return SHAPER_TOKEN_GET_SUCCESS; } else { - return SHAPER_TOKEN_GET_FAILED; + return ret; } } @@ -1078,13 +1109,14 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ static void shaper_token_consume_force(struct shaping_flow *sf, struct metadata *meta) { struct shaping_rule_info *rule; + int need_get_token = 0; for (int i = 0; i < sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; if (!rule->is_enabled) { continue; } - shaper_deposit_token_get(&rule->primary, meta->raw_len * 8, meta->dir, rule->primary.priority, 1); + shaper_deposit_token_get(&rule->primary, meta->raw_len * 8, meta->dir, rule->primary.priority, 1, &need_get_token); } return; diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp index cac0d3d..ea91a1d 100644 --- a/shaping/src/shaper_maat.cpp +++ b/shaping/src/shaper_maat.cpp @@ -259,7 +259,7 @@ void shaper_profile_ex_new(const char *table_name, int table_id, const char *key tmp_obj = cJSON_GetObjectItem(json, "algorithm"); - if (!tmp_obj) { + if (!tmp_obj || tmp_obj->type != cJSON_String || !tmp_obj->valuestring) { LOG_ERROR("%s: json parse algorithm failed for profile id %d, line %s", LOG_TAG_MAAT, s_pf->id, table_line); goto END; } diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index c0a0711..e735d7f 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -429,7 +429,7 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 180000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts + shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); @@ -599,7 +599,7 @@ TEST(single_session, udp_diff_direction) ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 21000, SHAPING_DIR_OUT, profile_type_primary); - shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 22000, SHAPING_DIR_IN, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 21000, SHAPING_DIR_IN, profile_type_primary); fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); @@ -678,7 +678,7 @@ TEST(single_session, udp_multi_rules) shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 507000, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1 - shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 2000, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2 shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 91000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt |
