summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author刘畅 <[email protected]>2023-12-07 10:03:07 +0000
committer刘畅 <[email protected]>2023-12-07 10:03:07 +0000
commitb2e221093a0afa5ff4a0d530ed5a6336d7df88af (patch)
treeee0711cdafebbee42d3afb445d6d8e9a1e65e919
parentfe477349056d0797a5b1ca29556ee7689826b332 (diff)
parented2b9e3d510882eef7a2fa20730d89794d24f2ce (diff)
Merge branch 'fix_borrow_early' into 'rel'v1.3.11
fix borrow early, only borrow when tconsume_cb don't get token See merge request tango/shaping-engine!59
-rw-r--r--shaping/include/shaper.h18
-rw-r--r--shaping/src/shaper.cpp59
-rw-r--r--shaping/src/shaper_stat.cpp13
3 files changed, 64 insertions, 26 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 7fede3b..9ae3846 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -19,7 +19,7 @@ extern "C" {
#define SHAPER_FLOW_POP_NUM_MAX 10
#define SESSION_CLOSE 0x1
-#define SESSION_UPDATE_PF_PRIO_LEN 0x2
+#define SESSION_BORROW 0x2
#define CONFIRM_PRIORITY_PKTS 20
@@ -150,6 +150,7 @@ struct shaping_flow {
int priority;
int rule_num;
int anchor;//rule_idx
+ int ref_cnt;
unsigned int queue_len;
unsigned int flag;
struct metadata ctrl_meta;
@@ -165,13 +166,26 @@ struct shaper_flow_instance {
int priority;
};
-struct shaping_async_cb_arg {
+struct shaping_tconsume_cb_arg {
struct shaping_thread_ctx *ctx;
struct shaping_profile_info *profile;
+ struct shaping_flow *sf;
unsigned char direction;
long long start_time_us;
};
+struct shaping_hmget_cb_arg {
+ struct shaping_thread_ctx *ctx;
+ struct shaping_profile_hash_node *pf_hash_node;
+ int priority;
+ long long start_time_us;
+};
+
+struct shaping_hincrby_cb_arg {
+ struct shaping_thread_ctx *ctx;
+ long long start_time_us;
+};
+
struct shaper;//instance of shaping, thread unsafe
struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx);
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 55f32fb..e0fc695 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -162,6 +162,8 @@ struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS);
timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
+ s_node->shaping_flow.ref_cnt = 1;
+
return &s_node->shaping_flow;
ERROR:
@@ -171,6 +173,11 @@ ERROR:
void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
{
+ sf->ref_cnt--;
+ if (sf->ref_cnt > 0) {
+ return;
+ }
+
struct shaping_node *s_node = (struct shaping_node*)sf;
timeouts_del(ctx->expires, &sf->timeout_handle);
@@ -412,23 +419,25 @@ static void shaper_deposit_token_add(struct shaping_profile_info *profile, int r
static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
- struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg*)cb_arg;
+ struct shaping_tconsume_cb_arg *arg = (struct shaping_tconsume_cb_arg*)cb_arg;
+ struct shaping_thread_ctx *ctx = arg->ctx;
struct shaping_profile_info *profile = arg->profile;
struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
+ struct shaping_flow *sf = arg->sf;
struct timespec curr_time;
long long curr_time_us;
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(arg->ctx->global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us);
- shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
- shaper_global_stat_tconsume_callback_inc(arg->ctx->global_stat);
+ shaper_global_stat_async_callback_inc(ctx->global_stat);
+ shaper_global_stat_tconsume_callback_inc(ctx->global_stat);
LOG_INFO("Swarmkv reply type =%d, direction =%d, integer =%llu",reply->type, arg->direction, reply->integer);
if (reply->type != SWARMKV_REPLY_INTEGER) {
- shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat);
+ shaper_global_stat_async_tconsume_failed_inc(ctx->global_stat);
goto END;
}
@@ -440,7 +449,10 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
}
if (reply->integer > 0) {
+ sf->flag &= (~SESSION_BORROW);
shaper_deposit_token_add(profile, reply->integer, arg->direction, profile->priority);//deposit tokens to profile
+ } else {
+ sf->flag |= SESSION_BORROW;
}
END:
@@ -459,6 +471,8 @@ END:
}
}
+ shaping_flow_free(ctx, sf);
+
free(cb_arg);
cb_arg = NULL;
@@ -533,18 +547,22 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_info *profile,
static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec)
{
- struct shaping_async_cb_arg *arg = NULL;
+ struct shaping_tconsume_cb_arg *arg = NULL;
char key[32] = {0};
snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
- arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg));
+ arg = (struct shaping_tconsume_cb_arg *)calloc(1, sizeof(struct shaping_tconsume_cb_arg));
arg->ctx = ctx;
arg->profile = pf_info;
+ arg->sf = sf;
arg->direction = direction;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
shaper_global_stat_async_invoke_inc(ctx->global_stat);
sheper_global_stat_tconsume_invoke_inc(ctx->global_stat);
+
+ sf->ref_cnt++;
+
switch (pf_info->type) {
case PROFILE_TYPE_GENERIC:
swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
@@ -583,24 +601,24 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
- struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg;
- struct shaping_profile_info *profile = arg->profile;
- struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
- int priority = profile->priority;
+ struct shaping_hmget_cb_arg *arg = (struct shaping_hmget_cb_arg *)cb_arg;
+ struct shaping_thread_ctx *ctx = arg->ctx;
+ struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node;
+ int priority = arg->priority;
struct timespec curr_time;
long long curr_time_us;
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(arg->ctx->global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us);
- shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
- shaper_global_stat_hmget_callback_inc(arg->ctx->global_stat);
+ shaper_global_stat_async_callback_inc(ctx->global_stat);
+ shaper_global_stat_hmget_callback_inc(ctx->global_stat);
pf_hash_node->is_priority_blocked[priority] = 0;
if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) {
- shaper_global_stat_async_hmget_failed_inc(arg->ctx->global_stat);
+ shaper_global_stat_async_hmget_failed_inc(ctx->global_stat);
goto END;
}
@@ -628,7 +646,7 @@ END:
static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, struct timespec *curr_timespec, long long curr_time_ms)
{
- struct shaping_async_cb_arg *arg;
+ struct shaping_hmget_cb_arg *arg;
int priority = profile->priority;
if (priority == 0) {//highest priority, can't be blocked
@@ -639,9 +657,10 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
goto END;
}
- arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg));
+ arg = (struct shaping_hmget_cb_arg *)calloc(1, sizeof(struct shaping_hmget_cb_arg));
arg->ctx = ctx;
- arg->profile = profile;
+ arg->pf_hash_node = profile->hash_node;
+ arg->priority = priority;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
shaper_global_stat_async_invoke_inc(ctx->global_stat);
@@ -698,6 +717,10 @@ static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct
static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int req_token_bytes,
struct shaping_profile_info *profile, int profile_type, unsigned char direction)
{
+ if (profile_type == PROFILE_IN_RULE_TYPE_BORROW && !(sf->flag & SESSION_BORROW)) {
+ return SHAPER_TOKEN_GET_FAILED;
+ }
+
struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor];
time_t curr_time = time(NULL);
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index 40a52a2..e9cd3e6 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -134,19 +134,20 @@ static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int
static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
- struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg;
+ struct shaping_hincrby_cb_arg *arg = (struct shaping_hincrby_cb_arg *)cb_arg;
+ struct shaping_thread_ctx *ctx = arg->ctx;
struct timespec curr_time;
long long curr_time_us;
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(arg->ctx->global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us);
- shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
- shaper_global_stat_hincrby_callback_inc(arg->ctx->global_stat);
+ shaper_global_stat_async_callback_inc(ctx->global_stat);
+ shaper_global_stat_hincrby_callback_inc(ctx->global_stat);
if (reply->type != SWARMKV_REPLY_INTEGER) {
- shaper_global_stat_async_hincrby_failed_inc(arg->ctx->global_stat);
+ shaper_global_stat_async_hincrby_failed_inc(ctx->global_stat);
}
free(cb_arg);
@@ -183,7 +184,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, i
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id);
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id);
- struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg));
+ struct shaping_hincrby_cb_arg *arg = (struct shaping_hincrby_cb_arg *)calloc(1, sizeof(struct shaping_hincrby_cb_arg));
struct timespec curr_time;
clock_gettime(CLOCK_MONOTONIC, &curr_time);