summaryrefslogtreecommitdiff
path: root/shaping
diff options
context:
space:
mode:
authorliuchang <[email protected]>2023-11-07 10:37:22 +0000
committerliuchang <[email protected]>2023-11-07 10:37:22 +0000
commit2bcbeb873b9d9fed90c010e8bea349c5579cd452 (patch)
tree3e93392ee60ff84b33a09031dd55163091a33e68 /shaping
parent7ce4f3abd8ae83c95bee215c272b354d1a6143e7 (diff)
1.reduce frequency of invoking hmget
2.add aqm control for swarmkv pending queue
Diffstat (limited to 'shaping')
-rw-r--r--shaping/include/shaper.h2
-rw-r--r--shaping/include/shaper_swarmkv.h3
-rw-r--r--shaping/src/shaper.cpp35
-rw-r--r--shaping/src/shaper_swarmkv.cpp36
4 files changed, 67 insertions, 9 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index da32ef8..aacd15e 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -50,6 +50,8 @@ struct shaping_thread_ctx {
struct shaping_global_stat *global_stat;
struct shaping_marsio_info *marsio_info;
struct swarmkv *swarmkv_db;//handle of swarmkv
+ int swarmkv_aqm_prob;
+ time_t swarmkv_aqm_update_time;
struct shaping_maat_info *maat_info;
struct session_table *session_table;
struct timeouts *expires;
diff --git a/shaping/include/shaper_swarmkv.h b/shaping/include/shaper_swarmkv.h
index e533802..963fff0 100644
--- a/shaping/include/shaper_swarmkv.h
+++ b/shaping/include/shaper_swarmkv.h
@@ -2,4 +2,5 @@
struct swarmkv* shaper_swarmkv_init(int caller_thread_num);
void shaper_swarmkv_destroy(struct swarmkv* swarmkv_db);
-void swarmkv_reload_log_level(); \ No newline at end of file
+void swarmkv_reload_log_level();
+int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx); \ No newline at end of file
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 6792ee2..8853197 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -34,6 +34,7 @@ extern "C" {
#define TOKEN_ENLARGE_TIMES 10
#define TOKEN_GET_FAILED_INTERVAL_MS 1
+#define HMGET_REQUEST_INTERVAL_MS 1000
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1"
@@ -79,7 +80,10 @@ enum shaper_token_get_result {
struct shaping_profile_hash_node {
int id;
- unsigned long long last_failed_get_token_ms;
+ long long last_failed_get_token_ms;
+ long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX];
+ unsigned char is_priority_blocked[SHAPING_PRIORITY_NUM_MAX];
+ unsigned char is_invalid;
UT_hash_handle hh;
};
@@ -511,12 +515,13 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg;
- struct shaping_profile_info *s_pf_info = arg->s_pf_info;
+ struct shaping_profile_hash_node *pf_hash_node = arg->s_pf_info->hash_node;
struct shaping_flow *sf = arg->sf;
+ int priority = arg->priority;
shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
- s_pf_info->is_priority_blocked = 0;
+ pf_hash_node->is_priority_blocked[priority] = 0;
if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) {
shaper_global_stat_async_hmget_failed_inc(arg->ctx->global_stat);
@@ -532,19 +537,23 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
char tmp_str[32] = {0};
memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len);
if (strtoll(tmp_str, NULL, 10) > 0) {
- s_pf_info->is_priority_blocked = 1;
+ pf_hash_node->is_priority_blocked[priority] = 1;
break;
}
}
}
END:
+ struct timespec curr_time;
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ pf_hash_node->last_hmget_ms[priority] = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+
shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free
free(cb_arg);
cb_arg = NULL;
}
-static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile)
+static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, long long curr_time_ms)
{
struct shaping_async_cb_arg *arg;
int priority = profile->priority;
@@ -553,6 +562,10 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
return 0;
}
+ if (curr_time_ms - profile->hash_node->last_hmget_ms[priority] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 1s
+ goto END;
+ }
+
arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg));
arg->ctx = ctx;
arg->s_pf_info = profile;
@@ -564,7 +577,8 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
shaper_global_stat_async_invoke_inc(ctx->global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
- if (profile->is_priority_blocked) {
+END:
+ if (profile->hash_node->is_priority_blocked[priority] == 1) {
return 1;
} else {
return 0;
@@ -610,12 +624,17 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
struct timespec curr_timespec;
clock_gettime(CLOCK_MONOTONIC, &curr_timespec);
- unsigned long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+ long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
if (curr_time_ms - profile->hash_node->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, return failed; for swarmkv can't reproduce token in 1ms
return SHAPER_TOKEN_GET_FAILED;
}
- if (shaper_profile_is_priority_blocked(ctx, sf, profile)) {
+ if (shaper_swarmkv_pending_queue_aqm_drop(ctx) == 1) {
+ profile->hash_node->last_failed_get_token_ms = curr_time_ms;
+ return SHAPER_TOKEN_GET_FAILED;
+ }
+
+ if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_time_ms)) {
return SHAPER_TOKEN_GET_FAILED;
} else {
int req_token_bits = req_token_bytes * 8;
diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp
index 3f6df0c..d6ab943 100644
--- a/shaping/src/shaper_swarmkv.cpp
+++ b/shaping/src/shaper_swarmkv.cpp
@@ -6,6 +6,12 @@
#include "utils.h"
#include "shaper_swarmkv.h"
+#define PROBABILITY_MAX 100
+#define INCREMENT 10
+#define DECREMENT 1
+#define FREEZE_TIME 1 //unit:s
+#define PENDING_QUEUE_LEN_MAX 1500
+
struct shaper_swarmkv_conf
{
char swarmkv_cluster_name[64];
@@ -100,6 +106,36 @@ void swarmkv_reload_log_level()
return;
}
+int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx)
+{
+ size_t pending_queue_len = swarmkv_caller_get_pending_commands(ctx->swarmkv_db);
+ time_t now = time(NULL);
+
+ if (now - ctx->swarmkv_aqm_update_time < FREEZE_TIME) {
+ goto END;
+ }
+
+ if (pending_queue_len > PENDING_QUEUE_LEN_MAX) {
+ if (ctx->swarmkv_aqm_prob < PROBABILITY_MAX) {
+ ctx->swarmkv_aqm_prob += INCREMENT;
+ }
+ LOG_DEBUG("%s: shaping pending queue len %lu, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob);
+ } else {
+ if (ctx->swarmkv_aqm_prob >= DECREMENT) {
+ ctx->swarmkv_aqm_prob -= DECREMENT;
+ }
+ LOG_DEBUG("%s: shaping pending queue len %lu, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob);
+ }
+ ctx->swarmkv_aqm_update_time = now;
+
+END:
+ if (rand() % PROBABILITY_MAX < ctx->swarmkv_aqm_prob) {
+ return 1;
+ }
+
+ return 0;
+}
+
struct swarmkv* shaper_swarmkv_init(int caller_thread_num)
{
struct swarmkv_options *swarmkv_opts = NULL;