diff options
| author | root <[email protected]> | 2024-01-19 02:33:20 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2024-01-19 02:33:20 +0000 |
| commit | f0c91c0cfd4ec5a8f3e6636605484f1467c40a1f (patch) | |
| tree | 93e2b2045d9e83aa6565b7f0d84271f636851345 /shaping | |
| parent | 008d4b3906cc84e007f4519f901a288dd968a14e (diff) | |
temp code feature AQM blue alghorithm
Diffstat (limited to 'shaping')
| -rw-r--r-- | shaping/include/shaper.h | 34 | ||||
| -rw-r--r-- | shaping/include/shaper_aqm.h | 2 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 107 | ||||
| -rw-r--r-- | shaping/src/shaper_aqm.cpp | 52 |
4 files changed, 145 insertions, 50 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 6168191..069abcf 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -97,6 +97,33 @@ enum shaping_profile_type { PROFILE_TYPE_SPLIT_BY_LOCAL_HOST }; +enum shaper_aqm_type { + AQM_TYPE_NONE = 0, + AQM_TYPE_BLUE, + AQM_TYPE_CODEL, + AQM_TYPE_MAX +}; + +struct shaper_aqm_blue_para { + time_t update_time; + int probability; +}; + +struct shaping_profile_hash_node { + int id; + enum shaper_aqm_type aqm_type; + int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; + int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; + long long last_failed_get_token_ms; + long long last_hmget_ms; + long long queue_len[SHAPING_PRIORITY_NUM_MAX]; + long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX]; + int ref_cnt; + struct shaper_aqm_blue_para aqm_blue_para; + unsigned char is_invalid; + UT_hash_handle hh; +}; + struct shaping_profile_info { int id;//profile_id enum shaping_profile_type type; @@ -126,6 +153,7 @@ struct shaping_packet_wrapper { unsigned int length; int rule_anchor; unsigned char direction; + unsigned char aqm_processed; TAILQ_ENTRY(shaping_packet_wrapper) node; }; TAILQ_HEAD(delay_queue, shaping_packet_wrapper); @@ -179,7 +207,6 @@ struct shaping_tconsume_cb_arg { struct shaping_hmget_cb_arg { struct shaping_thread_ctx *ctx; struct shaping_profile_hash_node *pf_hash_node; - int priority; long long start_time_us; }; @@ -202,10 +229,7 @@ bool shaper_queue_empty(struct shaping_flow *sf); void shaper_packet_dequeue(struct shaping_flow *sf); struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf); void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx); -/*return value: 0 for success, -1 for failed*/ -int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time); -/*return num of sf_ins*/ -void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf); + int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num); //enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue); diff --git a/shaping/include/shaper_aqm.h b/shaping/include/shaper_aqm.h new file mode 100644 index 0000000..794c74d --- /dev/null +++ b/shaping/include/shaper_aqm.h @@ -0,0 +1,2 @@ + +int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper);
\ No newline at end of file diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 6457025..0e308ea 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -22,25 +22,14 @@ extern "C" { #include "shaper_swarmkv.h" #include "shaper_maat.h" #include "shaper_global_stat.h" +#include "shaper_aqm.h" #define TOKEN_ENLARGE_TIMES 10 #define TOKEN_GET_FAILED_INTERVAL_MS 1 #define HMGET_REQUEST_INTERVAL_MS 10 #define PRIORITY_BLOCK_MIN_TIME_MS 500 -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 " priority-3" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 " priority-4" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 " priority-5" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 " priority-6" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 " priority-7" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 " priority-8" - -const char *swarmkv_queue_len_get_cmd[] = {"", SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3, - SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6, - SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9}; +#define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9" struct shaper {//trees in one thread struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority @@ -62,17 +51,6 @@ enum shaper_token_get_result { SHAPER_TOKEN_GET_PASS = 1,//don't need to get token, regard as success }; -struct shaping_profile_hash_node { - int id; - int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; - int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; - long long last_failed_get_token_ms; - long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX]; - long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX]; - unsigned char is_invalid; - UT_hash_handle hh; -}; - thread_local struct shaping_profile_hash_node *thread_sp_hashtbl = NULL; struct shaper* shaper_new(unsigned int priority_queue_len_max) @@ -273,14 +251,13 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) } //return success(0) while any avl tree insert success -int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) +static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; int priority; - int ret = -1; int i; pkt_wrapper = shaper_first_pkt_get(sf); @@ -288,8 +265,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un priority = s_rule_info->primary.priority; avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); - if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { - ret = 0; + if (0 != avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {//primary profile failed means flow push failed, ignore borrow profile + return -1; } if (s_rule_info->borrowing_num == 0) {// no borrow profile @@ -300,18 +277,15 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un priority = s_rule_info->borrowing[i].priority; avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { - ret = 0; shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); } } END: - if (ret == 0) {//all avl tree success - s_rule_info->primary.enqueue_time_us = enqueue_time; - shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); - } + s_rule_info->primary.enqueue_time_us = enqueue_time; + shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); - return ret; + return 0; } static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time) @@ -323,7 +297,7 @@ static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile return (curr_time - enqueue_time); } -void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) +static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; @@ -364,6 +338,27 @@ END: return; } +static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority) +{ + struct shaping_node *s_node = (struct shaping_node*)sf; + struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; + struct shaper *sp = ctx->sp; + struct shaping_packet_wrapper *pkt_wrapper = NULL; + + pkt_wrapper = shaper_first_pkt_get(sf); + assert(pkt_wrapper != NULL); + + for (int i = 0; i < s_rule_info->borrowing_num; i++) { + priority = s_rule_info->borrowing[i].priority; + if (avl_node_in_tree(s_node->avl_node[priority])) { + avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); + shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); + } + } + + return; +} + int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num) { struct avl_node *avl_node = NULL; @@ -606,7 +601,6 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb struct shaping_hmget_cb_arg *arg = (struct shaping_hmget_cb_arg *)cb_arg; struct shaping_thread_ctx *ctx = arg->ctx; struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node; - int priority = arg->priority; struct timespec curr_time; long long curr_time_us; long long curr_time_ms; @@ -632,15 +626,15 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb if (reply->elements[i]->type == SWARMKV_REPLY_STRING) { char tmp_str[32] = {0}; memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len); - if (strtoll(tmp_str, NULL, 10) > 0) { - pf_hash_node->priority_blocked_time_ms[priority] = curr_time_ms; - break; - } + pf_hash_node->queue_len[i] = strtoll(tmp_str, NULL, 10); + } else { + pf_hash_node->queue_len[i] = 0; } } END: - pf_hash_node->last_hmget_ms[priority] = curr_time_ms; + pf_hash_node->last_hmget_ms = curr_time_ms; + pf_hash_node->ref_cnt--; free(cb_arg); cb_arg = NULL; @@ -659,19 +653,31 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st return 0; } - if (curr_time_ms - profile->hash_node->last_hmget_ms[priority] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms + if (profile->hash_node->ref_cnt > 0) {//if hmget command is pending, don't send hmget command again + goto END; + } + + if (curr_time_ms - profile->hash_node->last_hmget_ms < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms goto END; } arg = (struct shaping_hmget_cb_arg *)calloc(1, sizeof(struct shaping_hmget_cb_arg)); arg->ctx = ctx; arg->pf_hash_node = profile->hash_node; - arg->priority = priority; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + profile->hash_node->ref_cnt++; + shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat); - swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); + swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_QUEUE_LEN_GET_CMD, profile->id); + + for (int i = 0; i < priority; i++) { + if (profile->hash_node->queue_len[i] > 0) { + profile->hash_node->priority_blocked_time_ms[priority] = curr_time_ms; + goto END; + } + } END: if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority] < PRIORITY_BLOCK_MIN_TIME_MS) { @@ -848,11 +854,22 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi goto DROP; } } - /*todo: AQM, just for primary profile*/ for (int i = 0; i < profile_num; i++) { profile = pf_container[i].pf_info; profile_type = pf_container[i].pf_type; + + /*AQM process, if aqm not pass, for primary profile drop packet, for borrow profile just don't give token to this packet*/ + if (shaper_aqm_need_drop(profile, pkt_wrapper)) { + if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) { + shaper_flow_pop(ctx, sf); + goto DROP; + } else { + shaper_flow_specific_borrow_priority_pop(ctx, sf, priority); + continue; + } + } + int ret = shaper_token_consume(ctx, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction); if (ret >= SHAPER_TOKEN_GET_SUCCESS) { if (ret == SHAPER_TOKEN_GET_SUCCESS) { diff --git a/shaping/src/shaper_aqm.cpp b/shaping/src/shaper_aqm.cpp new file mode 100644 index 0000000..1d71769 --- /dev/null +++ b/shaping/src/shaper_aqm.cpp @@ -0,0 +1,52 @@ +#include <time.h> +#include "shaper.h" +#include "shaper_aqm.h" + +#define PROBABILITY_MAX 100 +#define INCREMENT 10 +#define DECREMENT 1 +#define FREEZE_TIME 2 //unit:s +#define QUEUE_LEN_MAX 100 +static int shaper_aqm_blue_need_drop(struct shaping_packet_wrapper *pkt_wrapper, struct shaper_aqm_blue_para *para, int curr_queue_len) +{ + time_t curr_time; + + if (pkt_wrapper->aqm_processed) { + return 0; + } + + if (time(&curr_time) - para->update_time >= FREEZE_TIME) { + para->update_time = curr_time; + if (curr_queue_len >= QUEUE_LEN_MAX) { + para->probability = (para->probability + INCREMENT) > PROBABILITY_MAX ? PROBABILITY_MAX : (para->probability + INCREMENT); + } else if (curr_queue_len == 0) { + para->probability = (para->probability - DECREMENT) >= 0 ? (para->probability - DECREMENT) : 0; + } + } + + if (rand() % PROBABILITY_MAX < para->probability) { + return 1; + } + + return 0; +} + +int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper) +{ + //TODO: judge if this packet is aqm processed + if (pkt_wrapper->aqm_processed) { + return 0; + } + + switch (profile->hash_node->aqm_type) { + case AQM_TYPE_BLUE: + return shaper_aqm_blue_need_drop(pkt_wrapper, &profile->hash_node->aqm_blue_para, profile->hash_node->queue_len[profile->priority]); + break; + case AQM_TYPE_CODEL: + break; + default: + return 0; + } + + return 0; +}
\ No newline at end of file |
