summaryrefslogtreecommitdiff
path: root/shaping/src/shaper.cpp
diff options
context:
space:
mode:
authorroot <[email protected]>2024-04-30 09:21:29 +0000
committerroot <[email protected]>2024-04-30 09:21:29 +0000
commit48b9b26f9e3f120e86f4bdb0239efb63da2bb4a0 (patch)
tree211721b50cfa380ac0d58a5bd71c9a8fa4c70a4d /shaping/src/shaper.cpp
parent2971ce29dd34d19d5472c14df3db4824a3bfb2a2 (diff)
由拿到token才发送,改为拿不到token就不发送
Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r--shaping/src/shaper.cpp91
1 files changed, 66 insertions, 25 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 3ef35c1..103b440 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -28,7 +28,7 @@ extern "C" {
#define TOKEN_MULTIPLE_DEFAULT 10
#define TOKEN_GET_FAILED_INTERVAL_MS 1
#define HMGET_REQUEST_INTERVAL_MS 10
-#define PRIORITY_BLOCK_MIN_TIME_MS 500
+#define PRIORITY_BLOCK_MIN_TIME_MS 50
#define PROFILE_HASH_NODE_REFRESH_MS 500
@@ -383,6 +383,35 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i
return count;
}
+static void shaper_profile_async_pass_set(struct shaping_profile_info *profile, int priority, int async_pass_enabled)
+{
+ struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
+ int *async_pass = NULL;
+
+ if (profile->type == PROFILE_TYPE_GENERIC) {
+ async_pass = &pf_hash_node->async_pass[priority];
+ } else {
+ async_pass = &profile->async_pass;
+ }
+
+ if (*async_pass != async_pass_enabled) {
+ *async_pass = async_pass_enabled;
+ }
+
+ return;
+}
+
+static int shaper_profile_async_pass_get(struct shaping_profile_info *profile, int priority)
+{
+ struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
+
+ if (profile->type == PROFILE_TYPE_GENERIC) {
+ return pf_hash_node->async_pass[priority];
+ } else {
+ return profile->async_pass;
+ }
+}
+
static void shaper_deposit_token_add(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority)
{
long long *deposit_token;
@@ -415,6 +444,9 @@ static void shaper_deposit_token_add(struct shaping_profile_info *profile, int r
}
*deposit_token += req_token_bits;
+ if (*deposit_token > 0) {
+ shaper_profile_async_pass_set(profile, priority, 1);
+ }
}
static void shaper_token_multiple_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile)
@@ -491,16 +523,16 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
}
if (arg->is_primary_pf) {
- if (reply->integer > 0) {
- sf->flag &= (~SESSION_BORROW);
- } else {
- sf->flag |= SESSION_BORROW;
- }
+ //TODO: is_primary_pf is not used
}
- if (reply->integer == 0 && profile->type == PROFILE_TYPE_GENERIC) {
+ if (reply->integer == 0) {
+ shaper_profile_async_pass_set(profile, profile->priority, 0);
+
+ if (profile->type == PROFILE_TYPE_GENERIC) {
pf_hash_node->token_multiple.has_failed_get_token = 1;
shaper_token_multiple_update(ctx, profile);
+ }
}
END:
@@ -527,10 +559,11 @@ END:
return;
}
-static int shaper_deposit_token_get(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority, int force)
+static int shaper_deposit_token_get(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority, int force, int *need_get_token)
{
long long *deposit_token;
struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
+ int ret = -1;
switch (profile->type) {
case PROFILE_TYPE_GENERIC:
@@ -560,15 +593,17 @@ static int shaper_deposit_token_get(struct shaping_profile_info *profile, int re
if (force) {
*deposit_token -= req_token_bits;
- return 0;
+ ret = 0;
+ } else if (*deposit_token >= req_token_bits) {
+ *deposit_token -= req_token_bits;
+ ret = 0;
}
- if (*deposit_token >= req_token_bits) {
- *deposit_token -= req_token_bits;
- return 0;
+ if (*deposit_token <= 0) {
+ *need_get_token = 1;
}
- return -1;
+ return ret;
}
static void shaper_profile_hash_node_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *pf_hash_node, struct timespec *curr_timespec)
@@ -733,7 +768,7 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
END:
if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority] < PRIORITY_BLOCK_MIN_TIME_MS) {
- sf->flag |= SESSION_BORROW;
+ shaper_profile_async_pass_set(profile, priority, 0);
return 1;
} else {
return 0;
@@ -784,11 +819,9 @@ static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct
static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, unsigned char direction, struct timespec *curr_timespec)
{
- if (profile_type == PROFILE_IN_RULE_TYPE_BORROW && !(sf->flag & SESSION_BORROW)) {//TODO: 会减慢swarmkv请求速度
- return SHAPER_TOKEN_GET_FAILED;
- }
-
struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor];
+ int need_get_token = 0;
+ int ret = SHAPER_TOKEN_GET_FAILED;
time_t curr_time = time(NULL);
if (curr_time - sf->check_rule_time >= ctx->conf.check_rule_enable_interval_sec) {
@@ -805,18 +838,25 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet
}
- if (shaper_deposit_token_get(profile, req_token_bytes * 8, direction, profile->priority, 0) == 0) {
- return SHAPER_TOKEN_GET_SUCCESS;
+ if (shaper_profile_async_pass_get(profile, profile->priority) == 1) {
+ shaper_deposit_token_get(profile, req_token_bytes * 8, direction, profile->priority, 1, &need_get_token);
+ ret = SHAPER_TOKEN_GET_SUCCESS;
+ } else if (shaper_deposit_token_get(profile, req_token_bytes * 8, direction, profile->priority, 0, &need_get_token) == 0) {
+ ret = SHAPER_TOKEN_GET_SUCCESS;
+ }
+
+ if (!need_get_token) {
+ return ret;
}
long long curr_time_ms = curr_timespec->tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
if (shaping_swarmkv_is_too_short_interval(curr_time_ms, profile)) {
- return SHAPER_TOKEN_GET_FAILED;
+ return ret;
}
if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_timespec, curr_time_ms)) {
- return SHAPER_TOKEN_GET_FAILED;
+ return ret;
}
int req_token_bits = req_token_bytes * 8;
@@ -825,7 +865,7 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
if (profile->hash_node->is_invalid && profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token
return SHAPER_TOKEN_GET_SUCCESS;
} else {
- return SHAPER_TOKEN_GET_FAILED;
+ return ret;
}
}
@@ -1078,13 +1118,14 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
static void shaper_token_consume_force(struct shaping_flow *sf, struct metadata *meta)
{
struct shaping_rule_info *rule;
+ int need_get_token = 0;
for (int i = 0; i < sf->rule_num; i++) {
rule = &sf->matched_rule_infos[i];
if (!rule->is_enabled) {
continue;
}
- shaper_deposit_token_get(&rule->primary, meta->raw_len * 8, meta->dir, rule->primary.priority, 1);
+ shaper_deposit_token_get(&rule->primary, meta->raw_len * 8, meta->dir, rule->primary.priority, 1, &need_get_token);
}
return;
@@ -1367,7 +1408,7 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1);
shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta.raw_len);
} else {
- shaper_datapath_telemetry_info_append(ctx->marsio_info, rx_buff[i], sf);
+ //shaper_datapath_telemetry_info_append(ctx->marsio_info, rx_buff[i], sf);
shaping_packet_process(ctx, rx_buff[i], &meta, sf);
}