diff options
| author | root <[email protected]> | 2024-01-29 08:48:08 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2024-01-29 08:48:08 +0000 |
| commit | fbb3a5e84b175d6249de0afd09a3b66470a47dc8 (patch) | |
| tree | 4b7219e26a6aedfabd76ecb383e5c26ddcb718c0 /shaping/src/shaper.cpp | |
| parent | f9cd8219dc43b5d19da8f421c19c08d65240683d (diff) | |
优化一次取token的放大倍数机制
Diffstat (limited to 'shaping/src/shaper.cpp')
| -rw-r--r-- | shaping/src/shaper.cpp | 73 |
1 files changed, 62 insertions, 11 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index f1b963a..ac66c5a 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -24,7 +24,9 @@ extern "C" { #include "shaper_global_stat.h" #include "shaper_aqm.h" -#define TOKEN_ENLARGE_TIMES 10//TODO +#define TOKEN_MULTIPLE_UPDATE_INTERVAL_S 1 +#define TOKEN_MULTIPLE_MIN 10 +#define TOKEN_MULTIPLE_MAX 20 #define TOKEN_GET_FAILED_INTERVAL_MS 1 #define HMGET_REQUEST_INTERVAL_MS 10 #define PRIORITY_BLOCK_MIN_TIME_MS 500 @@ -179,7 +181,7 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ struct shaping_packet_wrapper *s_pkt = NULL; struct timespec curr_time; - if (sf->queue_len == ctx->conf.session_queue_len_max) {//TODO: profile queue_len??? + if (sf->queue_len == ctx->conf.session_queue_len_max) { return -1; } @@ -411,6 +413,40 @@ static void shaper_deposit_token_add(struct shaping_profile_info *profile, int r *deposit_token += req_token_bits; } +static void shaper_token_multiple_update(struct shaping_profile_info *profile) +{ + if (profile->type != PROFILE_TYPE_GENERIC) { + return; + } + + struct shaper_token_multiple *token_multiple = &profile->hash_node->token_multiple; + int curr_multiple = token_multiple->token_get_multiple; + time_t curr_time_s = time(NULL); + + if (curr_time_s - token_multiple->token_multiple_update_time_s < TOKEN_MULTIPLE_UPDATE_INTERVAL_S) { + return; + } + + token_multiple->token_multiple_update_time_s = curr_time_s; + + if (token_multiple->has_failed_get_token) { + token_multiple->token_get_multiple = (curr_multiple - 1) < TOKEN_MULTIPLE_MIN ? TOKEN_MULTIPLE_MIN : (curr_multiple - 1); + goto END; + } + + if (token_multiple->has_drop_by_queue_full) { + token_multiple->token_get_multiple = (curr_multiple + 1) > TOKEN_MULTIPLE_MAX ? TOKEN_MULTIPLE_MAX : (curr_multiple + 1); + goto END; + } + +END: + LOG_INFO("%s: profile id %d, token_get_multiple %d, has_failed_get_token %d, has_drop_by_queue_full %d", LOG_TAG_SHAPING, profile->id, token_multiple->token_get_multiple, token_multiple->has_failed_get_token, token_multiple->has_drop_by_queue_full); + token_multiple->has_failed_get_token = 0; + token_multiple->has_drop_by_queue_full = 0; + + return; +} + static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_tconsume_cb_arg *arg = (struct shaping_tconsume_cb_arg*)cb_arg; @@ -428,7 +464,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat); - LOG_INFO("Swarmkv reply type =%d, profile_id %d, profile_consume_ref = %d, direction =%d, integer =%llu",reply->type, profile->id, pf_hash_node->tconsume_ref_cnt, arg->direction, reply->integer); + LOG_DEBUG("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer); if (reply->type != SWARMKV_REPLY_INTEGER) { shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat); @@ -449,6 +485,11 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg sf->flag |= SESSION_BORROW; } + if (reply->integer == 0 && profile->type == PROFILE_TYPE_GENERIC) { + pf_hash_node->token_multiple.has_failed_get_token = 1; + shaper_token_multiple_update(profile); + } + END: if (profile->type == PROFILE_TYPE_GENERIC) { pf_hash_node->tconsume_ref_cnt--; @@ -546,9 +587,10 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_info *profile, static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec) { struct shaping_tconsume_cb_arg *arg = NULL; + struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node; char key[32] = {0}; - if (pf_info->hash_node->tconsume_ref_cnt > 0) { + if (pf_hash_node->tconsume_ref_cnt > 0) { goto END; } @@ -564,18 +606,18 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat); sf->ref_cnt++; - pf_info->hash_node->tconsume_ref_cnt++; + pf_hash_node->tconsume_ref_cnt++; switch (pf_info->type) { case PROFILE_TYPE_GENERIC: - swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); + swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * pf_hash_node->token_multiple.token_get_multiple, shaper_token_get_cb, arg); break; case PROFILE_TYPE_HOST_FARINESS: case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: - swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); + swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_MULTIPLE_MIN, shaper_token_get_cb, arg); break; case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: - swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); + swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_MULTIPLE_MIN, shaper_token_get_cb, arg); break; default: if (arg) { @@ -587,7 +629,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct END: swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL); - if (pf_info->hash_node->is_invalid) { + if (pf_hash_node->is_invalid) { if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token return SHAPER_TOKEN_GET_SUCCESS; } else {//for borrowing, means this profile has no token to borrow @@ -705,6 +747,7 @@ void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shap } else { profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node)); profile->hash_node->id = profile->id; + profile->hash_node->token_multiple.token_get_multiple = TOKEN_MULTIPLE_MIN; HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node); timeout_init(&profile->hash_node->timeout_handle, TIMEOUT_ABS); timeouts_add(ctx->expires, &profile->hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); @@ -953,8 +996,12 @@ DROP: if (enqueue_success) { shaper_packet_dequeue(sf); } - shaper_stat_drop_inc(&sf->matched_rule_infos[sf->anchor].primary.stat, meta->dir, ctx->thread_index); + struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor].primary; + shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index); sf->anchor = 0; + + pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1; + shaper_token_multiple_update(pf_info); return SHAPING_DROP; } @@ -1060,7 +1107,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu shaper_stat_queueing_pkt_inc_for_rule(s_rule, meta->dir, ctx->thread_index); shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); } else { - shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); + struct shaping_profile_info *pf_info = &s_rule->primary; + pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1; + shaper_token_multiple_update(pf_info); + + shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index); shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len); shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); |
