summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author刘畅 <[email protected]>2023-12-04 10:29:18 +0000
committer刘畅 <[email protected]>2023-12-04 10:29:18 +0000
commit99145fdb2f22bb82516ede7f90e3b4652ec162f6 (patch)
treea4e5ee9d72ca4a731ae0831d7ff18a511750e351
parentca892c8bdfd82979edc7dbb6821cd51ceae516c4 (diff)
parent4d2ddeeec873c25ea771290122f45a4b0e985dd3 (diff)
Merge branch 'store_token_in_session_for_fairness_splitby_profile' into 'rel'
Store token in session for fairness splitby profile See merge request tango/shaping-engine!55
-rw-r--r--shaping/include/shaper.h6
-rw-r--r--shaping/src/shaper.cpp128
2 files changed, 104 insertions, 30 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index a521ece..7fede3b 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -100,6 +100,9 @@ struct shaping_profile_info {
int id;//profile_id
enum shaping_profile_type type;
int priority;
+ int in_deposit_token_bits;
+ int out_deposit_token_bits;
+ long long last_failed_get_token_ms;
unsigned long long enqueue_time_us;//to calculate max latency
struct shaping_stat_for_profile stat;
struct shaping_profile_hash_node *hash_node;
@@ -164,8 +167,7 @@ struct shaper_flow_instance {
struct shaping_async_cb_arg {
struct shaping_thread_ctx *ctx;
- struct shaping_profile_hash_node *pf_hash_node;
- int priority;
+ struct shaping_profile_info *profile;
unsigned char direction;
long long start_time_us;
};
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 3de4e0b..170a235 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -380,19 +380,41 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i
return count;
}
-static void shaper_deposit_token_add(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority)
+static void shaper_deposit_token_add(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority)
{
- if (direction == SHAPING_DIR_IN) {
- pf_hash_node->in_deposit_token_bits[priority] += req_token_bits;
- } else {
- pf_hash_node->out_deposit_token_bits[priority] += req_token_bits;
+ int *deposit_token;
+ struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
+
+ switch (profile->type) {
+ case PROFILE_TYPE_GENERIC:
+ if (direction == SHAPING_DIR_IN) {
+ deposit_token = &pf_hash_node->in_deposit_token_bits[priority];
+ } else {
+ deposit_token = &pf_hash_node->out_deposit_token_bits[priority];
+ }
+ break;
+ case PROFILE_TYPE_HOST_FARINESS:
+ case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
+ case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
+ if (direction == SHAPING_DIR_IN) {
+ deposit_token = &profile->in_deposit_token_bits;
+ } else {
+ deposit_token = &profile->out_deposit_token_bits;
+ }
+ break;
+ default:
+ LOG_ERROR("%s: invalid profile type %d, profile id %d", LOG_TAG_SHAPING, profile->type, profile->id);
+ return;
}
+
+ *deposit_token += req_token_bits;
}
static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg*)cb_arg;
- struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node;
+ struct shaping_profile_info *profile = arg->profile;
+ struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
struct timespec curr_time;
long long curr_time_us;
@@ -418,14 +440,23 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
}
if (reply->integer > 0) {
- shaper_deposit_token_add(pf_hash_node, reply->integer, arg->direction, arg->priority);//deposit tokens to profile
+ shaper_deposit_token_add(profile, reply->integer, arg->direction, profile->priority);//deposit tokens to profile
}
END:
if (reply->type != SWARMKV_REPLY_INTEGER || reply->integer == 0) {
struct timespec curr_time;
clock_gettime(CLOCK_MONOTONIC, &curr_time);
- pf_hash_node->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+ switch (profile->type) {
+ case PROFILE_TYPE_GENERIC:
+ pf_hash_node->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+ break;
+ case PROFILE_TYPE_HOST_FARINESS:
+ case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
+ case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
+ profile->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+ break;
+ }
}
free(cb_arg);
@@ -443,20 +474,37 @@ static void shaper_deposit_token_sub(struct shaping_profile_hash_node *pf_hash_n
}
}
-static int shaper_deposit_token_is_enough(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority)
+static int shaper_deposit_token_is_enough(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority)
{
- if (direction == SHAPING_DIR_IN) {
- if (pf_hash_node->in_deposit_token_bits[priority] >= req_token_bits) {
- return 1;
- } else {
+ int deposit_token;
+ struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
+
+ switch (profile->type) {
+ case PROFILE_TYPE_GENERIC:
+ if (direction == SHAPING_DIR_IN) {
+ deposit_token = pf_hash_node->in_deposit_token_bits[priority];
+ } else {
+ deposit_token = pf_hash_node->out_deposit_token_bits[priority];
+ }
+ break;
+ case PROFILE_TYPE_HOST_FARINESS:
+ case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
+ case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
+ if (direction == SHAPING_DIR_IN) {
+ deposit_token = profile->in_deposit_token_bits;
+ } else {
+ deposit_token = profile->out_deposit_token_bits;
+ }
+ break;
+ default:
+ LOG_ERROR("%s: invalid profile type %d, profile id %d", LOG_TAG_SHAPING, profile->type, profile->id);
return 0;
- }
+ }
+
+ if (deposit_token >= req_token_bits) {
+ return 1;
} else {
- if (pf_hash_node->out_deposit_token_bits[priority] >= req_token_bits) {
- return 1;
- } else {
- return 0;
- }
+ return 0;
}
}
@@ -468,8 +516,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg));
arg->ctx = ctx;
- arg->pf_hash_node = pf_info->hash_node;
- arg->priority = pf_info->priority;
+ arg->profile = pf_info;
arg->direction = direction;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
@@ -493,6 +540,8 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
break;
}
+ swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);
+
if (pf_info->hash_node->is_invalid) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token
return SHAPER_TOKEN_GET_SUCCESS;
@@ -501,7 +550,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
}
}
- if (shaper_deposit_token_is_enough(pf_info->hash_node, req_token_bits, direction, pf_info->priority)) {
+ if (shaper_deposit_token_is_enough(pf_info, req_token_bits, direction, pf_info->priority)) {
shaper_deposit_token_sub(pf_info->hash_node, req_token_bits, direction, pf_info->priority);
return SHAPER_TOKEN_GET_SUCCESS;
}
@@ -512,8 +561,9 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg;
- struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node;
- int priority = arg->priority;
+ struct shaping_profile_info *profile = arg->profile;
+ struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
+ int priority = profile->priority;
struct timespec curr_time;
long long curr_time_us;
@@ -568,8 +618,7 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg));
arg->ctx = ctx;
- arg->pf_hash_node = profile->hash_node;
- arg->priority = priority;
+ arg->profile = profile;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
shaper_global_stat_async_invoke_inc(ctx->global_stat);
@@ -601,6 +650,28 @@ static void shaper_profile_hash_node_update(struct shaping_profile_info *profile
return;
}
+static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct shaping_profile_info *profile)
+{
+ long long last_failed_ms;
+
+ switch (profile->type) {
+ case PROFILE_TYPE_GENERIC:
+ last_failed_ms = profile->hash_node->last_failed_get_token_ms;
+ break;
+ case PROFILE_TYPE_HOST_FARINESS:
+ case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
+ case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
+ last_failed_ms = profile->last_failed_get_token_ms;
+ break;
+ }
+
+ if (curr_time_ms - last_failed_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, it't too short interval; for swarmkv can't reproduce token in 1ms
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int req_token_bytes,
struct shaping_profile_info *profile, int profile_type, unsigned char direction)
{
@@ -623,7 +694,7 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
shaper_profile_hash_node_update(profile);
- if (shaper_deposit_token_is_enough(profile->hash_node, req_token_bytes * 8, direction, profile->priority)) {
+ if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction, profile->priority)) {
shaper_deposit_token_sub(profile->hash_node, req_token_bytes * 8, direction, profile->priority);
return SHAPER_TOKEN_GET_SUCCESS;
}
@@ -631,7 +702,8 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
struct timespec curr_timespec;
clock_gettime(CLOCK_MONOTONIC, &curr_timespec);
long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
- if (curr_time_ms - profile->hash_node->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, return failed; for swarmkv can't reproduce token in 1ms
+
+ if (shaping_swarmkv_is_too_short_interval(curr_time_ms, profile)) {
return SHAPER_TOKEN_GET_FAILED;
}