summaryrefslogtreecommitdiff
path: root/shaping/src/shaper.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r--shaping/src/shaper.cpp108
1 files changed, 41 insertions, 67 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 2d88b01..aef5a65 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -22,10 +22,7 @@ extern "C" {
#include "shaper_swarmkv.h"
#include "shaper_maat.h"
#include "shaper_global_stat.h"
-
-#define NANO_SECONDS_PER_MICRO_SEC 1000
-#define MICRO_SECONDS_PER_SEC 1000000
-#define NANO_SECONDS_PER_SEC 1000000000
+#include "shaper_aqm.h"
#define SHAPING_LATENCY_THRESHOLD 2000000 //2s
@@ -65,7 +62,23 @@ struct shaping_profile_container {
int pf_type;
};
-struct shaper* shaper_new(unsigned int priority_queue_len_max)
+static void shaper_free(struct shaper *sp)
+{
+ int i;
+
+ if (sp) {
+ for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ if (sp->priority_trees[i]) {
+ avl_tree_destroy(sp->priority_trees[i]);
+ }
+ }
+ free(sp);
+ }
+
+ return;
+}
+
+static struct shaper* shaper_new(unsigned int priority_queue_len_max)
{
struct shaper *sp = NULL;
int i;
@@ -90,22 +103,6 @@ ERROR:
return NULL;
}
-void shaper_free(struct shaper *sp)
-{
- int i;
-
- if (sp) {
- for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
- if (sp->priority_trees[i]) {
- avl_tree_destroy(sp->priority_trees[i]);
- }
- }
- free(sp);
- }
-
- return;
-}
-
static void shaping_node_free(struct shaping_node *s_node)
{
int i;
@@ -164,7 +161,7 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
struct shaping_node *s_node = (struct shaping_node*)sf;
if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) {
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
shaping_node_free(s_node);
}
@@ -180,6 +177,10 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
return -1;
}
+ if (shaper_aqm_enqueue(&sf->matched_rule_infos[sf->anchor].primary) == AQM_ACTION_DROP) {
+ return -1;
+ }
+
s_pkt = (struct shaping_packet_wrapper*)calloc(1, sizeof(struct shaping_packet_wrapper));
if (!s_pkt) {
return -1;
@@ -204,12 +205,12 @@ bool shaper_queue_empty(struct shaping_flow *sf)
return TAILQ_EMPTY(&sf->packet_queue);
}
-struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf)
+static struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf)
{
return TAILQ_FIRST(&sf->packet_queue);
}
-void shaper_packet_dequeue(struct shaping_flow *sf)
+static void shaper_packet_dequeue(struct shaping_flow *sf)
{
struct shaping_packet_wrapper *s_pkt;
@@ -244,21 +245,8 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
return;
}
-static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg)
-{
- struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
-
- shaper_global_stat_async_callback_inc(global_stat);
-
- if (reply->type != SWARMKV_REPLY_INTEGER) {
- shaper_global_stat_async_hincrby_failed_inc(global_stat);
- }
-
- return;
-}
-
//return success(0) while any avl tree insert success
-int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
+static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
@@ -271,20 +259,10 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
- if ((sf->flag & SESSION_UPDATE_PF_PRIO_LEN) == 0) {
- if (sf->processed_pkts > CONFIRM_PRIORITY_PKTS) {
- sf->flag |= SESSION_UPDATE_PF_PRIO_LEN;
- }
- }
-
priority = s_rule_info->primary.priority;
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
- }
}
if (s_rule_info->borrowing_num == 0) {// no borrow profile
@@ -296,10 +274,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
- }
+ //TODO: calculate queue_len for borrow profile and add judge when refresh stat????
+ //shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
}
}
@@ -320,7 +296,7 @@ static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile
return (curr_time - enqueue_time);
}
-void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
+static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
@@ -339,10 +315,6 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->primary.priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
- }
}
if (s_rule_info->borrowing_num == 0) {
@@ -353,10 +325,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->borrowing[i].priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
- }
+ //TODO: calculate queue_len for borrow profile and add judge when refresh stat
}
}
@@ -368,7 +337,7 @@ END:
return;
}
-int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num)
+static int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num)
{
struct avl_node *avl_node = NULL;
int count = 0;
@@ -691,8 +660,13 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
shaper_flow_pop(ctx, sf);
goto DROP;
}
- }
- /*todo: AQM, just for primary profile*/
+
+ if (shaper_aqm_need_drop(pf_container[0].pf_info, pkt_wrapper)) {
+ shaper_flow_pop(ctx, sf);
+ goto DROP;
+ }
+ }
+ /*todo: AQM*/
for (int i = 0; i < profile_num; i++) {
profile = pf_container[i].pf_info;
@@ -819,7 +793,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
break;
}
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
if (shaper_queue_empty(sf)) {
if (sf->flag & SESSION_CLOSE) {
@@ -899,7 +873,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
}
END:
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
if(sf->flag & SESSION_CLOSE) {
if (shaper_queue_empty(sf)) {
char *addr_str = addr_tuple4_to_str(&sf->tuple4);
@@ -1046,7 +1020,7 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
return;
}
-int shaper_global_conf_init(struct shaping_system_conf *conf)
+static int shaper_global_conf_init(struct shaping_system_conf *conf)
{
int ret;
int array_num;