summaryrefslogtreecommitdiff
path: root/shaping/src/shaper.cpp
diff options
context:
space:
mode:
authorroot <[email protected]>2024-01-29 08:48:08 +0000
committerroot <[email protected]>2024-01-29 08:48:08 +0000
commitfbb3a5e84b175d6249de0afd09a3b66470a47dc8 (patch)
tree4b7219e26a6aedfabd76ecb383e5c26ddcb718c0 /shaping/src/shaper.cpp
parentf9cd8219dc43b5d19da8f421c19c08d65240683d (diff)
优化一次取token的放大倍数机制
Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r--shaping/src/shaper.cpp73
1 files changed, 62 insertions, 11 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index f1b963a..ac66c5a 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -24,7 +24,9 @@ extern "C" {
#include "shaper_global_stat.h"
#include "shaper_aqm.h"
-#define TOKEN_ENLARGE_TIMES 10//TODO
+#define TOKEN_MULTIPLE_UPDATE_INTERVAL_S 1
+#define TOKEN_MULTIPLE_MIN 10
+#define TOKEN_MULTIPLE_MAX 20
#define TOKEN_GET_FAILED_INTERVAL_MS 1
#define HMGET_REQUEST_INTERVAL_MS 10
#define PRIORITY_BLOCK_MIN_TIME_MS 500
@@ -179,7 +181,7 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
struct shaping_packet_wrapper *s_pkt = NULL;
struct timespec curr_time;
- if (sf->queue_len == ctx->conf.session_queue_len_max) {//TODO: profile queue_len???
+ if (sf->queue_len == ctx->conf.session_queue_len_max) {
return -1;
}
@@ -411,6 +413,40 @@ static void shaper_deposit_token_add(struct shaping_profile_info *profile, int r
*deposit_token += req_token_bits;
}
+static void shaper_token_multiple_update(struct shaping_profile_info *profile)
+{
+ if (profile->type != PROFILE_TYPE_GENERIC) {
+ return;
+ }
+
+ struct shaper_token_multiple *token_multiple = &profile->hash_node->token_multiple;
+ int curr_multiple = token_multiple->token_get_multiple;
+ time_t curr_time_s = time(NULL);
+
+ if (curr_time_s - token_multiple->token_multiple_update_time_s < TOKEN_MULTIPLE_UPDATE_INTERVAL_S) {
+ return;
+ }
+
+ token_multiple->token_multiple_update_time_s = curr_time_s;
+
+ if (token_multiple->has_failed_get_token) {
+ token_multiple->token_get_multiple = (curr_multiple - 1) < TOKEN_MULTIPLE_MIN ? TOKEN_MULTIPLE_MIN : (curr_multiple - 1);
+ goto END;
+ }
+
+ if (token_multiple->has_drop_by_queue_full) {
+ token_multiple->token_get_multiple = (curr_multiple + 1) > TOKEN_MULTIPLE_MAX ? TOKEN_MULTIPLE_MAX : (curr_multiple + 1);
+ goto END;
+ }
+
+END:
+ LOG_INFO("%s: profile id %d, token_get_multiple %d, has_failed_get_token %d, has_drop_by_queue_full %d", LOG_TAG_SHAPING, profile->id, token_multiple->token_get_multiple, token_multiple->has_failed_get_token, token_multiple->has_drop_by_queue_full);
+ token_multiple->has_failed_get_token = 0;
+ token_multiple->has_drop_by_queue_full = 0;
+
+ return;
+}
+
static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
struct shaping_tconsume_cb_arg *arg = (struct shaping_tconsume_cb_arg*)cb_arg;
@@ -428,7 +464,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat);
- LOG_INFO("Swarmkv reply type =%d, profile_id %d, profile_consume_ref = %d, direction =%d, integer =%llu",reply->type, profile->id, pf_hash_node->tconsume_ref_cnt, arg->direction, reply->integer);
+ LOG_DEBUG("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer);
if (reply->type != SWARMKV_REPLY_INTEGER) {
shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat);
@@ -449,6 +485,11 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
sf->flag |= SESSION_BORROW;
}
+ if (reply->integer == 0 && profile->type == PROFILE_TYPE_GENERIC) {
+ pf_hash_node->token_multiple.has_failed_get_token = 1;
+ shaper_token_multiple_update(profile);
+ }
+
END:
if (profile->type == PROFILE_TYPE_GENERIC) {
pf_hash_node->tconsume_ref_cnt--;
@@ -546,9 +587,10 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_info *profile,
static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec)
{
struct shaping_tconsume_cb_arg *arg = NULL;
+ struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node;
char key[32] = {0};
- if (pf_info->hash_node->tconsume_ref_cnt > 0) {
+ if (pf_hash_node->tconsume_ref_cnt > 0) {
goto END;
}
@@ -564,18 +606,18 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat);
sf->ref_cnt++;
- pf_info->hash_node->tconsume_ref_cnt++;
+ pf_hash_node->tconsume_ref_cnt++;
switch (pf_info->type) {
case PROFILE_TYPE_GENERIC:
- swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
+ swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * pf_hash_node->token_multiple.token_get_multiple, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_HOST_FARINESS:
case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
- swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
+ swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_MULTIPLE_MIN, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
- swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
+ swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_MULTIPLE_MIN, shaper_token_get_cb, arg);
break;
default:
if (arg) {
@@ -587,7 +629,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
END:
swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);
- if (pf_info->hash_node->is_invalid) {
+ if (pf_hash_node->is_invalid) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token
return SHAPER_TOKEN_GET_SUCCESS;
} else {//for borrowing, means this profile has no token to borrow
@@ -705,6 +747,7 @@ void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shap
} else {
profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node));
profile->hash_node->id = profile->id;
+ profile->hash_node->token_multiple.token_get_multiple = TOKEN_MULTIPLE_MIN;
HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node);
timeout_init(&profile->hash_node->timeout_handle, TIMEOUT_ABS);
timeouts_add(ctx->expires, &profile->hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
@@ -953,8 +996,12 @@ DROP:
if (enqueue_success) {
shaper_packet_dequeue(sf);
}
- shaper_stat_drop_inc(&sf->matched_rule_infos[sf->anchor].primary.stat, meta->dir, ctx->thread_index);
+ struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor].primary;
+ shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index);
sf->anchor = 0;
+
+ pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1;
+ shaper_token_multiple_update(pf_info);
return SHAPING_DROP;
}
@@ -1060,7 +1107,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
shaper_stat_queueing_pkt_inc_for_rule(s_rule, meta->dir, ctx->thread_index);
shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
} else {
- shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index);
+ struct shaping_profile_info *pf_info = &s_rule->primary;
+ pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1;
+ shaper_token_multiple_update(pf_info);
+
+ shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index);
shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len);
shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len);
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);