summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--shaping/CMakeLists.txt2
-rw-r--r--shaping/include/shaper.h42
-rw-r--r--shaping/include/shaper_aqm.h10
-rw-r--r--shaping/include/shaper_stat.h2
-rw-r--r--shaping/src/shaper.cpp108
-rw-r--r--shaping/src/shaper_aqm.cpp177
-rw-r--r--shaping/src/shaper_maat.cpp2
-rw-r--r--shaping/src/shaper_stat.cpp26
-rw-r--r--shaping/test/gtest_shaper.cpp4
9 files changed, 288 insertions, 85 deletions
diff --git a/shaping/CMakeLists.txt b/shaping/CMakeLists.txt
index 846a1bb..9c6daf2 100644
--- a/shaping/CMakeLists.txt
+++ b/shaping/CMakeLists.txt
@@ -1,4 +1,4 @@
-add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp)
+add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp src/shaper_aqm.cpp)
target_link_libraries(shaper PUBLIC common)
target_link_libraries(shaper PUBLIC avl_tree)
target_link_libraries(shaper PUBLIC cjson)
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 2be4d65..f629124 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -16,7 +16,6 @@
#define SHAPER_FLOW_POP_NUM_MAX 10
#define SESSION_CLOSE 0x1
-#define SESSION_UPDATE_PF_PRIO_LEN 0x2
#define CONFIRM_PRIORITY_PKTS 20
@@ -24,6 +23,10 @@
#define SHAPING_GLOBAL_CONF_FILE "./conf/shaping.conf"
+#define NANO_SECONDS_PER_MICRO_SEC 1000
+#define MICRO_SECONDS_PER_SEC 1000000
+#define NANO_SECONDS_PER_SEC 1000000000
+
struct shaping_system_conf {
unsigned int session_queue_len_max;
unsigned int priority_queue_len_max;
@@ -78,9 +81,26 @@ enum shaping_profile_type {
PROFILE_TYPE_SPLIT_BY_LOCAL_HOST
};
+enum shaper_aqm_type {
+ AQM_TYPE_NONE = 0,
+ AQM_TYPE_BLUE,
+ AQM_TYPE_CODEL,
+ AQM_TYPE_MAX
+};
+
+struct shaper_aqm_blue_para {
+ time_t update_time;
+ int queue_len_max;
+ int probability;
+ int queue_len;
+ //int d1;//increase delta
+ //int d2;//decrease delta
+};
+
struct shaping_profile_info {
int id;//profile_id
enum shaping_profile_type type;
+ enum shaper_aqm_type aqm_type;
int priority;
int in_deposit_token;
int out_deposit_token;
@@ -90,6 +110,10 @@ struct shaping_profile_info {
unsigned char is_priority_blocked;
unsigned char is_invalid;
struct shaping_stat_for_profile stat;
+ union {
+ struct shaper_aqm_blue_para blue_para;
+ //struct shaper_aqm_codel_para codel_para;
+ }aqm_para;
};
struct shaping_rule_info {
@@ -153,22 +177,22 @@ struct shaper;//instance of shaping, thread unsafe
struct shaping_flow* shaping_flow_new();
void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf);
-struct shaper* shaper_new(unsigned int priority_queue_len_max);
-void shaper_free(struct shaper *sp);
+//struct shaper* shaper_new(unsigned int priority_queue_len_max);
+//void shaper_free(struct shaper *sp);
bool shaper_queue_empty(struct shaping_flow *sf);
-void shaper_packet_dequeue(struct shaping_flow *sf);
-struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf);
+//void shaper_packet_dequeue(struct shaping_flow *sf);
+//struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf);
void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx);
/*return value: 0 for success, -1 for failed*/
-int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time);
+//int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time);
/*return num of sf_ins*/
-void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf);
-int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num);
+//void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf);
+//int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num);
//enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue);
-int shaper_global_conf_init(struct shaping_system_conf *conf);
+//int shaper_global_conf_init(struct shaping_system_conf *conf);
void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx);
void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf);
diff --git a/shaping/include/shaper_aqm.h b/shaping/include/shaper_aqm.h
new file mode 100644
index 0000000..f7e0123
--- /dev/null
+++ b/shaping/include/shaper_aqm.h
@@ -0,0 +1,10 @@
+#pragma once
+
+enum shaper_aqm_action {
+ AQM_ACTION_PASS,
+ AQM_ACTION_DROP,
+};
+
+int shaper_aqm_enqueue(struct shaping_profile_info *profile);
+int shaper_aqm_dequeue();
+int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper); \ No newline at end of file
diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h
index 20da941..a2527cd 100644
--- a/shaping/include/shaper_stat.h
+++ b/shaping/include/shaper_stat.h
@@ -55,4 +55,4 @@ void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_
void shaper_stat_drop_inc(struct shaping_stat_for_profile *profile_stat, unsigned char direction, int thread_id);
void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_stat, unsigned char direction, unsigned long long latency, int thread_id);
-void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int thread_id, int force); \ No newline at end of file
+void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force); \ No newline at end of file
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 2d88b01..aef5a65 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -22,10 +22,7 @@ extern "C" {
#include "shaper_swarmkv.h"
#include "shaper_maat.h"
#include "shaper_global_stat.h"
-
-#define NANO_SECONDS_PER_MICRO_SEC 1000
-#define MICRO_SECONDS_PER_SEC 1000000
-#define NANO_SECONDS_PER_SEC 1000000000
+#include "shaper_aqm.h"
#define SHAPING_LATENCY_THRESHOLD 2000000 //2s
@@ -65,7 +62,23 @@ struct shaping_profile_container {
int pf_type;
};
-struct shaper* shaper_new(unsigned int priority_queue_len_max)
+static void shaper_free(struct shaper *sp)
+{
+ int i;
+
+ if (sp) {
+ for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ if (sp->priority_trees[i]) {
+ avl_tree_destroy(sp->priority_trees[i]);
+ }
+ }
+ free(sp);
+ }
+
+ return;
+}
+
+static struct shaper* shaper_new(unsigned int priority_queue_len_max)
{
struct shaper *sp = NULL;
int i;
@@ -90,22 +103,6 @@ ERROR:
return NULL;
}
-void shaper_free(struct shaper *sp)
-{
- int i;
-
- if (sp) {
- for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
- if (sp->priority_trees[i]) {
- avl_tree_destroy(sp->priority_trees[i]);
- }
- }
- free(sp);
- }
-
- return;
-}
-
static void shaping_node_free(struct shaping_node *s_node)
{
int i;
@@ -164,7 +161,7 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
struct shaping_node *s_node = (struct shaping_node*)sf;
if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) {
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
shaping_node_free(s_node);
}
@@ -180,6 +177,10 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
return -1;
}
+ if (shaper_aqm_enqueue(&sf->matched_rule_infos[sf->anchor].primary) == AQM_ACTION_DROP) {
+ return -1;
+ }
+
s_pkt = (struct shaping_packet_wrapper*)calloc(1, sizeof(struct shaping_packet_wrapper));
if (!s_pkt) {
return -1;
@@ -204,12 +205,12 @@ bool shaper_queue_empty(struct shaping_flow *sf)
return TAILQ_EMPTY(&sf->packet_queue);
}
-struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf)
+static struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf)
{
return TAILQ_FIRST(&sf->packet_queue);
}
-void shaper_packet_dequeue(struct shaping_flow *sf)
+static void shaper_packet_dequeue(struct shaping_flow *sf)
{
struct shaping_packet_wrapper *s_pkt;
@@ -244,21 +245,8 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
return;
}
-static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg)
-{
- struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
-
- shaper_global_stat_async_callback_inc(global_stat);
-
- if (reply->type != SWARMKV_REPLY_INTEGER) {
- shaper_global_stat_async_hincrby_failed_inc(global_stat);
- }
-
- return;
-}
-
//return success(0) while any avl tree insert success
-int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
+static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
@@ -271,20 +259,10 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
- if ((sf->flag & SESSION_UPDATE_PF_PRIO_LEN) == 0) {
- if (sf->processed_pkts > CONFIRM_PRIORITY_PKTS) {
- sf->flag |= SESSION_UPDATE_PF_PRIO_LEN;
- }
- }
-
priority = s_rule_info->primary.priority;
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
- }
}
if (s_rule_info->borrowing_num == 0) {// no borrow profile
@@ -296,10 +274,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
- }
+ //TODO: calculate queue_len for borrow profile and add judge when refresh stat????
+ //shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
}
}
@@ -320,7 +296,7 @@ static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile
return (curr_time - enqueue_time);
}
-void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
+static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
@@ -339,10 +315,6 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->primary.priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
- }
}
if (s_rule_info->borrowing_num == 0) {
@@ -353,10 +325,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->borrowing[i].priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
- }
+ //TODO: calculate queue_len for borrow profile and add judge when refresh stat
}
}
@@ -368,7 +337,7 @@ END:
return;
}
-int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num)
+static int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num)
{
struct avl_node *avl_node = NULL;
int count = 0;
@@ -691,8 +660,13 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
shaper_flow_pop(ctx, sf);
goto DROP;
}
- }
- /*todo: AQM, just for primary profile*/
+
+ if (shaper_aqm_need_drop(pf_container[0].pf_info, pkt_wrapper)) {
+ shaper_flow_pop(ctx, sf);
+ goto DROP;
+ }
+ }
+ /*todo: AQM*/
for (int i = 0; i < profile_num; i++) {
profile = pf_container[i].pf_info;
@@ -819,7 +793,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
break;
}
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
if (shaper_queue_empty(sf)) {
if (sf->flag & SESSION_CLOSE) {
@@ -899,7 +873,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
}
END:
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
if(sf->flag & SESSION_CLOSE) {
if (shaper_queue_empty(sf)) {
char *addr_str = addr_tuple4_to_str(&sf->tuple4);
@@ -1046,7 +1020,7 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
return;
}
-int shaper_global_conf_init(struct shaping_system_conf *conf)
+static int shaper_global_conf_init(struct shaping_system_conf *conf)
{
int ret;
int array_num;
diff --git a/shaping/src/shaper_aqm.cpp b/shaping/src/shaper_aqm.cpp
new file mode 100644
index 0000000..e7b00e5
--- /dev/null
+++ b/shaping/src/shaper_aqm.cpp
@@ -0,0 +1,177 @@
+#include <cstdlib>
+#include <time.h>
+
+#include "shaper.h"
+#include "shaper_aqm.h"
+
+
+/**************************blue******************************/
+#define PROBABILITY_MAX 100
+#define INCREMENT 10
+#define DECREMENT 1
+#define FREEZE_TIME 3 //unit:s
+
+static int shaper_aqm_blue_need_drop(int profile_id, struct shaper_aqm_blue_para *para)
+{
+ time_t curr_time;
+ if (time(&curr_time) - para->update_time >= FREEZE_TIME) {
+ para->update_time = curr_time;
+ if (para->queue_len >= para->queue_len_max) {
+ para->probability = (para->probability + INCREMENT) > PROBABILITY_MAX ? PROBABILITY_MAX : (para->probability + INCREMENT);
+ } else if (para->queue_len == 0) {
+ para->probability = (para->probability - DECREMENT) >= 0 ? (para->probability - DECREMENT) : 0;
+ }
+ }
+
+ if (rand() / PROBABILITY_MAX < para->probability) {
+ return 1;
+ }
+
+ return 0;
+}
+/**************************blue*****************************/
+
+#if 0
+/**************************stochastic fair blue*****************************/
+/*
+ * SFB uses two B[l][n] : L x N arrays of bins (L levels, N bins per level)
+ * This implementation uses L = 8 and N = 16
+ * This permits us to split one 32bit hash (provided per packet by rxhash or
+ * external classifier) into 8 subhashes of 4 bits.
+ */
+#define SFB_BUCKET_SHIFT 4
+#define SFB_NUMBUCKETS (1 << SFB_BUCKET_SHIFT) /* N bins per Level */
+#define SFB_BUCKET_MASK (SFB_NUMBUCKETS - 1)
+#define SFB_LEVELS (32 / SFB_BUCKET_SHIFT) /* L */
+
+struct sfb_bucket {
+ int queue_len;
+ int probability;
+};
+struct shaper_aqm_sfb_para {
+ struct sfb_bucket bins[SFB_LEVELS][SFB_NUMBUCKETS];
+};
+/**************************stochastic fair blue*****************************/
+#endif
+
+
+#if 0
+/**************************codel*****************************/
+#define CODEL_MAX_LATENCY 1500000 //unit:us
+
+#define REC_INV_SQRT_BITS (8 * sizeof(unsigned short)) /* or sizeof_in_bits(rec_inv_sqrt) */
+/* needed shift to get a Q0.32 number from rec_inv_sqrt */
+#define REC_INV_SQRT_SHIFT (32 - REC_INV_SQRT_BITS)
+
+static void shaper_aqm_codel_enqueue()
+{
+ return;
+}
+
+static void shaper_aqm_codel_Newton_step(struct codel_vars *vars)
+{
+ unsigned int invsqrt = ((unsigned int)vars->rec_inv_sqrt) << REC_INV_SQRT_SHIFT;
+ unsigned int invsqrt2 = ((unsigned long long)invsqrt * invsqrt) >> 32;
+ unsigned long long val = (3LL << 32) - ((unsigned long long)vars->count * invsqrt2);
+
+ val >>= 2; /* avoid overflow in following multiply */
+ val = (val * invsqrt) >> (32 - 2 + 1);
+
+ vars->rec_inv_sqrt = val >> REC_INV_SQRT_SHIFT;
+}
+
+static inline unsigned int reciprocal_scale(unsigned int val, unsigned int ep_ro)
+{
+ return (unsigned int)(((unsigned long long) val * ep_ro) >> 32);
+}
+
+static unsigned long long shaper_aqm_codel_control_law(unsigned long long t,
+ unsigned long long interval,
+ unsigned int rec_inv_sqrt)
+{
+ return t + reciprocal_scale(interval, rec_inv_sqrt << REC_INV_SQRT_SHIFT);
+}
+
+static int shaper_aqm_codel_need_drop(struct shaper_aqm_codel_para *para, struct shaping_profile_info *profile)
+{
+ struct timespec time;
+ unsigned long long curr_time;
+
+ clock_gettime(CLOCK_MONOTONIC, &time);
+ curr_time = time.tv_sec * MICRO_SECONDS_PER_SEC + time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+
+ if (curr_time - profile->enqueue_time_us < CODEL_MAX_LATENCY) {
+ //TODO:swarmkv set first above time to 0
+ return 0;
+ }
+
+ if (first_above_time_us == 0) {
+ first_above_time_us = curr_time + para->interval;//set in swarmkv
+ return 0;
+ } else if (curr_time > first_above_time_us) {
+ return 1;
+ }
+}
+
+static int shaper_aqm_codel_dequeue_action()
+{
+ int need_drop = shaper_aqm_codel_need_drop();
+
+ if (dropping_state) {
+ if (!need_drop) {
+ dropping_state = 0;
+ } else if (curr_time > drop_next) {
+ drop_count++;
+ shaper_aqm_codel_Newton_step();
+ drop_next = shaper_aqm_codel_control_law();
+ }
+ } else if (need_drop) {
+ dropping_state = 1;
+ delta = drop_count - last_drop_count;
+ if (delta > 1 && (curr_time - drop_next) < 16 * interval) {
+ drop_count = delta;
+ shaper_aqm_codel_Newton_step();
+ } else {
+ drop_count = 1;
+ rec_inv_sqrt = ~0U >> REC_INV_SQRT_SHIFT;
+ }
+
+ last_drop_count = drop_count;
+ drop_next = shaper_aqm_codel_control_law();
+ }
+}
+/**************************codel*****************************/
+#endif
+
+
+int shaper_aqm_enqueue(struct shaping_profile_info *profile)
+{
+ switch (profile->aqm_type) {
+ case AQM_TYPE_BLUE:
+ if (shaper_aqm_blue_need_drop(profile->id, &profile->aqm_para.blue_para)) {
+ return AQM_ACTION_DROP;
+ } else {
+ return AQM_ACTION_PASS;
+ }
+ case AQM_TYPE_CODEL:
+ default:
+ return AQM_ACTION_PASS;
+ }
+}
+
+int shaper_aqm_dequeue()
+{
+ return AQM_ACTION_PASS;
+}
+
+int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper)
+{
+ switch (profile->aqm_type) {
+ case AQM_TYPE_BLUE:
+
+ case AQM_TYPE_CODEL:
+
+ default:
+ return 0;
+ }
+} \ No newline at end of file
diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp
index 4b4f21f..64db3e6 100644
--- a/shaping/src/shaper_maat.cpp
+++ b/shaping/src/shaper_maat.cpp
@@ -381,7 +381,7 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
}
if (sf->rule_num > 0 && priority_changed) {
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
}
sf->rule_num += rule_num;
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index e7d03a1..71965fa 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -4,12 +4,14 @@
#include <sys/socket.h>
#include <arpa/inet.h>
#include <MESA/MESA_prof_load.h>
+#include <MESA/swarmkv.h>
#include <fieldstat.h>
#include "log.h"
#include "utils.h"
#include "shaper.h"
#include "shaper_stat.h"
+#include "shaper_global_stat.h"
#define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits"
@@ -131,9 +133,23 @@ static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int
return;
}
-static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage)
+static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, void * cb_arg)
+{
+ struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
+
+ shaper_global_stat_async_callback_inc(global_stat);
+
+ if (reply->type != SWARMKV_REPLY_INTEGER) {
+ shaper_global_stat_async_hincrby_failed_inc(global_stat);
+ }
+
+ return;
+}
+
+static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage)
{
struct shaping_stat_for_profile *profile_stat = &profile->stat;
+ struct shaping_stat *stat = ctx->stat;
unsigned long long old_latency;
shaper_stat_tags_build(vsys_id, rule_id, profile->id, profile->priority, profile_type);
@@ -158,6 +174,8 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs
if (need_update_guage) {
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id);
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d %lld", profile->id, profile->priority, profile_stat->in.queue_len + profile_stat->out.queue_len);
memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
} else {
profile_stat->in.pkts = 0;
@@ -174,7 +192,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs
return;
}
-void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int thread_id, int force)
+void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force)
{
struct shaping_rule_info *rule;
struct timespec curr_time;
@@ -199,10 +217,10 @@ void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int
for (int i = 0; i < sf->rule_num; i++) {
rule = &sf->matched_rule_infos[i];
- shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage);
+ shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage);
for (int j = 0; j < rule->borrowing_num; j++) {
- shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage);
+ shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage);
}
}
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index a1a9b77..c292f91 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -341,7 +341,7 @@ TEST(single_session, tcp_tx_in_order)
/***********send stat data here********************/
stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
- shaper_stat_refresh(ctx->thread_ctx[0].stat, sf, ctx->thread_ctx[0].thread_index, 1);
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1);
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
sleep(2);//wait telegraf generate metric
@@ -1578,7 +1578,7 @@ TEST(statistics, udp_queueing_pkt)
/***********send stat data here********************/
stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
- shaper_stat_refresh(ctx->thread_ctx[0].stat, sf, ctx->thread_ctx[0].thread_index, 1);
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1);
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaper_global_stat_refresh(ctx->global_stat);
sleep(2);//wait telegraf generate metric