diff options
| author | 刘畅 <[email protected]> | 2023-12-04 10:29:18 +0000 |
|---|---|---|
| committer | 刘畅 <[email protected]> | 2023-12-04 10:29:18 +0000 |
| commit | 99145fdb2f22bb82516ede7f90e3b4652ec162f6 (patch) | |
| tree | a4e5ee9d72ca4a731ae0831d7ff18a511750e351 | |
| parent | ca892c8bdfd82979edc7dbb6821cd51ceae516c4 (diff) | |
| parent | 4d2ddeeec873c25ea771290122f45a4b0e985dd3 (diff) | |
Merge branch 'store_token_in_session_for_fairness_splitby_profile' into 'rel'
Store token in session for fairness splitby profile
See merge request tango/shaping-engine!55
| -rw-r--r-- | shaping/include/shaper.h | 6 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 128 |
2 files changed, 104 insertions, 30 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index a521ece..7fede3b 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -100,6 +100,9 @@ struct shaping_profile_info { int id;//profile_id enum shaping_profile_type type; int priority; + int in_deposit_token_bits; + int out_deposit_token_bits; + long long last_failed_get_token_ms; unsigned long long enqueue_time_us;//to calculate max latency struct shaping_stat_for_profile stat; struct shaping_profile_hash_node *hash_node; @@ -164,8 +167,7 @@ struct shaper_flow_instance { struct shaping_async_cb_arg { struct shaping_thread_ctx *ctx; - struct shaping_profile_hash_node *pf_hash_node; - int priority; + struct shaping_profile_info *profile; unsigned char direction; long long start_time_us; }; diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 3de4e0b..170a235 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -380,19 +380,41 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i return count; } -static void shaper_deposit_token_add(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority) +static void shaper_deposit_token_add(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority) { - if (direction == SHAPING_DIR_IN) { - pf_hash_node->in_deposit_token_bits[priority] += req_token_bits; - } else { - pf_hash_node->out_deposit_token_bits[priority] += req_token_bits; + int *deposit_token; + struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; + + switch (profile->type) { + case PROFILE_TYPE_GENERIC: + if (direction == SHAPING_DIR_IN) { + deposit_token = &pf_hash_node->in_deposit_token_bits[priority]; + } else { + deposit_token = &pf_hash_node->out_deposit_token_bits[priority]; + } + break; + case PROFILE_TYPE_HOST_FARINESS: + case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: + case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: + if (direction == SHAPING_DIR_IN) { + deposit_token = &profile->in_deposit_token_bits; + } else { + deposit_token = &profile->out_deposit_token_bits; + } + break; + default: + LOG_ERROR("%s: invalid profile type %d, profile id %d", LOG_TAG_SHAPING, profile->type, profile->id); + return; } + + *deposit_token += req_token_bits; } static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg*)cb_arg; - struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node; + struct shaping_profile_info *profile = arg->profile; + struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; struct timespec curr_time; long long curr_time_us; @@ -418,14 +440,23 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg } if (reply->integer > 0) { - shaper_deposit_token_add(pf_hash_node, reply->integer, arg->direction, arg->priority);//deposit tokens to profile + shaper_deposit_token_add(profile, reply->integer, arg->direction, profile->priority);//deposit tokens to profile } END: if (reply->type != SWARMKV_REPLY_INTEGER || reply->integer == 0) { struct timespec curr_time; clock_gettime(CLOCK_MONOTONIC, &curr_time); - pf_hash_node->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + switch (profile->type) { + case PROFILE_TYPE_GENERIC: + pf_hash_node->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + break; + case PROFILE_TYPE_HOST_FARINESS: + case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: + case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: + profile->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + break; + } } free(cb_arg); @@ -443,20 +474,37 @@ static void shaper_deposit_token_sub(struct shaping_profile_hash_node *pf_hash_n } } -static int shaper_deposit_token_is_enough(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority) +static int shaper_deposit_token_is_enough(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority) { - if (direction == SHAPING_DIR_IN) { - if (pf_hash_node->in_deposit_token_bits[priority] >= req_token_bits) { - return 1; - } else { + int deposit_token; + struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; + + switch (profile->type) { + case PROFILE_TYPE_GENERIC: + if (direction == SHAPING_DIR_IN) { + deposit_token = pf_hash_node->in_deposit_token_bits[priority]; + } else { + deposit_token = pf_hash_node->out_deposit_token_bits[priority]; + } + break; + case PROFILE_TYPE_HOST_FARINESS: + case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: + case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: + if (direction == SHAPING_DIR_IN) { + deposit_token = profile->in_deposit_token_bits; + } else { + deposit_token = profile->out_deposit_token_bits; + } + break; + default: + LOG_ERROR("%s: invalid profile type %d, profile id %d", LOG_TAG_SHAPING, profile->type, profile->id); return 0; - } + } + + if (deposit_token >= req_token_bits) { + return 1; } else { - if (pf_hash_node->out_deposit_token_bits[priority] >= req_token_bits) { - return 1; - } else { - return 0; - } + return 0; } } @@ -468,8 +516,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming"); arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg)); arg->ctx = ctx; - arg->pf_hash_node = pf_info->hash_node; - arg->priority = pf_info->priority; + arg->profile = pf_info; arg->direction = direction; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; @@ -493,6 +540,8 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct break; } + swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL); + if (pf_info->hash_node->is_invalid) { if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token return SHAPER_TOKEN_GET_SUCCESS; @@ -501,7 +550,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct } } - if (shaper_deposit_token_is_enough(pf_info->hash_node, req_token_bits, direction, pf_info->priority)) { + if (shaper_deposit_token_is_enough(pf_info, req_token_bits, direction, pf_info->priority)) { shaper_deposit_token_sub(pf_info->hash_node, req_token_bits, direction, pf_info->priority); return SHAPER_TOKEN_GET_SUCCESS; } @@ -512,8 +561,9 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg; - struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node; - int priority = arg->priority; + struct shaping_profile_info *profile = arg->profile; + struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; + int priority = profile->priority; struct timespec curr_time; long long curr_time_us; @@ -568,8 +618,7 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg)); arg->ctx = ctx; - arg->pf_hash_node = profile->hash_node; - arg->priority = priority; + arg->profile = profile; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; shaper_global_stat_async_invoke_inc(ctx->global_stat); @@ -601,6 +650,28 @@ static void shaper_profile_hash_node_update(struct shaping_profile_info *profile return; } +static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct shaping_profile_info *profile) +{ + long long last_failed_ms; + + switch (profile->type) { + case PROFILE_TYPE_GENERIC: + last_failed_ms = profile->hash_node->last_failed_get_token_ms; + break; + case PROFILE_TYPE_HOST_FARINESS: + case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: + case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: + last_failed_ms = profile->last_failed_get_token_ms; + break; + } + + if (curr_time_ms - last_failed_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, it't too short interval; for swarmkv can't reproduce token in 1ms + return 1; + } else { + return 0; + } +} + static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int req_token_bytes, struct shaping_profile_info *profile, int profile_type, unsigned char direction) { @@ -623,7 +694,7 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f shaper_profile_hash_node_update(profile); - if (shaper_deposit_token_is_enough(profile->hash_node, req_token_bytes * 8, direction, profile->priority)) { + if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction, profile->priority)) { shaper_deposit_token_sub(profile->hash_node, req_token_bytes * 8, direction, profile->priority); return SHAPER_TOKEN_GET_SUCCESS; } @@ -631,7 +702,8 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f struct timespec curr_timespec; clock_gettime(CLOCK_MONOTONIC, &curr_timespec); long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; - if (curr_time_ms - profile->hash_node->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, return failed; for swarmkv can't reproduce token in 1ms + + if (shaping_swarmkv_is_too_short_interval(curr_time_ms, profile)) { return SHAPER_TOKEN_GET_FAILED; } |
