path: root/shaping/src/shaper.cpp
author    刘畅 <[email protected]>  2023-04-04 12:27:33 +0000
committer 刘畅 <[email protected]>  2023-04-04 12:27:33 +0000
commit    00d035db8063aef61076138b116fc06dde2ea4f0 (patch)
tree      48e50859012bf900ba07bdf3ad9f85d2701d327a /shaping/src/shaper.cpp
parent    9a0ff4d68c0d165ca4c65c850dfed9c2c7dd4c80 (diff)
parent    d92e71f1082c9f38ca22e762d1dd7ba8fd7c0aa9 (diff)
Merge branch 'priority_by_swarmkv' into 'rel'
Priority by swarmkv

See merge request tango/shaping-engine!7
Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r--  shaping/src/shaper.cpp  117
1 file changed, 78 insertions(+), 39 deletions(-)
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 51474e3..64775e0 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -27,6 +27,20 @@ extern "C" {
#define MICRO_SECONDS_PER_SEC 1000000
#define NANO_SECONDS_PER_SEC 1000000000
+#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0"
+#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1"
+#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2"
+#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 " priority-3"
+#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 " priority-4"
+#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 " priority-5"
+#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 " priority-6"
+#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 " priority-7"
+#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 " priority-8"
+
+const char *swarmkv_queue_len_get_cmd[] = {"", SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3,
+ SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6,
+ SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9};
+
struct shaper {//trees in one thread
struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority
};
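
The macro chain above builds one HMGET per priority level: the command for priority p asks for the queued counts of every strictly higher priority (priority-0 through priority-(p-1)) in a single round trip, and index 0 of the array is left empty because the highest priority never needs the check. A minimal standalone sketch (not part of the patch; profile id 42 is made up) of what swarmkv_queue_len_get_cmd[3] expands to and how the shaper fills it in:

#include <stdio.h>

int main(void)
{
    /* swarmkv_queue_len_get_cmd[3] expands to this literal after preprocessing */
    const char *cmd_priority_3 = "HMGET tsg-shaping-%d priority-0 priority-1 priority-2";
    char cmd[128];

    /* the shaper substitutes the profile id; 42 is a hypothetical value */
    snprintf(cmd, sizeof(cmd), cmd_priority_3, 42);
    printf("%s\n", cmd);   /* HMGET tsg-shaping-42 priority-0 priority-1 priority-2 */
    return 0;
}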
@@ -39,6 +53,7 @@ struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree
struct shaping_async_cb_arg {
struct shaping_flow *sf;
struct shaping_profile_info *s_pf_info;
+ int priority;
unsigned char direction;
};
@@ -214,6 +229,11 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
return;
}
+static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * arg)
+{
+ return;
+}
+
//return success(0) while any avl tree insert success
int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
{
@@ -232,6 +252,7 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id,
priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
@@ -246,6 +267,7 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id,
priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
@@ -285,6 +307,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->primary.priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
+
shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id,
priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
@@ -302,6 +326,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->borrowing[i].priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
+
shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id,
priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
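
Taken together, the push and pop hunks keep a per-profile counter in swarmkv: each successful avl insert sends HINCRBY tsg-shaping-<profile id> priority-<p> 1 and each removal sends the matching -1, so every hash field tracks how many flows are currently queued at that priority for the profile. A simplified local model of that bookkeeping (illustration only, not code from the patch; PRIORITY_NUM is an assumed stand-in for SHAPING_PRIORITY_NUM_MAX):

#include <stdio.h>

#define PRIORITY_NUM 10   /* assumed value, mirrors SHAPING_PRIORITY_NUM_MAX in spirit */

struct mock_profile_queue_len {
    long queued[PRIORITY_NUM];   /* stands in for fields priority-0 .. priority-9 */
};

/* what shaper_flow_push() does to the counter via HINCRBY ... 1 */
static void mock_push(struct mock_profile_queue_len *q, int priority) { q->queued[priority]++; }

/* what shaper_flow_pop() does via HINCRBY ... -1 */
static void mock_pop(struct mock_profile_queue_len *q, int priority) { q->queued[priority]--; }

int main(void)
{
    struct mock_profile_queue_len q = {{0}};

    mock_push(&q, 3);   /* a flow is queued at priority 3 */
    mock_push(&q, 3);   /* a second one */
    mock_pop(&q, 3);    /* one of them drains */
    printf("priority-3 queued flows: %ld\n", q.queued[3]);   /* prints 1 */
    return 0;
}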
@@ -368,7 +394,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
END:
free(cb_arg);
- __atomic_sub_fetch(&s_pf_info->async_ref_count, 1, __ATOMIC_SEQ_CST);
+ __atomic_sub_fetch(&s_pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST);
shaping_flow_free(sf);//sub ref count and decide if need to free
return;
@@ -405,7 +431,7 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow
struct shaping_async_cb_arg *arg;
char key[32] = {0};
- __atomic_add_fetch(&pf_info->async_ref_count, 1, __ATOMIC_SEQ_CST);
+ __atomic_add_fetch(&pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST);
__atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
@@ -415,7 +441,7 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow
arg->direction = direction;
swarmkv_tconsume(db, key, strlen(key), req_token, shaper_token_get_cb, arg);
- if (__atomic_load_n(&pf_info->async_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed
+ if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed
shaper_deposit_token_sub(pf_info, req_token, direction);
return 0;
}
@@ -436,57 +462,70 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow
return -1;
}
-#if 0
-int shaper_token_consume(struct shaping_flow *sf, unsigned int req_token, struct shaping_rule_info *s_rule_info)
+static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
- int i;
+ struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg;
+ struct shaping_profile_info *s_pf_info = arg->s_pf_info;
+ struct shaping_flow *sf = arg->sf;
+
+ s_pf_info->is_priority_blocked = 0;
- if (SHAPING_SUCCESS == shaping_token_get_from_profile(&s_rule_info->primary, 1, req_token)) {
- return SHAPING_SUCCESS;
+ if (!reply || reply->type != SWARMKV_REPLY_ARRAY) {
+ goto END;
}
- if (s_rule_info->borrowing_num > 0) {
- for (i = 0; i < s_rule_info->borrowing_num; i++) {
- if (SHAPING_SUCCESS == shaping_token_get_from_profile(&s_rule_info->borrowing[i], 0, req_token)) {
- return SHAPING_SUCCESS;
- }
+ for (unsigned int i = 0; i < reply->n_element; i++) {
+ if (reply->elements[i] && reply->elements[i]->integer > 0) {
+ s_pf_info->is_priority_blocked = 1;
+ break;
}
}
- return SHAPING_FAILED;
+END:
+ free(cb_arg);
+ __atomic_sub_fetch(&s_pf_info->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
+ shaping_flow_free(sf);//sub ref count and decide if need to free
}
-#endif
-static int shaper_token_consume(struct swarmkv *db, struct shaping_flow *sf, int req_token,
- struct shaping_profile_info *profile, int profile_type, unsigned char direction)
+static int shaper_profile_is_priority_blocked(struct swarmkv *db, struct shaping_flow *sf, struct shaping_profile_info *profile)
{
- return shaper_token_get_from_profile(db, sf, profile, profile_type, req_token, direction);
-
-}
+ struct shaping_async_cb_arg *arg;
+ int priority = profile->priority;
-#if 0
-enum shaping_packet_action shaper_pkt_action_decide(struct shaping_flow *sf, struct shaper *sp, void *raw_pkt,
- unsigned int pkt_len, unsigned char direction, unsigned long long income_time)
-{
- int i;
- struct shaping_rule_info *s_rule_info;
-
- for (i = sf->anchor; i < sf->rule_num; i++) {
- s_rule_info = &sf->matched_rule_infos[i];
- if (-1 == shaper_token_consume(sf, pkt_len, s_rule_info, s_rule_info->primary.priority)) {
- sf->anchor = i;
- if (0 == shaper_flow_push(sf, sp)) {
- shaper_packet_enqueue(sf, raw_pkt, direction, income_time);
- return SHAPING_HOLD;
- } else {
- return SHAPING_DROP;
- }
+ if (priority == 0) {//highest priority, can't be blocked
+ return 0;
+ }
+
+ arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg));
+ arg->s_pf_info = profile;
+ arg->sf = sf;
+ arg->priority = priority;
+
+ __atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
+ __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
+
+ swarmkv_async_command(db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
+
+ if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) {
+ return 0;
+ } else {
+ if (profile->is_priority_blocked) {
+ return 1;
+ } else {
+ return 0;
}
}
+}
- return SHAPING_FORWARD;
+static int shaper_token_consume(struct swarmkv *db, struct shaping_flow *sf, int req_token,
+ struct shaping_profile_info *profile, int profile_type, unsigned char direction)
+{
+ if (shaper_profile_is_priority_blocked(db, sf, profile)) {
+ return -1;
+ } else {
+ return shaper_token_get_from_profile(db, sf, profile, profile_type, req_token, direction);
+ }
}
-#endif
int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, struct shaping_profile_container pf_container[])
{
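
The net effect of the reworked shaper_token_consume() path: before asking swarmkv for tokens, the shaper checks whether any higher-priority queue of the same profile is non-empty (asynchronously, via the HMGET built above, with the answer cached in is_priority_blocked) and, if so, returns -1 so the caller queues the flow instead of letting it jump ahead of backlogged higher-priority traffic. A compact synchronous model of that decision (illustration only; the real code reads the reply in shaper_queue_len_get_cb):

#include <stdio.h>

/* queue_len[p] plays the role of the swarmkv field priority-<p> for one profile */
static int is_priority_blocked(const long queue_len[], int priority)
{
    if (priority == 0)                      /* highest priority can't be blocked */
        return 0;
    for (int p = 0; p < priority; p++)      /* any backlog at a higher priority blocks us */
        if (queue_len[p] > 0)
            return 1;
    return 0;
}

int main(void)
{
    long queue_len[10] = {0};

    queue_len[1] = 2;                       /* two flows queued at priority 1 */
    printf("priority 3 blocked: %d\n", is_priority_blocked(queue_len, 3));   /* 1 */
    printf("priority 1 blocked: %d\n", is_priority_blocked(queue_len, 1));   /* 0 */
    printf("priority 0 blocked: %d\n", is_priority_blocked(queue_len, 0));   /* 0 */
    return 0;
}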