summaryrefslogtreecommitdiff
path: root/shaping
diff options
context:
space:
mode:
authorroot <[email protected]>2024-01-29 08:48:08 +0000
committerroot <[email protected]>2024-01-29 08:48:08 +0000
commitfbb3a5e84b175d6249de0afd09a3b66470a47dc8 (patch)
tree4b7219e26a6aedfabd76ecb383e5c26ddcb718c0 /shaping
parentf9cd8219dc43b5d19da8f421c19c08d65240683d (diff)
优化一次取token的放大倍数机制
Diffstat (limited to 'shaping')
-rw-r--r--shaping/include/shaper.h12
-rw-r--r--shaping/include/shaper_stat.h1
-rw-r--r--shaping/src/shaper.cpp73
-rw-r--r--shaping/src/shaper_aqm.cpp32
-rw-r--r--shaping/src/shaper_maat.cpp2
-rw-r--r--shaping/src/shaper_stat.cpp32
6 files changed, 125 insertions, 27 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index b17dd40..fd16e34 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -107,6 +107,13 @@ struct shaper_aqm_blue_para {
int probability;
};
+struct shaper_token_multiple {
+ int token_get_multiple;
+ unsigned char has_drop_by_queue_full;
+ unsigned char has_failed_get_token;
+ time_t token_multiple_update_time_s;
+};
+
struct shaping_profile_hash_node {
int id;
enum shaper_aqm_type aqm_type;
@@ -120,6 +127,7 @@ struct shaping_profile_hash_node {
long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX];
int hmget_ref_cnt;
int tconsume_ref_cnt;
+ struct shaper_token_multiple token_multiple;
struct shaper_aqm_blue_para aqm_blue_para;
unsigned char is_invalid;
struct timeout timeout_handle;
@@ -143,7 +151,7 @@ struct shaping_rule_info {
int id;//rule_id
int fair_factor;
struct shaping_profile_info primary;
- struct shaping_profile_info borrowing[SHAPING_REF_PROFILE_NUM_MAX];
+ struct shaping_profile_info borrowing[SHAPING_REF_PROFILE_NUM_MAX - 1];
int borrowing_num;
int is_enabled;
};
@@ -154,8 +162,8 @@ struct shaping_packet_wrapper {
unsigned long long enqueue_time_us;//first enqueue time
unsigned int length;
int rule_anchor;
+ int aqm_processed_pf_ids[SHAPING_REF_PROFILE_NUM_MAX];
unsigned char direction;
- unsigned char aqm_processed;
TAILQ_ENTRY(shaping_packet_wrapper) node;
};
TAILQ_HEAD(delay_queue, shaping_packet_wrapper);
diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h
index 132049c..5c720a3 100644
--- a/shaping/include/shaper_stat.h
+++ b/shaping/include/shaper_stat.h
@@ -37,6 +37,7 @@ struct shaping_stat_for_profile_dir {
struct shaping_stat_for_profile {
struct shaping_stat_for_profile_dir in;
struct shaping_stat_for_profile_dir out;
+ long long priority_queue_len;
};
struct shaping_stat {
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index f1b963a..ac66c5a 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -24,7 +24,9 @@ extern "C" {
#include "shaper_global_stat.h"
#include "shaper_aqm.h"
-#define TOKEN_ENLARGE_TIMES 10//TODO
+#define TOKEN_MULTIPLE_UPDATE_INTERVAL_S 1
+#define TOKEN_MULTIPLE_MIN 10
+#define TOKEN_MULTIPLE_MAX 20
#define TOKEN_GET_FAILED_INTERVAL_MS 1
#define HMGET_REQUEST_INTERVAL_MS 10
#define PRIORITY_BLOCK_MIN_TIME_MS 500
@@ -179,7 +181,7 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
struct shaping_packet_wrapper *s_pkt = NULL;
struct timespec curr_time;
- if (sf->queue_len == ctx->conf.session_queue_len_max) {//TODO: profile queue_len???
+ if (sf->queue_len == ctx->conf.session_queue_len_max) {
return -1;
}
@@ -411,6 +413,40 @@ static void shaper_deposit_token_add(struct shaping_profile_info *profile, int r
*deposit_token += req_token_bits;
}
+static void shaper_token_multiple_update(struct shaping_profile_info *profile)
+{
+ if (profile->type != PROFILE_TYPE_GENERIC) {
+ return;
+ }
+
+ struct shaper_token_multiple *token_multiple = &profile->hash_node->token_multiple;
+ int curr_multiple = token_multiple->token_get_multiple;
+ time_t curr_time_s = time(NULL);
+
+ if (curr_time_s - token_multiple->token_multiple_update_time_s < TOKEN_MULTIPLE_UPDATE_INTERVAL_S) {
+ return;
+ }
+
+ token_multiple->token_multiple_update_time_s = curr_time_s;
+
+ if (token_multiple->has_failed_get_token) {
+ token_multiple->token_get_multiple = (curr_multiple - 1) < TOKEN_MULTIPLE_MIN ? TOKEN_MULTIPLE_MIN : (curr_multiple - 1);
+ goto END;
+ }
+
+ if (token_multiple->has_drop_by_queue_full) {
+ token_multiple->token_get_multiple = (curr_multiple + 1) > TOKEN_MULTIPLE_MAX ? TOKEN_MULTIPLE_MAX : (curr_multiple + 1);
+ goto END;
+ }
+
+END:
+ LOG_INFO("%s: profile id %d, token_get_multiple %d, has_failed_get_token %d, has_drop_by_queue_full %d", LOG_TAG_SHAPING, profile->id, token_multiple->token_get_multiple, token_multiple->has_failed_get_token, token_multiple->has_drop_by_queue_full);
+ token_multiple->has_failed_get_token = 0;
+ token_multiple->has_drop_by_queue_full = 0;
+
+ return;
+}
+
static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
struct shaping_tconsume_cb_arg *arg = (struct shaping_tconsume_cb_arg*)cb_arg;
@@ -428,7 +464,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat);
- LOG_INFO("Swarmkv reply type =%d, profile_id %d, profile_consume_ref = %d, direction =%d, integer =%llu",reply->type, profile->id, pf_hash_node->tconsume_ref_cnt, arg->direction, reply->integer);
+ LOG_DEBUG("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer);
if (reply->type != SWARMKV_REPLY_INTEGER) {
shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat);
@@ -449,6 +485,11 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
sf->flag |= SESSION_BORROW;
}
+ if (reply->integer == 0 && profile->type == PROFILE_TYPE_GENERIC) {
+ pf_hash_node->token_multiple.has_failed_get_token = 1;
+ shaper_token_multiple_update(profile);
+ }
+
END:
if (profile->type == PROFILE_TYPE_GENERIC) {
pf_hash_node->tconsume_ref_cnt--;
@@ -546,9 +587,10 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_info *profile,
static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec)
{
struct shaping_tconsume_cb_arg *arg = NULL;
+ struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node;
char key[32] = {0};
- if (pf_info->hash_node->tconsume_ref_cnt > 0) {
+ if (pf_hash_node->tconsume_ref_cnt > 0) {
goto END;
}
@@ -564,18 +606,18 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat);
sf->ref_cnt++;
- pf_info->hash_node->tconsume_ref_cnt++;
+ pf_hash_node->tconsume_ref_cnt++;
switch (pf_info->type) {
case PROFILE_TYPE_GENERIC:
- swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
+ swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * pf_hash_node->token_multiple.token_get_multiple, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_HOST_FARINESS:
case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
- swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
+ swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_MULTIPLE_MIN, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
- swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
+ swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_MULTIPLE_MIN, shaper_token_get_cb, arg);
break;
default:
if (arg) {
@@ -587,7 +629,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
END:
swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);
- if (pf_info->hash_node->is_invalid) {
+ if (pf_hash_node->is_invalid) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token
return SHAPER_TOKEN_GET_SUCCESS;
} else {//for borrowing, means this profile has no token to borrow
@@ -705,6 +747,7 @@ void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shap
} else {
profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node));
profile->hash_node->id = profile->id;
+ profile->hash_node->token_multiple.token_get_multiple = TOKEN_MULTIPLE_MIN;
HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node);
timeout_init(&profile->hash_node->timeout_handle, TIMEOUT_ABS);
timeouts_add(ctx->expires, &profile->hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
@@ -953,8 +996,12 @@ DROP:
if (enqueue_success) {
shaper_packet_dequeue(sf);
}
- shaper_stat_drop_inc(&sf->matched_rule_infos[sf->anchor].primary.stat, meta->dir, ctx->thread_index);
+ struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor].primary;
+ shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index);
sf->anchor = 0;
+
+ pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1;
+ shaper_token_multiple_update(pf_info);
return SHAPING_DROP;
}
@@ -1060,7 +1107,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
shaper_stat_queueing_pkt_inc_for_rule(s_rule, meta->dir, ctx->thread_index);
shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
} else {
- shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index);
+ struct shaping_profile_info *pf_info = &s_rule->primary;
+ pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1;
+ shaper_token_multiple_update(pf_info);
+
+ shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index);
shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len);
shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len);
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
diff --git a/shaping/src/shaper_aqm.cpp b/shaping/src/shaper_aqm.cpp
index 636d377..20c4190 100644
--- a/shaping/src/shaper_aqm.cpp
+++ b/shaping/src/shaper_aqm.cpp
@@ -27,6 +27,33 @@ static int shaper_aqm_blue_need_drop(struct shaping_packet_wrapper *pkt_wrapper,
return 0;
}
+static int shaper_aqm_have_processed(struct shaping_packet_wrapper *pkt_wrapper, int profile_id)
+{
+ int i = 0;
+
+ for (i = 0; i < SHAPING_REF_PROFILE_NUM_MAX; i++) {
+ if (pkt_wrapper->aqm_processed_pf_ids[i] == profile_id) {
+ return 1;
+ } else if (pkt_wrapper->aqm_processed_pf_ids[i] == 0) {
+ break;
+ }
+ }
+
+ return 0;
+}
+
+static void shaper_aqm_mark_processed(struct shaping_packet_wrapper *pkt_wrapper, int profile_id)
+{
+ int i = 0;
+
+ for (i = 0; i < SHAPING_REF_PROFILE_NUM_MAX; i++) {
+ if (pkt_wrapper->aqm_processed_pf_ids[i] == 0) {
+ pkt_wrapper->aqm_processed_pf_ids[i] = profile_id;
+ break;
+ }
+ }
+}
+
int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper)
{
int ret = 0;
@@ -35,8 +62,7 @@ int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_pa
return 0;
}
- //TODO: judge if this packet is aqm processed for this profile
- if (pkt_wrapper->aqm_processed) {
+ if (shaper_aqm_have_processed(pkt_wrapper, profile->id)) {
return 0;
}
@@ -50,7 +76,7 @@ int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_pa
break;
}
- pkt_wrapper->aqm_processed = 1;
+ shaper_aqm_mark_processed(pkt_wrapper, profile->id);
return ret;
} \ No newline at end of file
diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp
index ec7b59a..8a0fa88 100644
--- a/shaping/src/shaper_maat.cpp
+++ b/shaping/src/shaper_maat.cpp
@@ -388,7 +388,7 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
return;
}
- for (int i = 0; rule_num_remove_dup; i++) {
+ for (int i = 0; i < rule_num_remove_dup; i++) {
if (shaper_rule_update(ctx, sf, &sf->matched_rule_infos[sf->rule_num], rule_ids_remove_dup[i], &priority_changed) == 0) {
sf->rule_num++;
}
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index 1e36a35..62048f1 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -154,7 +154,7 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);//hincrby failed, retry
shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat);
- LOG_INFO("%s: shaping stat hincrby failed, retry for profile id %d priority %d, operate queue_len %lld", LOG_TAG_STAT, arg->profile_id, arg->priority, arg->queue_len);
+ LOG_DEBUG("%s: shaping stat hincrby failed, retry for profile id %d priority %d, operate queue_len %lld", LOG_TAG_STAT, arg->profile_id, arg->priority, arg->queue_len);
swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len);
return;
@@ -207,13 +207,24 @@ void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx,
return;
}
-static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, struct shaping_rule_info *rule, struct shaping_profile_info *profile, int profile_type, int need_update_guage, long long curr_time_us)
+static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, struct shaping_rule_info *rule, struct shaping_profile_info *profile, int profile_type, int need_refresh_stat, int need_update_guage, long long curr_time_us)
{
struct shaping_stat_for_profile *profile_stat = &profile->stat;
struct shaping_stat *stat = ctx->stat;
int priority = profile->priority;
int thread_id = ctx->thread_index;
unsigned long long old_latency;
+
+ if (need_update_guage) {
+ profile->hash_node->queue_len[priority] += profile_stat->priority_queue_len;
+ profile_stat->priority_queue_len = 0;
+ shaper_stat_priority_queue_len_refresh(ctx, profile->hash_node, priority, curr_time_us);
+ return;
+ }
+
+ if (!need_refresh_stat) {
+ return;
+ }
shaper_stat_tags_build(rule->vsys_id, rule->id, profile->id, priority, profile_type);
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_DROP_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.drop_pkts, tags, TAG_IDX_MAX, thread_id);
@@ -240,9 +251,6 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id);
}
- profile->hash_node->queue_len[priority] += profile_stat->in.queue_len + profile_stat->out.queue_len;
- shaper_stat_priority_queue_len_refresh(ctx, profile->hash_node, priority, curr_time_us);
-
memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
} else {
profile_stat->in.pkts = 0;
@@ -278,18 +286,18 @@ void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
}
}
- if (!need_refresh) {//TODO: add queue_len to profile???
+ int need_update_guage = sf->processed_pkts > CONFIRM_PRIORITY_PKTS ? 1 : 0;
+
+ if (!need_refresh && !need_update_guage) {
return;
}
- int need_update_guage = sf->processed_pkts > CONFIRM_PRIORITY_PKTS ? 1 : 0;
-
for (int i = 0; i < sf->rule_num; i++) {
rule = &sf->matched_rule_infos[i];
- shaper_stat_profile_metirc_refresh(ctx, rule, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage, curr_time_us);
+ shaper_stat_profile_metirc_refresh(ctx, rule, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_refresh, need_update_guage, curr_time_us);
for (int j = 0; j < rule->borrowing_num; j++) {
- shaper_stat_profile_metirc_refresh(ctx, rule, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage, curr_time_us);
+ shaper_stat_profile_metirc_refresh(ctx, rule, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_refresh, need_update_guage, curr_time_us);
}
}
@@ -341,6 +349,8 @@ void shaper_stat_queueing_pkt_inc(struct shaping_stat_for_profile *profile_stat,
profile_stat->out.queue_len++;
}
+ profile_stat->priority_queue_len++;
+
return;
}
@@ -352,6 +362,8 @@ void shaper_stat_queueing_pkt_dec(struct shaping_stat_for_profile *profile_stat,
profile_stat->out.queue_len--;
}
+ profile_stat->priority_queue_len--;
+
return;
}