diff options
| author | root <[email protected]> | 2024-01-29 08:48:08 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2024-01-29 08:48:08 +0000 |
| commit | fbb3a5e84b175d6249de0afd09a3b66470a47dc8 (patch) | |
| tree | 4b7219e26a6aedfabd76ecb383e5c26ddcb718c0 /shaping | |
| parent | f9cd8219dc43b5d19da8f421c19c08d65240683d (diff) | |
优化一次取token的放大倍数机制 (Optimize the multiplier mechanism used when fetching tokens in one request)
Diffstat (limited to 'shaping')
| -rw-r--r-- | shaping/include/shaper.h | 12 | ||||
| -rw-r--r-- | shaping/include/shaper_stat.h | 1 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 73 | ||||
| -rw-r--r-- | shaping/src/shaper_aqm.cpp | 32 | ||||
| -rw-r--r-- | shaping/src/shaper_maat.cpp | 2 | ||||
| -rw-r--r-- | shaping/src/shaper_stat.cpp | 32 |
6 files changed, 125 insertions, 27 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index b17dd40..fd16e34 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -107,6 +107,13 @@ struct shaper_aqm_blue_para { int probability; }; +struct shaper_token_multiple { + int token_get_multiple; + unsigned char has_drop_by_queue_full; + unsigned char has_failed_get_token; + time_t token_multiple_update_time_s; +}; + struct shaping_profile_hash_node { int id; enum shaper_aqm_type aqm_type; @@ -120,6 +127,7 @@ struct shaping_profile_hash_node { long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX]; int hmget_ref_cnt; int tconsume_ref_cnt; + struct shaper_token_multiple token_multiple; struct shaper_aqm_blue_para aqm_blue_para; unsigned char is_invalid; struct timeout timeout_handle; @@ -143,7 +151,7 @@ struct shaping_rule_info { int id;//rule_id int fair_factor; struct shaping_profile_info primary; - struct shaping_profile_info borrowing[SHAPING_REF_PROFILE_NUM_MAX]; + struct shaping_profile_info borrowing[SHAPING_REF_PROFILE_NUM_MAX - 1]; int borrowing_num; int is_enabled; }; @@ -154,8 +162,8 @@ struct shaping_packet_wrapper { unsigned long long enqueue_time_us;//first enqueue time unsigned int length; int rule_anchor; + int aqm_processed_pf_ids[SHAPING_REF_PROFILE_NUM_MAX]; unsigned char direction; - unsigned char aqm_processed; TAILQ_ENTRY(shaping_packet_wrapper) node; }; TAILQ_HEAD(delay_queue, shaping_packet_wrapper); diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h index 132049c..5c720a3 100644 --- a/shaping/include/shaper_stat.h +++ b/shaping/include/shaper_stat.h @@ -37,6 +37,7 @@ struct shaping_stat_for_profile_dir { struct shaping_stat_for_profile { struct shaping_stat_for_profile_dir in; struct shaping_stat_for_profile_dir out; + long long priority_queue_len; }; struct shaping_stat { diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index f1b963a..ac66c5a 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ 
-24,7 +24,9 @@ extern "C" { #include "shaper_global_stat.h" #include "shaper_aqm.h" -#define TOKEN_ENLARGE_TIMES 10//TODO +#define TOKEN_MULTIPLE_UPDATE_INTERVAL_S 1 +#define TOKEN_MULTIPLE_MIN 10 +#define TOKEN_MULTIPLE_MAX 20 #define TOKEN_GET_FAILED_INTERVAL_MS 1 #define HMGET_REQUEST_INTERVAL_MS 10 #define PRIORITY_BLOCK_MIN_TIME_MS 500 @@ -179,7 +181,7 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ struct shaping_packet_wrapper *s_pkt = NULL; struct timespec curr_time; - if (sf->queue_len == ctx->conf.session_queue_len_max) {//TODO: profile queue_len??? + if (sf->queue_len == ctx->conf.session_queue_len_max) { return -1; } @@ -411,6 +413,40 @@ static void shaper_deposit_token_add(struct shaping_profile_info *profile, int r *deposit_token += req_token_bits; } +static void shaper_token_multiple_update(struct shaping_profile_info *profile) +{ + if (profile->type != PROFILE_TYPE_GENERIC) { + return; + } + + struct shaper_token_multiple *token_multiple = &profile->hash_node->token_multiple; + int curr_multiple = token_multiple->token_get_multiple; + time_t curr_time_s = time(NULL); + + if (curr_time_s - token_multiple->token_multiple_update_time_s < TOKEN_MULTIPLE_UPDATE_INTERVAL_S) { + return; + } + + token_multiple->token_multiple_update_time_s = curr_time_s; + + if (token_multiple->has_failed_get_token) { + token_multiple->token_get_multiple = (curr_multiple - 1) < TOKEN_MULTIPLE_MIN ? TOKEN_MULTIPLE_MIN : (curr_multiple - 1); + goto END; + } + + if (token_multiple->has_drop_by_queue_full) { + token_multiple->token_get_multiple = (curr_multiple + 1) > TOKEN_MULTIPLE_MAX ? 
TOKEN_MULTIPLE_MAX : (curr_multiple + 1); + goto END; + } + +END: + LOG_INFO("%s: profile id %d, token_get_multiple %d, has_failed_get_token %d, has_drop_by_queue_full %d", LOG_TAG_SHAPING, profile->id, token_multiple->token_get_multiple, token_multiple->has_failed_get_token, token_multiple->has_drop_by_queue_full); + token_multiple->has_failed_get_token = 0; + token_multiple->has_drop_by_queue_full = 0; + + return; +} + static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_tconsume_cb_arg *arg = (struct shaping_tconsume_cb_arg*)cb_arg; @@ -428,7 +464,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat); - LOG_INFO("Swarmkv reply type =%d, profile_id %d, profile_consume_ref = %d, direction =%d, integer =%llu",reply->type, profile->id, pf_hash_node->tconsume_ref_cnt, arg->direction, reply->integer); + LOG_DEBUG("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer); if (reply->type != SWARMKV_REPLY_INTEGER) { shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat); @@ -449,6 +485,11 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg sf->flag |= SESSION_BORROW; } + if (reply->integer == 0 && profile->type == PROFILE_TYPE_GENERIC) { + pf_hash_node->token_multiple.has_failed_get_token = 1; + shaper_token_multiple_update(profile); + } + END: if (profile->type == PROFILE_TYPE_GENERIC) { pf_hash_node->tconsume_ref_cnt--; @@ -546,9 +587,10 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_info *profile, static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec) 
{ struct shaping_tconsume_cb_arg *arg = NULL; + struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node; char key[32] = {0}; - if (pf_info->hash_node->tconsume_ref_cnt > 0) { + if (pf_hash_node->tconsume_ref_cnt > 0) { goto END; } @@ -564,18 +606,18 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat); sf->ref_cnt++; - pf_info->hash_node->tconsume_ref_cnt++; + pf_hash_node->tconsume_ref_cnt++; switch (pf_info->type) { case PROFILE_TYPE_GENERIC: - swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); + swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * pf_hash_node->token_multiple.token_get_multiple, shaper_token_get_cb, arg); break; case PROFILE_TYPE_HOST_FARINESS: case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: - swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); + swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_MULTIPLE_MIN, shaper_token_get_cb, arg); break; case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: - swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); + swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_MULTIPLE_MIN, shaper_token_get_cb, arg); break; default: if (arg) { @@ -587,7 +629,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct END: swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL); - if (pf_info->hash_node->is_invalid) { + if (pf_hash_node->is_invalid) { if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for 
primary, means this rule don't need get token return SHAPER_TOKEN_GET_SUCCESS; } else {//for borrowing, means this profile has no token to borrow @@ -705,6 +747,7 @@ void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shap } else { profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node)); profile->hash_node->id = profile->id; + profile->hash_node->token_multiple.token_get_multiple = TOKEN_MULTIPLE_MIN; HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node); timeout_init(&profile->hash_node->timeout_handle, TIMEOUT_ABS); timeouts_add(ctx->expires, &profile->hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); @@ -953,8 +996,12 @@ DROP: if (enqueue_success) { shaper_packet_dequeue(sf); } - shaper_stat_drop_inc(&sf->matched_rule_infos[sf->anchor].primary.stat, meta->dir, ctx->thread_index); + struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor].primary; + shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index); sf->anchor = 0; + + pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1; + shaper_token_multiple_update(pf_info); return SHAPING_DROP; } @@ -1060,7 +1107,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu shaper_stat_queueing_pkt_inc_for_rule(s_rule, meta->dir, ctx->thread_index); shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); } else { - shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); + struct shaping_profile_info *pf_info = &s_rule->primary; + pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1; + shaper_token_multiple_update(pf_info); + + shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index); shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len); shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, 
ctx->thread_index); diff --git a/shaping/src/shaper_aqm.cpp b/shaping/src/shaper_aqm.cpp index 636d377..20c4190 100644 --- a/shaping/src/shaper_aqm.cpp +++ b/shaping/src/shaper_aqm.cpp @@ -27,6 +27,33 @@ static int shaper_aqm_blue_need_drop(struct shaping_packet_wrapper *pkt_wrapper, return 0; } +static int shaper_aqm_have_processed(struct shaping_packet_wrapper *pkt_wrapper, int profile_id) +{ + int i = 0; + + for (i = 0; i < SHAPING_REF_PROFILE_NUM_MAX; i++) { + if (pkt_wrapper->aqm_processed_pf_ids[i] == profile_id) { + return 1; + } else if (pkt_wrapper->aqm_processed_pf_ids[i] == 0) { + break; + } + } + + return 0; +} + +static void shaper_aqm_mark_processed(struct shaping_packet_wrapper *pkt_wrapper, int profile_id) +{ + int i = 0; + + for (i = 0; i < SHAPING_REF_PROFILE_NUM_MAX; i++) { + if (pkt_wrapper->aqm_processed_pf_ids[i] == 0) { + pkt_wrapper->aqm_processed_pf_ids[i] = profile_id; + break; + } + } +} + int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper) { int ret = 0; @@ -35,8 +62,7 @@ int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_pa return 0; } - //TODO: judge if this packet is aqm processed for this profile - if (pkt_wrapper->aqm_processed) { + if (shaper_aqm_have_processed(pkt_wrapper, profile->id)) { return 0; } @@ -50,7 +76,7 @@ int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_pa break; } - pkt_wrapper->aqm_processed = 1; + shaper_aqm_mark_processed(pkt_wrapper, profile->id); return ret; }
\ No newline at end of file diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp index ec7b59a..8a0fa88 100644 --- a/shaping/src/shaper_maat.cpp +++ b/shaping/src/shaper_maat.cpp @@ -388,7 +388,7 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf return; } - for (int i = 0; rule_num_remove_dup; i++) { + for (int i = 0; i < rule_num_remove_dup; i++) { if (shaper_rule_update(ctx, sf, &sf->matched_rule_infos[sf->rule_num], rule_ids_remove_dup[i], &priority_changed) == 0) { sf->rule_num++; } diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index 1e36a35..62048f1 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -154,7 +154,7 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);//hincrby failed, retry shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat); - LOG_INFO("%s: shaping stat hincrby failed, retry for profile id %d priority %d, operate queue_len %lld", LOG_TAG_STAT, arg->profile_id, arg->priority, arg->queue_len); + LOG_DEBUG("%s: shaping stat hincrby failed, retry for profile id %d priority %d, operate queue_len %lld", LOG_TAG_STAT, arg->profile_id, arg->priority, arg->queue_len); swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len); return; @@ -207,13 +207,24 @@ void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, return; } -static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, struct shaping_rule_info *rule, struct shaping_profile_info *profile, int profile_type, int need_update_guage, long long curr_time_us) +static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, struct shaping_rule_info *rule, struct shaping_profile_info *profile, int profile_type, int need_refresh_stat, 
int need_update_guage, long long curr_time_us) { struct shaping_stat_for_profile *profile_stat = &profile->stat; struct shaping_stat *stat = ctx->stat; int priority = profile->priority; int thread_id = ctx->thread_index; unsigned long long old_latency; + + if (need_update_guage) { + profile->hash_node->queue_len[priority] += profile_stat->priority_queue_len; + profile_stat->priority_queue_len = 0; + shaper_stat_priority_queue_len_refresh(ctx, profile->hash_node, priority, curr_time_us); + return; + } + + if (!need_refresh_stat) { + return; + } shaper_stat_tags_build(rule->vsys_id, rule->id, profile->id, priority, profile_type); fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_DROP_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.drop_pkts, tags, TAG_IDX_MAX, thread_id); @@ -240,9 +251,6 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id); } - profile->hash_node->queue_len[priority] += profile_stat->in.queue_len + profile_stat->out.queue_len; - shaper_stat_priority_queue_len_refresh(ctx, profile->hash_node, priority, curr_time_us); - memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile)); } else { profile_stat->in.pkts = 0; @@ -278,18 +286,18 @@ void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf } } - if (!need_refresh) {//TODO: add queue_len to profile??? + int need_update_guage = sf->processed_pkts > CONFIRM_PRIORITY_PKTS ? 1 : 0; + + if (!need_refresh && !need_update_guage) { return; } - int need_update_guage = sf->processed_pkts > CONFIRM_PRIORITY_PKTS ? 
1 : 0; - for (int i = 0; i < sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; - shaper_stat_profile_metirc_refresh(ctx, rule, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage, curr_time_us); + shaper_stat_profile_metirc_refresh(ctx, rule, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_refresh, need_update_guage, curr_time_us); for (int j = 0; j < rule->borrowing_num; j++) { - shaper_stat_profile_metirc_refresh(ctx, rule, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage, curr_time_us); + shaper_stat_profile_metirc_refresh(ctx, rule, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_refresh, need_update_guage, curr_time_us); } } @@ -341,6 +349,8 @@ void shaper_stat_queueing_pkt_inc(struct shaping_stat_for_profile *profile_stat, profile_stat->out.queue_len++; } + profile_stat->priority_queue_len++; + return; } @@ -352,6 +362,8 @@ void shaper_stat_queueing_pkt_dec(struct shaping_stat_for_profile *profile_stat, profile_stat->out.queue_len--; } + profile_stat->priority_queue_len--; + return; } |
