summaryrefslogtreecommitdiff
path: root/shaping/src
diff options
context:
space:
mode:
authorroot <[email protected]>2024-01-19 02:33:20 +0000
committerroot <[email protected]>2024-01-19 02:33:20 +0000
commitf0c91c0cfd4ec5a8f3e6636605484f1467c40a1f (patch)
tree93e2b2045d9e83aa6565b7f0d84271f636851345 /shaping/src
parent008d4b3906cc84e007f4519f901a288dd968a14e (diff)
temp code feature AQM blue alghorithm
Diffstat (limited to 'shaping/src')
-rw-r--r--shaping/src/shaper.cpp107
-rw-r--r--shaping/src/shaper_aqm.cpp52
2 files changed, 114 insertions, 45 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 6457025..0e308ea 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -22,25 +22,14 @@ extern "C" {
#include "shaper_swarmkv.h"
#include "shaper_maat.h"
#include "shaper_global_stat.h"
+#include "shaper_aqm.h"
#define TOKEN_ENLARGE_TIMES 10
#define TOKEN_GET_FAILED_INTERVAL_MS 1
#define HMGET_REQUEST_INTERVAL_MS 10
#define PRIORITY_BLOCK_MIN_TIME_MS 500
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 " priority-3"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 " priority-4"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 " priority-5"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 " priority-6"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 " priority-7"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 " priority-8"
-
-const char *swarmkv_queue_len_get_cmd[] = {"", SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3,
- SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6,
- SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9};
+#define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9"
struct shaper {//trees in one thread
struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority
@@ -62,17 +51,6 @@ enum shaper_token_get_result {
SHAPER_TOKEN_GET_PASS = 1,//don't need to get token, regard as success
};
-struct shaping_profile_hash_node {
- int id;
- int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
- int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
- long long last_failed_get_token_ms;
- long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX];
- long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX];
- unsigned char is_invalid;
- UT_hash_handle hh;
-};
-
thread_local struct shaping_profile_hash_node *thread_sp_hashtbl = NULL;
struct shaper* shaper_new(unsigned int priority_queue_len_max)
@@ -273,14 +251,13 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
}
//return success(0) while any avl tree insert success
-int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
+static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
struct shaper *sp = ctx->sp;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
int priority;
- int ret = -1;
int i;
pkt_wrapper = shaper_first_pkt_get(sf);
@@ -288,8 +265,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
priority = s_rule_info->primary.priority;
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
- if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
- ret = 0;
+ if (0 != avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {//primary profile failed means flow push failed, ignore borrow profile
+ return -1;
}
if (s_rule_info->borrowing_num == 0) {// no borrow profile
@@ -300,18 +277,15 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
priority = s_rule_info->borrowing[i].priority;
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
- ret = 0;
shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
}
}
END:
- if (ret == 0) {//all avl tree success
- s_rule_info->primary.enqueue_time_us = enqueue_time;
- shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
- }
+ s_rule_info->primary.enqueue_time_us = enqueue_time;
+ shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
- return ret;
+ return 0;
}
static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time)
@@ -323,7 +297,7 @@ static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile
return (curr_time - enqueue_time);
}
-void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
+static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
@@ -364,6 +338,27 @@ END:
return;
}
+static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority)
+{
+ struct shaping_node *s_node = (struct shaping_node*)sf;
+ struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+ struct shaper *sp = ctx->sp;
+ struct shaping_packet_wrapper *pkt_wrapper = NULL;
+
+ pkt_wrapper = shaper_first_pkt_get(sf);
+ assert(pkt_wrapper != NULL);
+
+ for (int i = 0; i < s_rule_info->borrowing_num; i++) {
+ priority = s_rule_info->borrowing[i].priority;
+ if (avl_node_in_tree(s_node->avl_node[priority])) {
+ avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
+ shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
+ }
+ }
+
+ return;
+}
+
int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num)
{
struct avl_node *avl_node = NULL;
@@ -606,7 +601,6 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
struct shaping_hmget_cb_arg *arg = (struct shaping_hmget_cb_arg *)cb_arg;
struct shaping_thread_ctx *ctx = arg->ctx;
struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node;
- int priority = arg->priority;
struct timespec curr_time;
long long curr_time_us;
long long curr_time_ms;
@@ -632,15 +626,15 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
if (reply->elements[i]->type == SWARMKV_REPLY_STRING) {
char tmp_str[32] = {0};
memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len);
- if (strtoll(tmp_str, NULL, 10) > 0) {
- pf_hash_node->priority_blocked_time_ms[priority] = curr_time_ms;
- break;
- }
+ pf_hash_node->queue_len[i] = strtoll(tmp_str, NULL, 10);
+ } else {
+ pf_hash_node->queue_len[i] = 0;
}
}
END:
- pf_hash_node->last_hmget_ms[priority] = curr_time_ms;
+ pf_hash_node->last_hmget_ms = curr_time_ms;
+ pf_hash_node->ref_cnt--;
free(cb_arg);
cb_arg = NULL;
@@ -659,19 +653,31 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
return 0;
}
- if (curr_time_ms - profile->hash_node->last_hmget_ms[priority] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms
+ if (profile->hash_node->ref_cnt > 0) {//if hmget command is pending, don't send hmget command again
+ goto END;
+ }
+
+ if (curr_time_ms - profile->hash_node->last_hmget_ms < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms
goto END;
}
arg = (struct shaping_hmget_cb_arg *)calloc(1, sizeof(struct shaping_hmget_cb_arg));
arg->ctx = ctx;
arg->pf_hash_node = profile->hash_node;
- arg->priority = priority;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+ profile->hash_node->ref_cnt++;
+
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat);
- swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
+ swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_QUEUE_LEN_GET_CMD, profile->id);
+
+ for (int i = 0; i < priority; i++) {
+ if (profile->hash_node->queue_len[i] > 0) {
+ profile->hash_node->priority_blocked_time_ms[priority] = curr_time_ms;
+ goto END;
+ }
+ }
END:
if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority] < PRIORITY_BLOCK_MIN_TIME_MS) {
@@ -848,11 +854,22 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
goto DROP;
}
}
- /*todo: AQM, just for primary profile*/
for (int i = 0; i < profile_num; i++) {
profile = pf_container[i].pf_info;
profile_type = pf_container[i].pf_type;
+
+ /*AQM process, if aqm not pass, for primary profile drop packet, for borrow profile just don't give token to this packet*/
+ if (shaper_aqm_need_drop(profile, pkt_wrapper)) {
+ if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
+ shaper_flow_pop(ctx, sf);
+ goto DROP;
+ } else {
+ shaper_flow_specific_borrow_priority_pop(ctx, sf, priority);
+ continue;
+ }
+ }
+
int ret = shaper_token_consume(ctx, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction);
if (ret >= SHAPER_TOKEN_GET_SUCCESS) {
if (ret == SHAPER_TOKEN_GET_SUCCESS) {
diff --git a/shaping/src/shaper_aqm.cpp b/shaping/src/shaper_aqm.cpp
new file mode 100644
index 0000000..1d71769
--- /dev/null
+++ b/shaping/src/shaper_aqm.cpp
@@ -0,0 +1,52 @@
+#include <time.h>
+#include "shaper.h"
+#include "shaper_aqm.h"
+
+#define PROBABILITY_MAX 100
+#define INCREMENT 10
+#define DECREMENT 1
+#define FREEZE_TIME 2 //unit:s
+#define QUEUE_LEN_MAX 100
+static int shaper_aqm_blue_need_drop(struct shaping_packet_wrapper *pkt_wrapper, struct shaper_aqm_blue_para *para, int curr_queue_len)
+{
+ time_t curr_time;
+
+ if (pkt_wrapper->aqm_processed) {
+ return 0;
+ }
+
+ if (time(&curr_time) - para->update_time >= FREEZE_TIME) {
+ para->update_time = curr_time;
+ if (curr_queue_len >= QUEUE_LEN_MAX) {
+ para->probability = (para->probability + INCREMENT) > PROBABILITY_MAX ? PROBABILITY_MAX : (para->probability + INCREMENT);
+ } else if (curr_queue_len == 0) {
+ para->probability = (para->probability - DECREMENT) >= 0 ? (para->probability - DECREMENT) : 0;
+ }
+ }
+
+ if (rand() % PROBABILITY_MAX < para->probability) {
+ return 1;
+ }
+
+ return 0;
+}
+
+int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper)
+{
+ //TODO: judge if this packet is aqm processed
+ if (pkt_wrapper->aqm_processed) {
+ return 0;
+ }
+
+ switch (profile->hash_node->aqm_type) {
+ case AQM_TYPE_BLUE:
+ return shaper_aqm_blue_need_drop(pkt_wrapper, &profile->hash_node->aqm_blue_para, profile->hash_node->queue_len[profile->priority]);
+ break;
+ case AQM_TYPE_CODEL:
+ break;
+ default:
+ return 0;
+ }
+
+ return 0;
+} \ No newline at end of file