diff options
| author | liuchang <[email protected]> | 2023-11-07 10:37:22 +0000 |
|---|---|---|
| committer | liuchang <[email protected]> | 2023-11-07 10:37:22 +0000 |
| commit | 2bcbeb873b9d9fed90c010e8bea349c5579cd452 (patch) | |
| tree | 3e93392ee60ff84b33a09031dd55163091a33e68 /shaping | |
| parent | 7ce4f3abd8ae83c95bee215c272b354d1a6143e7 (diff) | |
1.reduce frequency of invoking hmget
2.add aqm control for swarmkv pending queue
Diffstat (limited to 'shaping')
| -rw-r--r-- | shaping/include/shaper.h | 2 | ||||
| -rw-r--r-- | shaping/include/shaper_swarmkv.h | 3 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 35 | ||||
| -rw-r--r-- | shaping/src/shaper_swarmkv.cpp | 36 |
4 files changed, 67 insertions, 9 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index da32ef8..aacd15e 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -50,6 +50,8 @@ struct shaping_thread_ctx { struct shaping_global_stat *global_stat; struct shaping_marsio_info *marsio_info; struct swarmkv *swarmkv_db;//handle of swarmkv + int swarmkv_aqm_prob; + time_t swarmkv_aqm_update_time; struct shaping_maat_info *maat_info; struct session_table *session_table; struct timeouts *expires; diff --git a/shaping/include/shaper_swarmkv.h b/shaping/include/shaper_swarmkv.h index e533802..963fff0 100644 --- a/shaping/include/shaper_swarmkv.h +++ b/shaping/include/shaper_swarmkv.h @@ -2,4 +2,5 @@ struct swarmkv* shaper_swarmkv_init(int caller_thread_num); void shaper_swarmkv_destroy(struct swarmkv* swarmkv_db); -void swarmkv_reload_log_level();
\ No newline at end of file +void swarmkv_reload_log_level(); +int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx);
\ No newline at end of file diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 6792ee2..8853197 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -34,6 +34,7 @@ extern "C" { #define TOKEN_ENLARGE_TIMES 10 #define TOKEN_GET_FAILED_INTERVAL_MS 1 +#define HMGET_REQUEST_INTERVAL_MS 1000 #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0" #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1" @@ -79,7 +80,10 @@ enum shaper_token_get_result { struct shaping_profile_hash_node { int id; - unsigned long long last_failed_get_token_ms; + long long last_failed_get_token_ms; + long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX]; + unsigned char is_priority_blocked[SHAPING_PRIORITY_NUM_MAX]; + unsigned char is_invalid; UT_hash_handle hh; }; @@ -511,12 +515,13 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg; - struct shaping_profile_info *s_pf_info = arg->s_pf_info; + struct shaping_profile_hash_node *pf_hash_node = arg->s_pf_info->hash_node; struct shaping_flow *sf = arg->sf; + int priority = arg->priority; shaper_global_stat_async_callback_inc(arg->ctx->global_stat); - s_pf_info->is_priority_blocked = 0; + pf_hash_node->is_priority_blocked[priority] = 0; if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) { shaper_global_stat_async_hmget_failed_inc(arg->ctx->global_stat); @@ -532,19 +537,23 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb char tmp_str[32] = {0}; memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len); if (strtoll(tmp_str, NULL, 10) > 0) { - s_pf_info->is_priority_blocked = 1; + pf_hash_node->is_priority_blocked[priority] = 1; break; } } } END: + struct timespec curr_time; + clock_gettime(CLOCK_MONOTONIC, &curr_time); + pf_hash_node->last_hmget_ms[priority] = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free free(cb_arg); cb_arg = NULL; } -static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile) +static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, long long curr_time_ms) { struct shaping_async_cb_arg *arg; int priority = profile->priority; @@ -553,6 +562,10 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st return 0; } + if (curr_time_ms - profile->hash_node->last_hmget_ms[priority] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 1s + goto END; + } + arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg)); arg->ctx = ctx; arg->s_pf_info = profile; @@ -564,7 +577,8 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st shaper_global_stat_async_invoke_inc(ctx->global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); - if (profile->is_priority_blocked) { +END: + if (profile->hash_node->is_priority_blocked[priority] == 1) { return 1; } else { return 0; @@ -610,12 +624,17 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f struct timespec curr_timespec; clock_gettime(CLOCK_MONOTONIC, &curr_timespec); - unsigned long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; if (curr_time_ms - profile->hash_node->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, return failed; for swarmkv can't reproduce token in 1ms return SHAPER_TOKEN_GET_FAILED; } - if (shaper_profile_is_priority_blocked(ctx, sf, profile)) { + if (shaper_swarmkv_pending_queue_aqm_drop(ctx) == 1) { + profile->hash_node->last_failed_get_token_ms = curr_time_ms; + return SHAPER_TOKEN_GET_FAILED; + } + + if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_time_ms)) { return SHAPER_TOKEN_GET_FAILED; } else { int req_token_bits = req_token_bytes * 8; diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp index 3f6df0c..d6ab943 100644 --- a/shaping/src/shaper_swarmkv.cpp +++ b/shaping/src/shaper_swarmkv.cpp @@ -6,6 +6,12 @@ #include "utils.h" #include "shaper_swarmkv.h" +#define PROBABILITY_MAX 100 +#define INCREMENT 10 +#define DECREMENT 1 +#define FREEZE_TIME 1 //unit:s +#define PENDING_QUEUE_LEN_MAX 1500 + struct shaper_swarmkv_conf { char swarmkv_cluster_name[64]; @@ -100,6 +106,36 @@ void swarmkv_reload_log_level() return; } +int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx) +{ + size_t pending_queue_len = swarmkv_caller_get_pending_commands(ctx->swarmkv_db); + time_t now = time(NULL); + + if (now - ctx->swarmkv_aqm_update_time < FREEZE_TIME) { + goto END; + } + + if (pending_queue_len > PENDING_QUEUE_LEN_MAX) { + if (ctx->swarmkv_aqm_prob < PROBABILITY_MAX) { + ctx->swarmkv_aqm_prob += INCREMENT; + } + LOG_DEBUG("%s: shaping pending queue len %lu, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob); + } else { + if (ctx->swarmkv_aqm_prob >= DECREMENT) { + ctx->swarmkv_aqm_prob -= DECREMENT; + } + LOG_DEBUG("%s: shaping pending queue len %lu, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob); + } + ctx->swarmkv_aqm_update_time = now; + +END: + if (rand() % PROBABILITY_MAX < ctx->swarmkv_aqm_prob) { + return 1; + } + + return 0; +} + struct swarmkv* shaper_swarmkv_init(int caller_thread_num) { struct swarmkv_options *swarmkv_opts = NULL; |
