author    刘畅 <[email protected]>  2024-02-07 01:59:40 +0000
committer 刘畅 <[email protected]>  2024-02-07 01:59:40 +0000
commit    bad1c91e472c30e348eb0eb8c0cdb504bc6f928e (patch)
tree      95da51ac9b71f85c7591bc4155b3d69bbdd676cf
parent    008d4b3906cc84e007f4519f901a288dd968a14e (diff)
parent    1795b5592e6b397d83e96c54088470d158ebaac9 (diff)
Merge branch 'feature_aqm_blue' into 'rel'
TSG-19341: add AQM BLUE feature and some performance optimizations

See merge request tango/shaping-engine!75
-rw-r--r--  common/src/session_table.cpp            8
-rw-r--r--  shaping/CMakeLists.txt                  6
-rw-r--r--  shaping/include/shaper.h               64
-rw-r--r--  shaping/include/shaper_aqm.h            2
-rw-r--r--  shaping/include/shaper_global_stat.h    3
-rw-r--r--  shaping/include/shaper_stat.h           4
-rw-r--r--  shaping/include/shaper_swarmkv.h        3
-rw-r--r--  shaping/src/main.cpp                    4
-rw-r--r--  shaping/src/shaper.cpp                312
-rw-r--r--  shaping/src/shaper_aqm.cpp             82
-rw-r--r--  shaping/src/shaper_global_stat.cpp      9
-rw-r--r--  shaping/src/shaper_maat.cpp            33
-rw-r--r--  shaping/src/shaper_session.cpp          9
-rw-r--r--  shaping/src/shaper_stat.cpp           101
-rw-r--r--  shaping/src/shaper_swarmkv.cpp         32
-rw-r--r--  shaping/test/gtest_shaper.cpp         455
-rw-r--r--  shaping/test/stub.cpp                   2
17 files changed, 481 insertions, 648 deletions
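
The headline change in this merge is a BLUE active queue manager (the new shaping/src/shaper_aqm.cpp below). As a reading aid, here is a minimal standalone sketch of the update rule that file implements: a congested queue (length at its cap) raises the drop probability by INCREMENT, an idle queue lowers it by DECREMENT, and FREEZE_TIME rate-limits updates. The constants mirror shaper_aqm.cpp; the struct name and the main() driver are illustrative stand-ins, not code from the merge.

/* blue_sketch.c -- illustrative only; mirrors the rule in shaper_aqm.cpp below */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#define PROBABILITY_MAX 100
#define INCREMENT 10
#define DECREMENT 1
#define FREEZE_TIME 2      /* seconds between probability updates */
#define QUEUE_LEN_MAX 100

struct blue { time_t update_time; int probability; };

/* Returns 1 when the packet should be dropped. */
static int blue_need_drop(struct blue *b, int queue_len)
{
    time_t now = time(NULL);
    if (now - b->update_time >= FREEZE_TIME) {   /* freeze window elapsed */
        b->update_time = now;
        if (queue_len >= QUEUE_LEN_MAX)          /* overflow: drop harder */
            b->probability = b->probability + INCREMENT > PROBABILITY_MAX
                             ? PROBABILITY_MAX : b->probability + INCREMENT;
        else if (queue_len == 0)                 /* idle link: back off */
            b->probability = b->probability >= DECREMENT
                             ? b->probability - DECREMENT : 0;
    }
    return rand() % PROBABILITY_MAX < b->probability;
}

int main(void)
{
    struct blue b = {0, 0};
    blue_need_drop(&b, QUEUE_LEN_MAX);           /* one congested interval */
    printf("drop probability now %d%%\n", b.probability);  /* 0 -> 10 */
    return 0;
}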
diff --git a/common/src/session_table.cpp b/common/src/session_table.cpp
index b5a2278..e3dcf92 100644
--- a/common/src/session_table.cpp
+++ b/common/src/session_table.cpp
@@ -204,11 +204,11 @@ struct session_node *session_table_search_by_id(struct session_table *table, uin
{
struct session_node *temp = NULL;
HASH_FIND(hh1, table->root_by_id, &session_id, sizeof(session_id), temp);
- if (!temp)
- {
+ //if (!temp)
+ //{
//LOG_DEBUG("%s: search: key %lu not exists", LOG_TAG_STABLE, session_id);
- return NULL;
- }
+ // return NULL;
+ //}
//LOG_DEBUG("%s: search: key %lu success", LOG_TAG_STABLE, session_id);
diff --git a/shaping/CMakeLists.txt b/shaping/CMakeLists.txt
index cc82405..c85bbba 100644
--- a/shaping/CMakeLists.txt
+++ b/shaping/CMakeLists.txt
@@ -1,4 +1,4 @@
-add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp ${PROJECT_SOURCE_DIR}/deps/timeout/timeout.c)
+add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp src/shaper_aqm.cpp ${PROJECT_SOURCE_DIR}/deps/timeout/timeout.c)
target_link_libraries(shaper PUBLIC common)
target_link_libraries(shaper PUBLIC avl_tree)
target_link_libraries(shaper PUBLIC cjson)
@@ -17,4 +17,8 @@ target_link_libraries(shaping_engine PUBLIC jemalloc)
install(TARGETS shaping_engine RUNTIME DESTINATION bin COMPONENT Program)
+# Create a log subdirectory at install time
+install(DIRECTORY DESTINATION ${CMAKE_INSTALL_PREFIX}/log)
+
+
add_subdirectory(test) \ No newline at end of file
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 6168191..83f93f1 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -42,6 +42,8 @@ struct shaping_system_conf {
unsigned int session_queue_len_max;
unsigned int priority_queue_len_max;
unsigned int pkt_max_delay_time_us;
+ int token_multiple_min;
+ int token_multiple_max;
int polling_node_num_max[SHAPING_PRIORITY_NUM_MAX];
int work_thread_num;
int cpu_affinity_enable;
@@ -59,8 +61,6 @@ struct shaping_thread_ctx {
struct shaping_stat *stat;
struct shaping_marsio_info *marsio_info;
struct swarmkv *swarmkv_db;//handle of swarmkv
- int swarmkv_aqm_prob;
- time_t swarmkv_aqm_update_time;
struct shaping_maat_info *maat_info;
struct session_table *session_table;
struct timeouts *expires;
@@ -97,12 +97,51 @@ enum shaping_profile_type {
PROFILE_TYPE_SPLIT_BY_LOCAL_HOST
};
+enum shaper_aqm_type {
+ AQM_TYPE_NONE = 0,
+ AQM_TYPE_BLUE,
+ AQM_TYPE_CODEL,
+ AQM_TYPE_MAX
+};
+
+struct shaper_aqm_blue_para {
+ time_t update_time;
+ int probability;
+};
+
+struct shaper_token_multiple {
+ int token_get_multiple;
+ unsigned char has_drop_by_queue_full;
+ unsigned char has_failed_get_token;
+ time_t token_multiple_update_time_s;
+};
+
+struct shaping_profile_hash_node {
+ int id;
+ enum shaper_aqm_type aqm_type;
+ long long in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
+ long long out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
+ long long last_failed_get_token_ms;
+ long long last_hmget_ms;
+ long long queue_len[SHAPING_PRIORITY_NUM_MAX];
+ long long local_queue_len[SHAPING_PRIORITY_NUM_MAX];
+ long long local_queue_len_update_time_us[SHAPING_PRIORITY_NUM_MAX];
+ long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX];
+ int hmget_ref_cnt;
+ int tconsume_ref_cnt;
+ struct shaper_token_multiple token_multiple;
+ struct shaper_aqm_blue_para aqm_blue_para;
+ unsigned char is_invalid;
+ struct timeout timeout_handle;
+ UT_hash_handle hh;
+};
+
struct shaping_profile_info {
int id;//profile_id
enum shaping_profile_type type;
int priority;
- int in_deposit_token_bits;
- int out_deposit_token_bits;
+ long long in_deposit_token_bits;
+ long long out_deposit_token_bits;
long long last_failed_get_token_ms;
unsigned long long enqueue_time_us;//to calculate max latency
struct shaping_stat_for_profile stat;
@@ -114,7 +153,7 @@ struct shaping_rule_info {
int id;//rule_id
int fair_factor;
struct shaping_profile_info primary;
- struct shaping_profile_info borrowing[SHAPING_REF_PROFILE_NUM_MAX];
+ struct shaping_profile_info borrowing[SHAPING_REF_PROFILE_NUM_MAX - 1];
int borrowing_num;
int is_enabled;
};
@@ -125,6 +164,7 @@ struct shaping_packet_wrapper {
unsigned long long enqueue_time_us;//first enqueue time
unsigned int length;
int rule_anchor;
+ int aqm_processed_pf_ids[SHAPING_REF_PROFILE_NUM_MAX];
unsigned char direction;
TAILQ_ENTRY(shaping_packet_wrapper) node;
};
@@ -157,10 +197,8 @@ struct shaping_flow {
unsigned int flag;
struct metadata ctrl_meta;
unsigned long long processed_pkts;
- struct timespec stat_update_time;
+ unsigned long long stat_update_time_us;
time_t check_rule_time;
- struct timeout timeout_handle;
- time_t last_update_timeout_sec;
};
struct shaper_flow_instance {
@@ -173,13 +211,13 @@ struct shaping_tconsume_cb_arg {
struct shaping_profile_info *profile;
struct shaping_flow *sf;
unsigned char direction;
+ unsigned char is_primary_pf;
long long start_time_us;
};
struct shaping_hmget_cb_arg {
struct shaping_thread_ctx *ctx;
struct shaping_profile_hash_node *pf_hash_node;
- int priority;
long long start_time_us;
};
@@ -202,13 +240,9 @@ bool shaper_queue_empty(struct shaping_flow *sf);
void shaper_packet_dequeue(struct shaping_flow *sf);
struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf);
void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx);
-/*return value: 0 for success, -1 for failed*/
-int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time);
-/*return num of sf_ins*/
-void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf);
-int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num);
-//enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue);
+int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num);
+void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile);
int shaper_global_conf_init(struct shaping_system_conf *conf);
diff --git a/shaping/include/shaper_aqm.h b/shaping/include/shaper_aqm.h
new file mode 100644
index 0000000..794c74d
--- /dev/null
+++ b/shaping/include/shaper_aqm.h
@@ -0,0 +1,2 @@
+
+int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper); \ No newline at end of file
diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h
index 940745e..4c100a7 100644
--- a/shaping/include/shaper_global_stat.h
+++ b/shaping/include/shaper_global_stat.h
@@ -38,6 +38,7 @@ enum shaping_global_stat_column_index {
HIT_POLICY_RX_BYTES_IDX,
HIT_POLICY_TX_PKTS_IDX,
HIT_POLICY_TX_BYTES_IDX,
+ HIT_POLICY_TX_SYN_ACK_PKTS_IDX,
HIT_POLICY_DROP_PKTS_IDX,
HIT_POLICY_DROP_BYTES_IDX,
@@ -49,6 +50,7 @@ struct shaping_global_stat_traffic_data {
long long rx_bytes;
long long tx_pkts;
long long tx_bytes;
+ long long tx_syn_ack_pkts;
long long drop_pkts;
long long drop_bytes;
};
@@ -127,6 +129,7 @@ void shaper_global_stat_throughput_tx_inc(struct shaping_global_stat_data *threa
void shaper_global_stat_hit_policy_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
+void shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(struct shaping_global_stat_data *thread_global_stat);
void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
void shaper_global_stat_refresh(struct shaping_ctx *ctx); \ No newline at end of file
diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h
index d9ca979..5c720a3 100644
--- a/shaping/include/shaper_stat.h
+++ b/shaping/include/shaper_stat.h
@@ -37,6 +37,7 @@ struct shaping_stat_for_profile_dir {
struct shaping_stat_for_profile {
struct shaping_stat_for_profile_dir in;
struct shaping_stat_for_profile_dir out;
+ long long priority_queue_len;
};
struct shaping_stat {
@@ -57,4 +58,5 @@ void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_
void shaper_stat_drop_inc(struct shaping_stat_for_profile *profile_stat, unsigned char direction, int thread_id);
void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_stat, unsigned char direction, unsigned long long latency, int thread_id);
-void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force); \ No newline at end of file
+void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int force);
+void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node); \ No newline at end of file
diff --git a/shaping/include/shaper_swarmkv.h b/shaping/include/shaper_swarmkv.h
index 963fff0..e533802 100644
--- a/shaping/include/shaper_swarmkv.h
+++ b/shaping/include/shaper_swarmkv.h
@@ -2,5 +2,4 @@
struct swarmkv* shaper_swarmkv_init(int caller_thread_num);
void shaper_swarmkv_destroy(struct swarmkv* swarmkv_db);
-void swarmkv_reload_log_level();
-int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx); \ No newline at end of file
+void swarmkv_reload_log_level(); \ No newline at end of file
diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp
index d31f768..54e2f60 100644
--- a/shaping/src/main.cpp
+++ b/shaping/src/main.cpp
@@ -58,7 +58,7 @@ static void sig_handler(int signo)
swarmkv_reload_log_level();
}
- if (signo == SIGQUIT) {
+ if (signo == SIGQUIT || signo == SIGTERM) {
quit = 1;
}
@@ -82,7 +82,7 @@ int main(int argc, char **argv)
return -1;
}
- if (signal(SIGQUIT, sig_handler) == SIG_ERR)
+ if (signal(SIGQUIT, sig_handler) == SIG_ERR || signal(SIGTERM, sig_handler) == SIG_ERR)
{
LOG_ERROR("%s: unable to register SIGQUIT signal handler, error %d: %s", LOG_TAG_SHAPING, errno, strerror(errno));
LOG_CLOSE();
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 6457025..32b41cb 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -22,25 +22,15 @@ extern "C" {
#include "shaper_swarmkv.h"
#include "shaper_maat.h"
#include "shaper_global_stat.h"
+#include "shaper_aqm.h"
-#define TOKEN_ENLARGE_TIMES 10
+#define TOKEN_MULTIPLE_UPDATE_INTERVAL_S 1
+#define TOKEN_MULTIPLE_DEFAULT 10
#define TOKEN_GET_FAILED_INTERVAL_MS 1
#define HMGET_REQUEST_INTERVAL_MS 10
#define PRIORITY_BLOCK_MIN_TIME_MS 500
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 " priority-3"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 " priority-4"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 " priority-5"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 " priority-6"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 " priority-7"
-#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 " priority-8"
-
-const char *swarmkv_queue_len_get_cmd[] = {"", SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3,
- SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6,
- SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9};
+#define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9"
struct shaper {//trees in one thread
struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority
@@ -62,17 +52,6 @@ enum shaper_token_get_result {
SHAPER_TOKEN_GET_PASS = 1,//don't need to get token, regard as success
};
-struct shaping_profile_hash_node {
- int id;
- int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
- int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
- long long last_failed_get_token_ms;
- long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX];
- long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX];
- unsigned char is_invalid;
- UT_hash_handle hh;
-};
-
thread_local struct shaping_profile_hash_node *thread_sp_hashtbl = NULL;
struct shaper* shaper_new(unsigned int priority_queue_len_max)
@@ -160,8 +139,6 @@ struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
TAILQ_INIT(&s_node->shaping_flow.packet_queue);
s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
- timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS);
- timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
s_node->shaping_flow.ref_cnt = 1;
@@ -181,8 +158,7 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
struct shaping_node *s_node = (struct shaping_node*)sf;
- timeouts_del(ctx->expires, &sf->timeout_handle);
- shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
+ shaper_stat_refresh(ctx, sf, 1);
shaping_node_free(s_node);
return;
@@ -199,10 +175,9 @@ void shaper_thread_resource_clear()
}
}
-static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta)
+static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta, struct timespec *curr_time)
{
struct shaping_packet_wrapper *s_pkt = NULL;
- struct timespec curr_time;
if (sf->queue_len == ctx->conf.session_queue_len_max) {
return -1;
@@ -213,14 +188,12 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
return -1;
}
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
-
s_pkt->pkt_buff = pkt_buff;
s_pkt->direction = meta->dir;
s_pkt->length = meta->raw_len;
s_pkt->rule_anchor = sf->anchor;
- s_pkt->income_time_ns = curr_time.tv_sec * NANO_SECONDS_PER_SEC + curr_time.tv_nsec;
- s_pkt->enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+ s_pkt->income_time_ns = curr_time->tv_sec * NANO_SECONDS_PER_SEC + curr_time->tv_nsec;
+ s_pkt->enqueue_time_us = curr_time->tv_sec * MICRO_SECONDS_PER_SEC + curr_time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node);
sf->queue_len++;
@@ -273,14 +246,13 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
}
//return success(0) while any avl tree insert success
-int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
+static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
struct shaper *sp = ctx->sp;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
int priority;
- int ret = -1;
int i;
pkt_wrapper = shaper_first_pkt_get(sf);
@@ -288,8 +260,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
priority = s_rule_info->primary.priority;
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
- if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
- ret = 0;
+ if (0 != avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {//primary profile failed means flow push failed, ignore borrow profile
+ return -1;
}
if (s_rule_info->borrowing_num == 0) {// no borrow profile
@@ -300,18 +272,15 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
priority = s_rule_info->borrowing[i].priority;
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
- ret = 0;
shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
}
}
END:
- if (ret == 0) {//all avl tree success
- s_rule_info->primary.enqueue_time_us = enqueue_time;
- shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
- }
+ s_rule_info->primary.enqueue_time_us = enqueue_time;
+ shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
- return ret;
+ return 0;
}
static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time)
@@ -323,13 +292,12 @@ static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile
return (curr_time - enqueue_time);
}
-void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
+static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct timespec *curr_time)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
struct shaper *sp = ctx->sp;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
- struct timespec curr_time;
unsigned long long latency;
int priority;
int i;
@@ -337,8 +305,6 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
-
priority = s_rule_info->primary.priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
@@ -357,13 +323,34 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
}
END:
- latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, &curr_time);
+ latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, curr_time);
shaper_stat_max_latency_update(&s_rule_info->primary.stat, pkt_wrapper->direction, latency, ctx->thread_index);
shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
return;
}
+static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority)
+{
+ struct shaping_node *s_node = (struct shaping_node*)sf;
+ struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+ struct shaper *sp = ctx->sp;
+ struct shaping_packet_wrapper *pkt_wrapper = NULL;
+
+ pkt_wrapper = shaper_first_pkt_get(sf);
+ assert(pkt_wrapper != NULL);
+
+ for (int i = 0; i < s_rule_info->borrowing_num; i++) {
+ priority = s_rule_info->borrowing[i].priority;
+ if (avl_node_in_tree(s_node->avl_node[priority])) {
+ avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
+ shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
+ }
+ }
+
+ return;
+}
+
int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num)
{
struct avl_node *avl_node = NULL;
@@ -391,7 +378,7 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i
static void shaper_deposit_token_add(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority)
{
- int *deposit_token;
+ long long *deposit_token;
struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
switch (profile->type) {
@@ -419,6 +406,42 @@ static void shaper_deposit_token_add(struct shaping_profile_info *profile, int r
*deposit_token += req_token_bits;
}
+static void shaper_token_multiple_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile)
+{
+ if (profile->type != PROFILE_TYPE_GENERIC) {
+ return;
+ }
+
+ struct shaper_token_multiple *token_multiple = &profile->hash_node->token_multiple;
+ int curr_multiple = token_multiple->token_get_multiple;
+ time_t curr_time_s = time(NULL);
+ int token_multiple_min = ctx->conf.token_multiple_min;
+ int token_multiple_max = ctx->conf.token_multiple_max;
+
+ if (curr_time_s - token_multiple->token_multiple_update_time_s < TOKEN_MULTIPLE_UPDATE_INTERVAL_S) {
+ return;
+ }
+
+ token_multiple->token_multiple_update_time_s = curr_time_s;
+
+ if (token_multiple->has_failed_get_token) {
+ token_multiple->token_get_multiple = (curr_multiple - 1) < token_multiple_min ? token_multiple_min : (curr_multiple - 1);
+ goto END;
+ }
+
+ if (token_multiple->has_drop_by_queue_full) {
+ token_multiple->token_get_multiple = (curr_multiple + 1) > token_multiple_max ? token_multiple_max : (curr_multiple + 1);
+ goto END;
+ }
+
+END:
+ LOG_INFO("%s: profile id %d, token_get_multiple %d, has_failed_get_token %d, has_drop_by_queue_full %d", LOG_TAG_SHAPING, profile->id, token_multiple->token_get_multiple, token_multiple->has_failed_get_token, token_multiple->has_drop_by_queue_full);
+ token_multiple->has_failed_get_token = 0;
+ token_multiple->has_drop_by_queue_full = 0;
+
+ return;
+}
+
static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
struct shaping_tconsume_cb_arg *arg = (struct shaping_tconsume_cb_arg*)cb_arg;
@@ -436,7 +459,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat);
- LOG_INFO("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer);
+ LOG_DEBUG("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer);
if (reply->type != SWARMKV_REPLY_INTEGER) {
shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat);
@@ -451,16 +474,26 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
}
if (reply->integer > 0) {
- sf->flag &= (~SESSION_BORROW);
shaper_deposit_token_add(profile, reply->integer, arg->direction, profile->priority);//deposit tokens to profile
- } else {
- sf->flag |= SESSION_BORROW;
+ }
+
+ if (arg->is_primary_pf) {
+ if (reply->integer > 0) {
+ sf->flag &= (~SESSION_BORROW);
+ } else {
+ sf->flag |= SESSION_BORROW;
+ }
+ }
+
+ if (reply->integer == 0 && profile->type == PROFILE_TYPE_GENERIC) {
+ pf_hash_node->token_multiple.has_failed_get_token = 1;
+ shaper_token_multiple_update(ctx, profile);
}
END:
+ pf_hash_node->tconsume_ref_cnt--;
+
if (reply->type != SWARMKV_REPLY_INTEGER || reply->integer == 0) {
- struct timespec curr_time;
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
switch (profile->type) {
case PROFILE_TYPE_GENERIC:
pf_hash_node->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
@@ -483,7 +516,7 @@ END:
static void shaper_deposit_token_sub(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority)
{
- int *deposit_token;
+ long long *deposit_token;
struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
switch (profile->type) {
@@ -550,31 +583,41 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_info *profile,
static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec)
{
struct shaping_tconsume_cb_arg *arg = NULL;
+ struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node;
char key[32] = {0};
+ if (pf_hash_node->tconsume_ref_cnt > 0) {
+ return SHAPER_TOKEN_GET_FAILED;
+ }
+
snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
+
arg = (struct shaping_tconsume_cb_arg *)calloc(1, sizeof(struct shaping_tconsume_cb_arg));
arg->ctx = ctx;
arg->profile = pf_info;
arg->sf = sf;
arg->direction = direction;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+ if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
+ arg->is_primary_pf = 1;
+ }
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat);
sf->ref_cnt++;
+ pf_hash_node->tconsume_ref_cnt++;
switch (pf_info->type) {
case PROFILE_TYPE_GENERIC:
- swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
+ swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * pf_hash_node->token_multiple.token_get_multiple, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_HOST_FARINESS:
case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
- swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
+ swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_MULTIPLE_DEFAULT, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
- swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
+ swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_MULTIPLE_DEFAULT, shaper_token_get_cb, arg);
break;
default:
if (arg) {
@@ -583,21 +626,6 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
break;
}
- swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);
-
- if (pf_info->hash_node->is_invalid) {
- if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token
- return SHAPER_TOKEN_GET_SUCCESS;
- } else {//for borrowing, means this profile has no token to borrow
- return SHAPER_TOKEN_GET_FAILED;
- }
- }
-
- if (shaper_deposit_token_is_enough(pf_info, req_token_bits, direction, pf_info->priority)) {
- shaper_deposit_token_sub(pf_info, req_token_bits, direction, pf_info->priority);
- return SHAPER_TOKEN_GET_SUCCESS;
- }
-
return SHAPER_TOKEN_GET_FAILED;
}
@@ -606,7 +634,6 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
struct shaping_hmget_cb_arg *arg = (struct shaping_hmget_cb_arg *)cb_arg;
struct shaping_thread_ctx *ctx = arg->ctx;
struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node;
- int priority = arg->priority;
struct timespec curr_time;
long long curr_time_us;
long long curr_time_ms;
@@ -632,15 +659,15 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
if (reply->elements[i]->type == SWARMKV_REPLY_STRING) {
char tmp_str[32] = {0};
memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len);
- if (strtoll(tmp_str, NULL, 10) > 0) {
- pf_hash_node->priority_blocked_time_ms[priority] = curr_time_ms;
- break;
- }
+ pf_hash_node->queue_len[i] = strtoll(tmp_str, NULL, 10);
+ } else {
+ pf_hash_node->queue_len[i] = 0;
}
}
END:
- pf_hash_node->last_hmget_ms[priority] = curr_time_ms;
+ pf_hash_node->last_hmget_ms = curr_time_ms;
+ pf_hash_node->hmget_ref_cnt--;
free(cb_arg);
cb_arg = NULL;
@@ -659,19 +686,31 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
return 0;
}
- if (curr_time_ms - profile->hash_node->last_hmget_ms[priority] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms
+ if (profile->hash_node->hmget_ref_cnt > 0) {//if hmget command is pending, don't send hmget command again
+ goto END;
+ }
+
+ if (curr_time_ms - profile->hash_node->last_hmget_ms < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms
goto END;
}
arg = (struct shaping_hmget_cb_arg *)calloc(1, sizeof(struct shaping_hmget_cb_arg));
arg->ctx = ctx;
arg->pf_hash_node = profile->hash_node;
- arg->priority = priority;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+ profile->hash_node->hmget_ref_cnt++;
+
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat);
- swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
+ swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_QUEUE_LEN_GET_CMD, profile->id);
+
+ for (int i = 0; i < priority; i++) {
+ if (profile->hash_node->queue_len[i] > 0) {
+ profile->hash_node->priority_blocked_time_ms[priority] = curr_time_ms;
+ goto END;
+ }
+ }
END:
if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority] < PRIORITY_BLOCK_MIN_TIME_MS) {
@@ -682,7 +721,7 @@ END:
}
}
-static void shaper_profile_hash_node_update(struct shaping_profile_info *profile)
+void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile)
{
if (profile->hash_node == NULL) {
struct shaping_profile_hash_node *hash_node = NULL;
@@ -692,7 +731,10 @@ static void shaper_profile_hash_node_update(struct shaping_profile_info *profile
} else {
profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node));
profile->hash_node->id = profile->id;
+ profile->hash_node->token_multiple.token_get_multiple = TOKEN_MULTIPLE_DEFAULT;
HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node);
+ timeout_init(&profile->hash_node->timeout_handle, TIMEOUT_ABS);
+ timeouts_add(ctx->expires, &profile->hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
}
}
@@ -701,7 +743,7 @@ static void shaper_profile_hash_node_update(struct shaping_profile_info *profile
static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct shaping_profile_info *profile)
{
- long long last_failed_ms;
+ long long last_failed_ms = 0;
switch (profile->type) {
case PROFILE_TYPE_GENERIC:
@@ -721,10 +763,9 @@ static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct
}
}
-static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int req_token_bytes,
- struct shaping_profile_info *profile, int profile_type, unsigned char direction)
+static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, unsigned char direction, struct timespec *curr_timespec)
{
- if (profile_type == PROFILE_IN_RULE_TYPE_BORROW && !(sf->flag & SESSION_BORROW)) {
+ if (profile_type == PROFILE_IN_RULE_TYPE_BORROW && !(sf->flag & SESSION_BORROW)) {//TODO: this slows down the swarmkv request rate
return SHAPER_TOKEN_GET_FAILED;
}
@@ -745,31 +786,22 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet
}
- shaper_profile_hash_node_update(profile);
-
if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction, profile->priority)) {
shaper_deposit_token_sub(profile, req_token_bytes * 8, direction, profile->priority);
return SHAPER_TOKEN_GET_SUCCESS;
}
- struct timespec curr_timespec;
- clock_gettime(CLOCK_MONOTONIC, &curr_timespec);
- long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+ long long curr_time_ms = curr_timespec->tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
if (shaping_swarmkv_is_too_short_interval(curr_time_ms, profile)) {
return SHAPER_TOKEN_GET_FAILED;
}
- if (shaper_swarmkv_pending_queue_aqm_drop(ctx) == 1) {
- profile->hash_node->last_failed_get_token_ms = curr_time_ms;
- return SHAPER_TOKEN_GET_FAILED;
- }
-
- if (shaper_profile_is_priority_blocked(ctx, sf, profile, &curr_timespec, curr_time_ms)) {
+ if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_timespec, curr_time_ms)) {
return SHAPER_TOKEN_GET_FAILED;
} else {
int req_token_bits = req_token_bytes * 8;
- return shaper_token_get_from_profile(ctx, sf, profile, profile_type, req_token_bits, direction, &curr_timespec);
+ return shaper_token_get_from_profile(ctx, sf, profile, profile_type, req_token_bits, direction, curr_timespec);
}
}
@@ -841,19 +873,31 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+
if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > ctx->conf.pkt_max_delay_time_us) {
- shaper_flow_pop(ctx, sf);
+ shaper_flow_pop(ctx, sf, &curr_time);
goto DROP;
}
}
- /*todo: AQM, just for primary profile*/
for (int i = 0; i < profile_num; i++) {
profile = pf_container[i].pf_info;
profile_type = pf_container[i].pf_type;
- int ret = shaper_token_consume(ctx, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction);
+
+ /*AQM check: on a drop verdict, the primary profile drops the packet; a borrowed profile merely withholds tokens from this packet*/
+ if (shaper_aqm_need_drop(profile, pkt_wrapper)) {
+ if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
+ shaper_flow_pop(ctx, sf, &curr_time);
+ goto DROP;
+ } else {
+ shaper_flow_specific_borrow_priority_pop(ctx, sf, priority);
+ continue;
+ }
+ }
+
+ int ret = shaper_token_consume(ctx, sf, profile, profile_type, pkt_wrapper->length, pkt_wrapper->direction, &curr_time);
if (ret >= SHAPER_TOKEN_GET_SUCCESS) {
if (ret == SHAPER_TOKEN_GET_SUCCESS) {
shaper_stat_forward_inc(&profile->stat, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
@@ -867,14 +911,13 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
return SHAPING_QUEUED;
}
- shaper_flow_pop(ctx, sf);
+ shaper_flow_pop(ctx, sf, &curr_time);
sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction);
if (sf->anchor == 0) {//no next rule
return SHAPING_FORWARD;
}
//push sf for next rule
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
if (0 == shaper_flow_push(ctx, sf, enqueue_time)) {
return SHAPING_QUEUED;
@@ -896,7 +939,8 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi
unsigned long long enqueue_time;
int enqueue_success = 0;
- int ret = shaper_token_consume(ctx, sf, meta->raw_len, profile, profile_type, meta->dir);
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ int ret = shaper_token_consume(ctx, sf, profile, profile_type, meta->raw_len, meta->dir, &curr_time);
if (ret >= SHAPER_TOKEN_GET_SUCCESS) {
if (ret == SHAPER_TOKEN_GET_SUCCESS) {
shaper_stat_forward_inc(&profile->stat, meta->dir, meta->raw_len, ctx->thread_index);
@@ -905,13 +949,11 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi
sf->anchor = shaper_next_anchor_get(sf, meta->dir);
if (sf->anchor == 0) {//no next rule
return SHAPING_FORWARD;
- } else {
- goto FLOW_PUSH;
}
}
-FLOW_PUSH:
- if (shaper_packet_enqueue(ctx, sf, rx_buff, meta) == 0) {
+ //get token failed, or have multiple rules, enqueue packet and push sf
+ if (shaper_packet_enqueue(ctx, sf, rx_buff, meta, &curr_time) == 0) {
enqueue_success = 1;
} else {
char *addr_str = addr_tuple4_to_str(&sf->tuple4);
@@ -922,7 +964,6 @@ FLOW_PUSH:
goto DROP;
}
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
if (0 == shaper_flow_push(ctx, sf, enqueue_time)) {
return SHAPING_QUEUED;
@@ -934,8 +975,12 @@ DROP:
if (enqueue_success) {
shaper_packet_dequeue(sf);
}
- shaper_stat_drop_inc(&sf->matched_rule_infos[sf->anchor].primary.stat, meta->dir, ctx->thread_index);
+ struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor].primary;
+ shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index);
sf->anchor = 0;
+
+ pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1;
+ shaper_token_multiple_update(ctx, pf_info);
return SHAPING_DROP;
}
@@ -980,7 +1025,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
break;
}
- shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
+ shaper_stat_refresh(ctx, sf, 0);
if (shaper_queue_empty(sf)) {
if (sf->flag & SESSION_CLOSE) {
@@ -1010,7 +1055,6 @@ static void shaper_token_consume_force(struct shaping_flow *sf, struct metadata
for (int i = 0; i < sf->rule_num; i++) {
rule = &sf->matched_rule_infos[i];
- shaper_profile_hash_node_update(&rule->primary);
shaper_deposit_token_sub(&rule->primary, meta->raw_len * 8, meta->dir, rule->primary.priority);
}
@@ -1031,17 +1075,25 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(&ctx->thread_global_stat);
shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
goto END;//for tcp pure control pkt, transmit it directly
}
if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly
+ struct timespec curr_time;
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+
s_rule = &sf->matched_rule_infos[sf->anchor];
- if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta)) {
+ if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta, &curr_time)) {
shaper_stat_queueing_pkt_inc_for_rule(s_rule, meta->dir, ctx->thread_index);
shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
} else {
- shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index);
+ struct shaping_profile_info *pf_info = &s_rule->primary;
+ pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1;
+ shaper_token_multiple_update(ctx, pf_info);
+
+ shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index);
shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len);
shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len);
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
@@ -1072,12 +1124,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
}
END:
- shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
- time_t curr_time = time(NULL);
- if (curr_time > sf->last_update_timeout_sec) {
- timeouts_add(ctx->expires, &sf->timeout_handle, curr_time + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_add will delete if sf exist, then add
- sf->last_update_timeout_sec = curr_time;
- }
+ shaper_stat_refresh(ctx, sf, 0);
if(sf->flag & SESSION_CLOSE) {
if (shaper_queue_empty(sf)) {
@@ -1100,7 +1147,7 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
{
swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);
- struct shaping_flow *sf = NULL;
+ struct shaping_profile_hash_node *hash_node = NULL;
time_t curr_time = time(NULL);
int cnt = 0;
@@ -1116,9 +1163,9 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
break;
}
- sf = container_of(t, struct shaping_flow, timeout_handle);
- shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
- timeouts_add(ctx->expires, &sf->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_get will delete sf from queue, add it back
+ hash_node = container_of(t, struct shaping_profile_hash_node, timeout_handle);
+ shaper_stat_priority_queue_len_refresh_all(ctx, hash_node);
+ timeouts_add(ctx->expires, &hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_get will delete item from queue, add it back
cnt++;
}
@@ -1228,7 +1275,6 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
return;
}
- prefetch(rx_buff[0]);
for (int i = 0; i < rx_num; i++) {
if (marsio_buff_is_ctrlbuf(rx_buff[i])) {
sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta);
@@ -1245,8 +1291,6 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
}
polling_entry(ctx->sp, ctx->stat, ctx);
-
- prefetch(rx_buff[i+1]);
}
return;
@@ -1352,6 +1396,8 @@ int shaper_global_conf_init(struct shaping_system_conf *conf)
MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "CHECK_RULE_ENABLE_INTERVAL_SEC", &conf->check_rule_enable_interval_sec, 120);
MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PKT_MAX_DELAY_TIME_US", &conf->pkt_max_delay_time_us, 2000000);
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "TOKEN_MULTIPLE_MIN", &conf->token_multiple_min, 10);
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "TOKEN_MULTIPLE_MAX", &conf->token_multiple_max, 50);
return 0;
@@ -1451,4 +1497,4 @@ struct shaping_ctx *shaping_engine_init()
ERROR:
shaping_engine_destroy(ctx);
return NULL;
-} \ No newline at end of file
+}
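
Beyond AQM, shaper.cpp above replaces the fixed TOKEN_ENLARGE_TIMES prefetch factor with a per-profile adaptive multiple, bounded by the new TOKEN_MULTIPLE_MIN/TOKEN_MULTIPLE_MAX config keys (defaults 10 and 50 in shaper_global_conf_init). Below is a minimal standalone sketch of that control loop, mirroring shaper_token_multiple_update(): shrink the batch when swarmkv returns no tokens, grow it when packets are dropped on a full queue, at most one step per second. The struct and main() driver are illustrative stand-ins, not code from the merge.

/* token_multiple_sketch.c -- illustrative only; mirrors
 * shaper_token_multiple_update() with the default config bounds */
#include <stdio.h>
#include <time.h>

#define TOKEN_MULTIPLE_UPDATE_INTERVAL_S 1
#define TOKEN_MULTIPLE_MIN 10          /* default TOKEN_MULTIPLE_MIN */
#define TOKEN_MULTIPLE_MAX 50          /* default TOKEN_MULTIPLE_MAX */

struct token_multiple {
    int multiple;                      /* batch factor applied to each tconsume */
    unsigned char failed_get;          /* swarmkv returned 0 tokens this interval */
    unsigned char drop_queue_full;     /* a packet was dropped on a full queue */
    time_t last_update_s;
};

static void token_multiple_update(struct token_multiple *tm)
{
    time_t now = time(NULL);
    if (now - tm->last_update_s < TOKEN_MULTIPLE_UPDATE_INTERVAL_S)
        return;                        /* at most one adjustment per second */
    tm->last_update_s = now;

    if (tm->failed_get) {              /* token-starved: shrink the batch */
        if (tm->multiple > TOKEN_MULTIPLE_MIN)
            tm->multiple--;
    } else if (tm->drop_queue_full) {  /* queue overflow: grow the batch */
        if (tm->multiple < TOKEN_MULTIPLE_MAX)
            tm->multiple++;
    }
    tm->failed_get = 0;                /* events are consumed per interval */
    tm->drop_queue_full = 0;
}

int main(void)
{
    struct token_multiple tm = { 10, 1, 0, 0 };  /* TOKEN_MULTIPLE_DEFAULT = 10 */
    token_multiple_update(&tm);
    printf("multiple after a failed interval: %d\n", tm.multiple); /* 10: clamped at min */
    return 0;
}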
diff --git a/shaping/src/shaper_aqm.cpp b/shaping/src/shaper_aqm.cpp
new file mode 100644
index 0000000..20c4190
--- /dev/null
+++ b/shaping/src/shaper_aqm.cpp
@@ -0,0 +1,82 @@
+#include <time.h>
+#include "shaper.h"
+#include "shaper_aqm.h"
+
+#define PROBABILITY_MAX 100
+#define INCREMENT 10
+#define DECREMENT 1
+#define FREEZE_TIME 2 //unit:s
+#define QUEUE_LEN_MAX 100
+static int shaper_aqm_blue_need_drop(struct shaping_packet_wrapper *pkt_wrapper, struct shaper_aqm_blue_para *para, int curr_queue_len)
+{
+ time_t curr_time;
+
+ if (time(&curr_time) - para->update_time >= FREEZE_TIME) {
+ para->update_time = curr_time;
+ if (curr_queue_len >= QUEUE_LEN_MAX) {
+ para->probability = (para->probability + INCREMENT) > PROBABILITY_MAX ? PROBABILITY_MAX : (para->probability + INCREMENT);
+ } else if (curr_queue_len == 0) {
+ para->probability = (para->probability - DECREMENT) >= 0 ? (para->probability - DECREMENT) : 0;
+ }
+ }
+
+ if (rand() % PROBABILITY_MAX < para->probability) {
+ return 1;
+ }
+
+ return 0;
+}
+
+static int shaper_aqm_have_processed(struct shaping_packet_wrapper *pkt_wrapper, int profile_id)
+{
+ int i = 0;
+
+ for (i = 0; i < SHAPING_REF_PROFILE_NUM_MAX; i++) {
+ if (pkt_wrapper->aqm_processed_pf_ids[i] == profile_id) {
+ return 1;
+ } else if (pkt_wrapper->aqm_processed_pf_ids[i] == 0) {
+ break;
+ }
+ }
+
+ return 0;
+}
+
+static void shaper_aqm_mark_processed(struct shaping_packet_wrapper *pkt_wrapper, int profile_id)
+{
+ int i = 0;
+
+ for (i = 0; i < SHAPING_REF_PROFILE_NUM_MAX; i++) {
+ if (pkt_wrapper->aqm_processed_pf_ids[i] == 0) {
+ pkt_wrapper->aqm_processed_pf_ids[i] = profile_id;
+ break;
+ }
+ }
+}
+
+int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper)
+{
+ int ret = 0;
+
+ if (profile->hash_node->aqm_type == AQM_TYPE_NONE) {
+ return 0;
+ }
+
+ if (shaper_aqm_have_processed(pkt_wrapper, profile->id)) {
+ return 0;
+ }
+
+ switch (profile->hash_node->aqm_type) {
+ case AQM_TYPE_BLUE:
+ ret = shaper_aqm_blue_need_drop(pkt_wrapper, &profile->hash_node->aqm_blue_para, profile->hash_node->queue_len[profile->priority]);
+ break;
+ case AQM_TYPE_CODEL:
+ break;
+ default:
+ break;
+ }
+
+ shaper_aqm_mark_processed(pkt_wrapper, profile->id);
+
+ return ret;
+} \ No newline at end of file
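
A detail worth noting in shaper_aqm.cpp above: aqm_processed_pf_ids[] makes the AQM verdict run at most once per profile id per packet, even though a packet is evaluated against both its primary and borrowed profiles. Below is a hypothetical standalone sketch of that bookkeeping; REF_PROFILE_NUM_MAX stands in for SHAPING_REF_PROFILE_NUM_MAX, whose value this diff does not show, and the scheme relies on valid profile ids being nonzero, since 0 terminates the list.

/* aqm_dedup_sketch.c -- hypothetical illustration, not part of the merge */
#include <stdio.h>

#define REF_PROFILE_NUM_MAX 4          /* stand-in value */

struct pkt { int aqm_processed_pf_ids[REF_PROFILE_NUM_MAX]; };

static int have_processed(const struct pkt *p, int profile_id)
{
    for (int i = 0; i < REF_PROFILE_NUM_MAX; i++) {
        if (p->aqm_processed_pf_ids[i] == profile_id)
            return 1;                  /* AQM already ran for this profile */
        if (p->aqm_processed_pf_ids[i] == 0)
            break;                     /* 0 terminates the list */
    }
    return 0;
}

static void mark_processed(struct pkt *p, int profile_id)
{
    for (int i = 0; i < REF_PROFILE_NUM_MAX; i++) {
        if (p->aqm_processed_pf_ids[i] == 0) {
            p->aqm_processed_pf_ids[i] = profile_id;
            break;
        }
    }
}

int main(void)
{
    struct pkt p = {{0}};
    mark_processed(&p, 42);                    /* first AQM pass marks id 42 */
    printf("%d %d\n", have_processed(&p, 42),  /* 1: skipped on re-check */
                      have_processed(&p, 7));  /* 0: AQM would still run */
    return 0;
}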
diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp
index 7144658..d59b290 100644
--- a/shaping/src/shaper_global_stat.cpp
+++ b/shaping/src/shaper_global_stat.cpp
@@ -63,6 +63,7 @@ static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat)
stat->column_ids[HIT_POLICY_RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_rx_bytes", NULL, 0);
stat->column_ids[HIT_POLICY_TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_pkts", NULL, 0);
stat->column_ids[HIT_POLICY_TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_bytes", NULL, 0);
+ stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_syn_ack_pkts", NULL, 0);
stat->column_ids[HIT_POLICY_DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_pkts", NULL, 0);
stat->column_ids[HIT_POLICY_DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_bytes", NULL, 0);
@@ -344,6 +345,12 @@ void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_
data->tx_bytes += pkt_len;
}
+void shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(struct shaping_global_stat_data *thread_global_stat)
+{
+ struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic;
+ data->tx_syn_ack_pkts++;
+}
+
void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len)
{
struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic;
@@ -402,6 +409,7 @@ void shaper_global_stat_refresh(struct shaping_ctx *ctx)
sum.hit_policy_traffic.rx_bytes += stat_data[i].hit_policy_traffic.rx_bytes;
sum.hit_policy_traffic.tx_pkts += stat_data[i].hit_policy_traffic.tx_pkts;
sum.hit_policy_traffic.tx_bytes += stat_data[i].hit_policy_traffic.tx_bytes;
+ sum.hit_policy_traffic.tx_syn_ack_pkts += stat_data[i].hit_policy_traffic.tx_syn_ack_pkts;
sum.hit_policy_traffic.drop_pkts += stat_data[i].hit_policy_traffic.drop_pkts;
sum.hit_policy_traffic.drop_bytes += stat_data[i].hit_policy_traffic.drop_bytes;
}
@@ -446,6 +454,7 @@ void shaper_global_stat_refresh(struct shaping_ctx *ctx)
fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes);
fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts);
fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX], hit_policy_traffic_data->tx_syn_ack_pkts);
fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts);
fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes);
diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp
index f159c2f..8a0fa88 100644
--- a/shaping/src/shaper_maat.cpp
+++ b/shaping/src/shaper_maat.cpp
@@ -253,15 +253,16 @@ void shaper_profile_ex_free(int table_id, void **ad, long argl, void *argp)
return;
}
-void shaper_profile_update(struct shaping_profile_info *s_pf_info, struct shaping_profile *s_pf_ex)
+void shaper_profile_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *s_pf_info, struct shaping_profile *s_pf_ex)
{
s_pf_info->id = s_pf_ex->id;
s_pf_info->type = s_pf_ex->type;
+ shaper_profile_hash_node_update(ctx, s_pf_info);
return;
}
-static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_rule_info *s_rule_info, long long rule_compile_id, int *priority_changed)
+static int shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_rule_info *s_rule_info, long long rule_compile_id, int *priority_changed)
{
struct shaping_rule *s_rule = NULL;
struct shaping_profile *s_pf = NULL;
@@ -270,7 +271,7 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl
s_rule = (struct shaping_rule*)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->rule_table_id, (char *)&rule_compile_id, sizeof(rule_compile_id));
if (!s_rule) {
LOG_ERROR("%s maat_plugin_table_get_ex_data get rule failed for compile id %lld", LOG_TAG_MAAT, rule_compile_id);
- goto END;
+ return -1;
}
s_rule_info->id = s_rule->id;
s_rule_info->fair_factor = s_rule->fair_factor;
@@ -282,9 +283,9 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl
s_pf = (struct shaping_profile *)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->profile_table_id, pf_id_key, strlen(pf_id_key));
if (!s_pf) {
LOG_ERROR("%s maat_plugin_table_get_ex_data get profile failed for key %s", LOG_TAG_MAAT, pf_id_key);
- goto END;
+ return -1;
}
- shaper_profile_update(&s_rule_info->primary, s_pf);
+ shaper_profile_update(ctx, &s_rule_info->primary, s_pf);
if (sf->processed_pkts <= CONFIRM_PRIORITY_PKTS) {
if (sf->priority > s_rule->priority) {
@@ -294,7 +295,7 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl
}
if (s_rule->borrow_pf_num == 0) {
- goto END;
+ return 0;
}
for (int i = 0; i < s_rule->borrow_pf_num; i++) {
@@ -303,15 +304,14 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl
s_pf = (struct shaping_profile *)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->profile_table_id, pf_id_key, strlen(pf_id_key));
if (!s_pf) {
LOG_ERROR("%s maat_plugin_table_get_ex_data get profile failed for key %s", LOG_TAG_MAAT, pf_id_key);
- goto END;
+ return -1;
}
- shaper_profile_update(&s_rule_info->borrowing[i], s_pf);
+ shaper_profile_update(ctx, &s_rule_info->borrowing[i], s_pf);
s_rule_info->borrowing_num++;
}
-END:
- return;
+ return 0;
}
static void shaper_profiles_priority_update(struct shaping_flow *sf)
@@ -360,10 +360,10 @@ static int shaper_rules_dup_remove(struct shaping_flow *sf, long long *rule_comp
void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, long long *rule_compile_ids, int rule_num)
{
- int i, j;
int priority_changed = 0;
long long rule_ids_remove_dup[SHAPING_RULE_NUM_MAX] = {0};
int rule_num_remove_dup = 0;
+ int old_rule_num = sf->rule_num;
if (rule_num > SHAPING_RULE_NUM_MAX) {
char *addr_str = addr_tuple4_to_str(&sf->tuple4);
@@ -388,15 +388,16 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
return;
}
- for (i = sf->rule_num, j = 0; i < sf->rule_num + rule_num_remove_dup; i++, j++) {
- shaper_rule_update(ctx, sf, &sf->matched_rule_infos[i], rule_ids_remove_dup[j], &priority_changed);
+ for (int i = 0; i < rule_num_remove_dup; i++) {
+ if (shaper_rule_update(ctx, sf, &sf->matched_rule_infos[sf->rule_num], rule_ids_remove_dup[i], &priority_changed) == 0) {
+ sf->rule_num++;
+ }
}
- if (sf->rule_num > 0 && priority_changed) {
- shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
+ if (old_rule_num > 0 && priority_changed) {
+ shaper_stat_refresh(ctx, sf, 1);
}
- sf->rule_num += rule_num_remove_dup;
shaper_profiles_priority_update(sf);
return;
diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp
index 4a42391..4384a06 100644
--- a/shaping/src/shaper_session.cpp
+++ b/shaping/src/shaper_session.cpp
@@ -122,11 +122,16 @@ static void shaper_session_log_send(struct shaping_thread_ctx *ctx, struct shapi
int pkt_header_len = sf->ctrl_meta.l7_offset;
struct metadata *ctrl_meta = &sf->ctrl_meta;
struct sids sids;
+ char *dst = NULL;
char *addr_str = addr_tuple4_to_str(&sf->tuple4);
- marsio_buff_malloc_global(ctx->marsio_info->instance, &tx_buff, 1, 0, ctx->thread_index);
+ if (marsio_buff_malloc_global(ctx->marsio_info->instance, &tx_buff, 1, 0, ctx->thread_index) < 0) {
+ LOG_ERROR("%s: marsio_buff_malloc_global failed for session %s", LOG_TAG_SHAPING, addr_str);
+ goto END;
+ }
+
marsio_buff_set_ctrlbuf(tx_buff);
- char *dst = marsio_buff_append(tx_buff, pkt_header_len + mpack_size);
+ dst = marsio_buff_append(tx_buff, pkt_header_len + mpack_size);
memcpy(dst, pkt_header_data, pkt_header_len);
memcpy(dst + pkt_header_len, mpack_data, mpack_size);
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index 735e154..808abd1 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -14,7 +14,7 @@
#define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits"
-#define SHAPER_STAT_REFRESH_TIME_NS 10000000 //10 ms
+#define SHAPER_STAT_REFRESH_TIME_US 10000 //10 ms
struct shaper_stat_conf {
int enable_backgroud_thread;
@@ -150,10 +150,11 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo
if (reply->type != SWARMKV_REPLY_INTEGER) {
shaper_global_stat_async_hincrby_failed_inc(&ctx->thread_global_stat);
+ arg->start_time_us = curr_time_us;
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);//hincrby failed, retry
shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat);
- LOG_INFO("%s: shaping stat hincrby failed, retry for profile id %d priority %d, operate queue_len %lld", LOG_TAG_STAT, arg->profile_id, arg->priority, arg->queue_len);
+ LOG_DEBUG("%s: shaping stat hincrby failed, retry for profile id %d priority %d, operate queue_len %lld", LOG_TAG_STAT, arg->profile_id, arg->priority, arg->queue_len);
swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len);
return;
@@ -164,13 +165,67 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo
return;
}
-static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage)
+static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node, int priority, long long curr_time_us)
+{
+ if (profile_hash_node->local_queue_len[priority] == 0) {
+ return;
+ }
+
+ if (curr_time_us - profile_hash_node->local_queue_len_update_time_us[priority] < SHAPER_STAT_REFRESH_TIME_US) {
+ return;
+ }
+
+ struct shaping_hincrby_cb_arg *arg = (struct shaping_hincrby_cb_arg *)calloc(1, sizeof(struct shaping_hincrby_cb_arg));
+
+ arg->ctx = ctx;
+ arg->start_time_us = curr_time_us;
+ arg->profile_id = profile_hash_node->id;
+ arg->priority = priority;
+ arg->queue_len = profile_hash_node->local_queue_len[priority];
+ shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
+ shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len);
+
+ profile_hash_node->local_queue_len_update_time_us[priority] = curr_time_us;
+ profile_hash_node->local_queue_len[priority] = 0;
+
+ return;
+}
+
+void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node)
+{
+ struct timespec curr_time;
+ long long curr_time_us;
+
+ clock_gettime(CLOCK_MONOTONIC_COARSE, &curr_time);
+ curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ shaper_stat_priority_queue_len_refresh(ctx, profile_hash_node, i, curr_time_us);
+ }
+
+ return;
+}
+
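The two helpers above accumulate per-priority queue-length deltas in the profile hash node and flush them to swarmkv via HINCRBY at most once per SHAPER_STAT_REFRESH_TIME_US. A minimal standalone sketch of that accumulate-then-flush cadence, with hypothetical names (queue_len_acc, acc_take) and a hard-coded 10 ms window:

    // Sketch only: mirrors the flush cadence of shaper_stat_priority_queue_len_refresh.
    struct queue_len_acc {
        long long pending;        // delta accumulated since the last flush
        long long last_flush_us;  // monotonic timestamp of the last flush
    };

    // Returns the delta to report via HINCRBY, or 0 if nothing is due yet.
    static long long acc_take(struct queue_len_acc *a, long long now_us)
    {
        if (a->pending == 0 || now_us - a->last_flush_us < 10000) // 10 ms
            return 0;
        long long delta = a->pending;
        a->pending = 0;
        a->last_flush_us = now_us;
        return delta;
    }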
+static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, struct shaping_rule_info *rule, struct shaping_profile_info *profile, int profile_type, int need_refresh_stat, int need_update_guage, long long curr_time_us)
{
struct shaping_stat_for_profile *profile_stat = &profile->stat;
struct shaping_stat *stat = ctx->stat;
+ int priority = profile->priority;
+ int thread_id = ctx->thread_index;
unsigned long long old_latency;
+
+ if (need_update_guage) {
+ profile->hash_node->local_queue_len[priority] += profile_stat->priority_queue_len;
+ profile_stat->priority_queue_len = 0;
+ shaper_stat_priority_queue_len_refresh(ctx, profile->hash_node, priority, curr_time_us);
+ }
+
+ if (!need_refresh_stat) {
+ return;
+ }
- shaper_stat_tags_build(vsys_id, rule_id, profile->id, profile->priority, profile_type);
+ shaper_stat_tags_build(rule->vsys_id, rule->id, profile->id, priority, profile_type);
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_DROP_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.drop_pkts, tags, TAG_IDX_MAX, thread_id);
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.pkts, tags, TAG_IDX_MAX, thread_id);
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_BYTES_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.bytes, tags, TAG_IDX_MAX, thread_id);
@@ -195,19 +250,6 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, i
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id);
}
- struct shaping_hincrby_cb_arg *arg = (struct shaping_hincrby_cb_arg *)calloc(1, sizeof(struct shaping_hincrby_cb_arg));
- struct timespec curr_time;
-
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
- arg->ctx = ctx;
- arg->start_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- arg->profile_id = profile->id;
- arg->priority = profile->priority;
- arg->queue_len = profile_stat->in.queue_len + profile_stat->out.queue_len;
- shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
- shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat);
- swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len);
-
memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
} else {
profile_stat->in.pkts = 0;
@@ -224,34 +266,37 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, i
return;
}
-void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force)
+void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int force)
{
struct shaping_rule_info *rule;
struct timespec curr_time;
int need_refresh = 0;
+ long long curr_time_us;
+
+ clock_gettime(CLOCK_MONOTONIC_COARSE, &curr_time);
+ curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
if (force) {
need_refresh = 1;
} else {
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
- if (curr_time.tv_sec - sf->stat_update_time.tv_sec > 0 || curr_time.tv_nsec - sf->stat_update_time.tv_nsec >= SHAPER_STAT_REFRESH_TIME_NS) {
+ if (curr_time_us - sf->stat_update_time_us >= SHAPER_STAT_REFRESH_TIME_US) {
need_refresh = 1;
- memcpy(&sf->stat_update_time, &curr_time, sizeof(struct timespec));
+ sf->stat_update_time_us = curr_time_us;
}
}
- if (!need_refresh) {
+ int need_update_guage = sf->processed_pkts > CONFIRM_PRIORITY_PKTS ? 1 : 0;
+
+ if (!need_refresh && !need_update_guage) {
return;
}
- int need_update_guage = sf->processed_pkts > CONFIRM_PRIORITY_PKTS ? 1 : 0;
-
for (int i = 0; i < sf->rule_num; i++) {
rule = &sf->matched_rule_infos[i];
- shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage);
+ shaper_stat_profile_metirc_refresh(ctx, rule, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_refresh, need_update_guage, curr_time_us);
for (int j = 0; j < rule->borrowing_num; j++) {
- shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage);
+ shaper_stat_profile_metirc_refresh(ctx, rule, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_refresh, need_update_guage, curr_time_us);
}
}
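Both timestamp sites in this file now use CLOCK_MONOTONIC_COARSE and repeat the same seconds-plus-nanoseconds conversion. A standalone sketch of that conversion as a helper (mono_coarse_us is an illustrative name, not part of the patch):

    #include <time.h>

    // Coarse monotonic time in microseconds; cheaper to read than CLOCK_MONOTONIC,
    // and its millisecond-level resolution is adequate for a 10 ms refresh window.
    static inline long long mono_coarse_us(void)
    {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC_COARSE, &ts);
        return (long long)ts.tv_sec * 1000000LL + ts.tv_nsec / 1000LL;
    }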
@@ -303,6 +348,8 @@ void shaper_stat_queueing_pkt_inc(struct shaping_stat_for_profile *profile_stat,
profile_stat->out.queue_len++;
}
+ profile_stat->priority_queue_len++;
+
return;
}
@@ -314,6 +361,8 @@ void shaper_stat_queueing_pkt_dec(struct shaping_stat_for_profile *profile_stat,
profile_stat->out.queue_len--;
}
+ profile_stat->priority_queue_len--;
+
return;
}
diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp
index 81d8ac9..6ac1db5 100644
--- a/shaping/src/shaper_swarmkv.cpp
+++ b/shaping/src/shaper_swarmkv.cpp
@@ -6,7 +6,7 @@
#include "utils.h"
#include "shaper_swarmkv.h"
-#define PROBABILITY_MAX 100
+#define PROBABILITY_MAX 500
#define INCREMENT 10
#define DECREMENT 1
#define FREEZE_TIME 1 //unit:s
@@ -106,36 +106,6 @@ void swarmkv_reload_log_level()
return;
}
-int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx)
-{
- long long pending_queue_len = swarmkv_caller_get_pending_commands(ctx->swarmkv_db);
- time_t now = time(NULL);
-
- if (now - ctx->swarmkv_aqm_update_time < FREEZE_TIME) {
- goto END;
- }
-
- if (pending_queue_len > PENDING_QUEUE_LEN_MAX) {
- if (ctx->swarmkv_aqm_prob < PROBABILITY_MAX) {
- ctx->swarmkv_aqm_prob += INCREMENT;
- }
- LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob);
- } else {
- if (ctx->swarmkv_aqm_prob >= DECREMENT) {
- ctx->swarmkv_aqm_prob -= DECREMENT;
- }
- LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob);
- }
- ctx->swarmkv_aqm_update_time = now;
-
-END:
- if (rand() % PROBABILITY_MAX < ctx->swarmkv_aqm_prob) {
- return 1;
- }
-
- return 0;
-}
-
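The function removed here is a BLUE-style AQM on the swarmkv pending-command queue: the drop probability grows by INCREMENT while the queue exceeds PENDING_QUEUE_LEN_MAX, decays by DECREMENT otherwise, and is adjusted at most once per FREEZE_TIME second. A generic sketch of that update rule using the constants above (the relocated implementation in shaper_aqm.cpp is not shown in this hunk, so this only illustrates the removed logic):

    // BLUE-style probability update; caller drops when rand() % PROBABILITY_MAX < prob.
    static int aqm_prob_update(int prob, long long qlen, time_t *last_update, time_t now)
    {
        if (now - *last_update >= FREEZE_TIME) {   // rate-limit adjustments
            if (qlen > PENDING_QUEUE_LEN_MAX) {
                if (prob < PROBABILITY_MAX)
                    prob += INCREMENT;             // congested: drop more aggressively
            } else if (prob >= DECREMENT) {
                prob -= DECREMENT;                 // draining: back off slowly
            }
            *last_update = now;
        }
        return prob;
    }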
struct swarmkv* shaper_swarmkv_init(int caller_thread_num)
{
struct swarmkv_options *swarmkv_opts = NULL;
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index f262412..2b8f296 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -287,7 +287,7 @@ TEST(single_session, udp_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
+ shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
@@ -366,7 +366,7 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
+ shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
@@ -419,7 +419,7 @@ TEST(single_session, tcp_tx_in_order)
/***********send stat data here********************/
- shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1);
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1);
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metrics manually
sleep(2);//wait for telegraf to generate metrics
@@ -432,7 +432,7 @@ TEST(single_session, tcp_tx_in_order)
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));//pure ctrl pkts force-consume 1000 tokens; current tokens go from -1000 to 0, so no pkt can be sent
stub_refresh_token_bucket(0);
- for (int i = 0; i < 10; i++) {//10 pkts which are not pure control
+ for (int i = 0; i < 11; i++) {//10 non-pure-control pkts: the first polling pass only requests tokens, then 10 passes send the 10 pkts
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
@@ -459,7 +459,7 @@ TEST(single_session, tcp_tx_in_order)
shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 30000, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 31000, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -506,7 +506,7 @@ TEST(single_session, udp_diff_direction)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
stub_refresh_token_bucket(0);
- for (int i = 0; i < 20; i++) {
+ for (int i = 0; i < 22; i++) {//the first polling pass only requests tokens and sends no pkt
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
@@ -534,9 +534,9 @@ TEST(single_session, udp_diff_direction)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20000, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 21000, SHAPING_DIR_OUT, profile_type_primary);
- shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20000, SHAPING_DIR_IN, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 22000, SHAPING_DIR_IN, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
@@ -575,7 +575,7 @@ TEST(single_session, udp_multi_rules)
shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 3);
/*******send packets***********/
- send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 2, 0);
+ send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 5, 0);
//first 10 packets
@@ -612,13 +612,13 @@ TEST(single_session, udp_multi_rules)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 506000, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 507000, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1
- shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//latency of every queued pkt is 1
+ shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 2000, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2
- shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt
+ shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 91000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -687,7 +687,7 @@ TEST(single_session, udp_borrow)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
@@ -762,7 +762,7 @@ TEST(single_session, udp_borrow_same_priority_9)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);
#if 0 //fieldstat doesn't output a row when all values are zero
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
@@ -777,312 +777,6 @@ TEST(single_session, udp_borrow_same_priority_9)
fclose(stat_file);
}
-/*session1 match rule1, session2 match rule2
- rule1:
- priority:2
- profile1: limit 1000
- rule2:
- priority:1
- profile2: limit 1000*/
-TEST(two_session_diff_priority, udp_in_order)
-{
- struct stub_pkt_queue expec_tx_queue1;
- struct stub_pkt_queue expec_tx_queue2;
- struct stub_pkt_queue *actual_tx_queue;
- struct shaping_ctx *ctx = NULL;
- struct shaping_flow *sf1 = NULL;
- struct shaping_flow *sf2 = NULL;
- long long rule_ids[] = {0, 1};
- long long rule_id1[] = {0};
- long long rule_id2[] = {1};
- int profile_nums[] = {1, 1};
- int prioritys[] = {2, 1};
- int profile_ids[][MAX_REF_PROFILE] = {{0}, {1}};
-
- stub_init();
- TAILQ_INIT(&expec_tx_queue1);
- TAILQ_INIT(&expec_tx_queue2);
- ctx = shaping_engine_init();
- ASSERT_TRUE(ctx != NULL);
- sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
- ASSERT_TRUE(sf1 != NULL);
- sf2 = shaping_flow_new(&ctx->thread_ctx[0]);
- ASSERT_TRUE(sf2 != NULL);
-
- stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_ids);
- stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT);
- stub_set_token_bucket_avl_per_sec(1, 1000, SHAPING_DIR_OUT);
-
- actual_tx_queue = stub_get_tx_queue();
- shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1);
- shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id2, 1);
-
-
- /*******send packets***********/
- send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
- send_packets(&ctx->thread_ctx[0], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
-
-
- //first 10 packets for stream1
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
-
- //first 10 packets for stream2
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));
-
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
-
- while (!TAILQ_EMPTY(&expec_tx_queue2)) {//last 90 delay packets
- stub_refresh_token_bucket(0);
- stub_refresh_token_bucket(1);
- for (int i = 0; i < 10; i++) {
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
- }
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 has priority 1, so it drains first
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- }
-
- while (!TAILQ_EMPTY(&expec_tx_queue1)) {//last 90 delay packets
- stub_refresh_token_bucket(0);
- stub_refresh_token_bucket(1);
- for (int i = 0; i < 10; i++) {
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
- }
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));//stream1 priority 2
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- }
-
- shaping_flow_free(&ctx->thread_ctx[0], sf1);
- shaping_flow_free(&ctx->thread_ctx[0], sf2);
- fieldstat_global_disable_prometheus_endpoint();
-
- /***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metrics manually
-
- shaper_thread_resource_clear();
- shaping_engine_destroy(ctx);
- stub_clear_matched_shaping_rules();
-
- /*******test statistics***********/
- sleep(2);//wait for telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0
- shaping_stat_judge(line, 0, 0, 2, 100, 10000, 0, 0, 280000, SHAPING_DIR_OUT, profile_type_primary);//max latency is taken over every queued pkt
-
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1
- shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);//max latency is taken over every queued pkt
-
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
-}
-
-/*session1 match rule1,rule2,rule4; session2 match rule3
- rule1:
- priority:1
- profile1: limit 1000
- rule2:
- priority:2
- profile2: limit 1000
- rule3:
- priority:3
- profile3: limit 1000
- rule4:
- priority:4
- profile4: limit 1000*/
-TEST(two_session_diff_priority, udp_in_order_multi_rule)
-{
- struct stub_pkt_queue expec_tx_queue1;
- struct stub_pkt_queue expec_tx_queue2;
- struct stub_pkt_queue *actual_tx_queue;
- struct shaping_ctx *ctx = NULL;
- struct shaping_flow *sf1 = NULL;
- struct shaping_flow *sf2 = NULL;
- long long rule_ids[] = {1, 2, 3, 4};
- long long rule_id1[] = {1, 2, 4};
- long long rule_id2[] = {3};
- int profile_nums[] = {1, 1, 1, 1};
- int prioritys[] = {1, 2, 3, 4};
- int profile_ids[][MAX_REF_PROFILE] = {{1}, {2}, {3}, {4}};
-
-
- TAILQ_INIT(&expec_tx_queue1);
- TAILQ_INIT(&expec_tx_queue2);
- stub_init();
-
- ctx = shaping_engine_init();
- ASSERT_TRUE(ctx != NULL);
- sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
- ASSERT_TRUE(sf1 != NULL);
- sf2 = shaping_flow_new(&ctx->thread_ctx[0]);
- ASSERT_TRUE(sf2 != NULL);
-
- stub_set_matched_shaping_rules(4, rule_ids, prioritys, profile_nums, profile_ids);
-
- stub_set_token_bucket_avl_per_sec(1, 1000, SHAPING_DIR_OUT);
- stub_set_token_bucket_avl_per_sec(2, 1000, SHAPING_DIR_OUT);
- stub_set_token_bucket_avl_per_sec(3, 1000, SHAPING_DIR_OUT);
- stub_set_token_bucket_avl_per_sec(4, 1000, SHAPING_DIR_OUT);
-
- actual_tx_queue = stub_get_tx_queue();
- shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 3);
- shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id2, 1);
-
-
- /*******send packets***********/
- send_packets(&ctx->thread_ctx[0], sf1, 20, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 2, 0);
- send_packets(&ctx->thread_ctx[0], sf2, 20, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
-
-
- //first 10 packets for stream1
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
-
- //first 10 packets for stream2
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));
-
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
-
- while (!TAILQ_EMPTY(&expec_tx_queue1)) {//stream1, priority 1, last 10 packets
- stub_refresh_token_bucket(1);
- stub_refresh_token_bucket(2);
- stub_refresh_token_bucket(3);
- stub_refresh_token_bucket(4);
- for (int i = 0; i < 30; i++) {
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
- }
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- }
-
- while (!TAILQ_EMPTY(&expec_tx_queue2)) {
- stub_refresh_token_bucket(1);
- stub_refresh_token_bucket(2);
- stub_refresh_token_bucket(3);
- stub_refresh_token_bucket(4);
- for (int i = 0; i < 10; i++) {
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
- }
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 3
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- }
-
- shaping_flow_free(&ctx->thread_ctx[0], sf1);
- shaping_flow_free(&ctx->thread_ctx[0], sf2);
- fieldstat_global_disable_prometheus_endpoint();
-
- /***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metrics manually
-
- shaper_thread_resource_clear();
- shaping_engine_destroy(ctx);
- stub_clear_matched_shaping_rules();
-
- /*******test statistics***********/
- sleep(2);//wait for telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
-
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 1, 1, 1, 20, 2000, 0, 0, 48000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 1, max latency is last pkt
-
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 2, 2, 1, 20, 2000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 2, every queued pkt's latency is 1
-
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 4, 4, 1, 20, 2000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 4, max latency is first queued pkt
-
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 3, 3, 3, 20, 2000, 0, 0, 40000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 3, every queued pkt's latency is 40
-
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
-}
-
-/*session1 match rule1
- rule1:
- priority:1
- profile1: limit 1000, first 20 pkts async, then sync
-*/
-TEST(single_session_async, udp_tx_in_order)
-{
- struct stub_pkt_queue expec_tx_queue;
- struct stub_pkt_queue *actual_tx_queue;
- struct shaping_ctx *ctx = NULL;
- struct shaping_flow *sf = NULL;
- long long rule_id[] = {0};
- int priority[] = {1};
- int profile_num[] = {1};
- int profile_id[][MAX_REF_PROFILE] = {{0}};
-
-
- TAILQ_INIT(&expec_tx_queue);
- stub_init();
- ctx = shaping_engine_init();
- ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new(&ctx->thread_ctx[0]);
- ASSERT_TRUE(sf != NULL);
-
- stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
- stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT);
- stub_set_async_token_get_times(0, 20);
- actual_tx_queue = stub_get_tx_queue();
- shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
-
-
- /*******packets, OP_STATE_DATA***********/
- send_packets(&ctx->thread_ctx[0], sf, 20, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
-
- sleep(1);//wait for the stub async thread to complete
-
- send_packets(&ctx->thread_ctx[0], sf, 80, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
-
-
- //first 10 packets, got token
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
-
- while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
- stub_refresh_token_bucket(0);
- for (int i = 0; i < 20; i++) {//even though polling is invoked more than 10 times, only 10 pkts should be sent
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
- }
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- }
-
- shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
-
- /***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metrics manually
-
- shaper_thread_resource_clear();
- shaping_engine_destroy(ctx);
- stub_clear_matched_shaping_rules();
-
- /*******test statistics***********/
- sleep(2);//wait for telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
-}
-
/*session1 match rule1
rule1:
priority:1
@@ -1199,7 +893,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
- shaper_stat_refresh(&ctx->thread_ctx[0], sf2, 0, 1);//refresh stat, to ensure priority queue_len in swarmkv is correct
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf2, 1);//refresh stat, to ensure priority queue_len in swarmkv is correct
stub_curr_time_s_inc(1);//inc time to refresh hmget interval
while (!TAILQ_EMPTY(&expec_tx_queue1)) {//last 90 delay packets
@@ -1230,99 +924,19 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 1470000, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 1471000, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, primary
- shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 190000, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 191000, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
}
-/*session1 match rule1; session2 match rule1
- rule1:
- priority:1
- primary profile_a:
-
-profile_a: limit 1000
-*/
-TEST(two_session_same_rule, udp_tx_in_order)
-{
- struct stub_pkt_queue expec_tx_queue;
- struct stub_pkt_queue *actual_tx_queue;
- struct shaping_ctx *ctx = NULL;
- struct shaping_flow *sf1 = NULL;
- struct shaping_flow *sf2 = NULL;
- long long rule_id[] = {1};
- int profile_num[] = {1};
- int priority[] = {1};
- int profile_id[][MAX_REF_PROFILE] = {{1}};
-
-
- TAILQ_INIT(&expec_tx_queue);
- stub_init();
-
- ctx = shaping_engine_init();
- ASSERT_TRUE(ctx != NULL);
- sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
- ASSERT_TRUE(sf1 != NULL);
- sf2 = shaping_flow_new(&ctx->thread_ctx[0]);
- ASSERT_TRUE(sf2 != NULL);
-
- stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
-
- stub_set_token_bucket_avl_per_sec(1, 1000, SHAPING_DIR_OUT);
- actual_tx_queue = stub_get_tx_queue();
-
- shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id, 1);
- shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id, 1);
-
- /*******packets, OP_STATE_DATA***********/
- send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
- send_packets(&ctx->thread_ctx[0], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
-
- //first 10 packets
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
-
- while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 190 delay packets
- stub_refresh_token_bucket(1);
- for (int i = 0; i < 20; i++) {//even though polling is invoked more than 10 times, only 10 pkts should be sent
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
- }
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- }
-
- shaping_flow_free(&ctx->thread_ctx[0], sf1);
- shaping_flow_free(&ctx->thread_ctx[0], sf2);
- fieldstat_global_disable_prometheus_endpoint();
-
- /***********send stat data here********************/
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metrics manually
-
- shaper_thread_resource_clear();
- shaping_engine_destroy(ctx);
- stub_clear_matched_shaping_rules();
-
- /*******test statistics***********/
- sleep(2);//wait for telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 1, 1, 1, 200, 20000, 0, 0, 370000, SHAPING_DIR_OUT, profile_type_primary);
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
-}
-
/*session1 match rule1; session2 match rule2
rule1:
priority:1
@@ -1373,7 +987,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//flush the priority queue length in thread 0 to swarmkv
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 1);//flush the priority queue length in thread 0 to swarmkv
stub_curr_time_s_inc(1);//inc time to refresh hmget interval
for (int i = 0; i < 10; i++) {//the session in thread 1 has priority 2 and is blocked by the priority-1 session in thread 0
stub_refresh_token_bucket(0);
@@ -1383,16 +997,18 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
while (!TAILQ_EMPTY(&expec_tx_queue1)) {
stub_refresh_token_bucket(0);
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//request tokens
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//send pkt
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1
}
- shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//flush the priority queue length in thread 0 to swarmkv
stub_curr_time_s_inc(1);//inc time to refresh hmget interval
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 1);//flush the priority queue length in thread 0 to swarmkv
while (!TAILQ_EMPTY(&expec_tx_queue2)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);//second pass sends the pkt
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2
}
@@ -1417,7 +1033,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
profile_a: limit 1000
*/
-TEST(two_session_diff_priority_same_profile, session_timer_test)
+TEST(two_session_diff_priority_same_profile, profile_timer_test)
{
struct stub_pkt_queue expec_tx_queue1;
struct stub_pkt_queue expec_tx_queue2;
@@ -1457,7 +1073,7 @@ TEST(two_session_diff_priority_same_profile, session_timer_test)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- sleep(3);//wait for the session timer to expire, to refresh priority queue_len to swarmkv
+ sleep(3);//wait for the profile timer to expire, to refresh priority queue_len to swarmkv
for (int i = 0; i < 500; i++) {
stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer
}
@@ -1475,7 +1091,8 @@ TEST(two_session_diff_priority_same_profile, session_timer_test)
while (!TAILQ_EMPTY(&expec_tx_queue1)) {
stub_refresh_token_bucket(0);
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//first polling pass requests tokens
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//then send the pkt
stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1
@@ -1491,7 +1108,8 @@ TEST(two_session_diff_priority_same_profile, session_timer_test)
stub_curr_time_s_inc(1);//inc time to refresh hmget interval
while (!TAILQ_EMPTY(&expec_tx_queue2)) {
stub_refresh_token_bucket(0);
- polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);//first polling pass requests tokens
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);//then send the pkt
stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2
@@ -1557,7 +1175,7 @@ TEST(two_sessions, priority_non_block)
shaper_rules_update(&ctx->thread_ctx[1], sf2, rule_id2, 1);
/*******send packets***********/
- send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 2, 0);//sf1 blocked by rule2(profile id 1), while rule3(profile id 0) still has 1000 tokens
+ send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 3, 0);//sf1 blocked by rule2(profile id 1), while rule3(profile id 0) still has 1000 tokens
send_packets(&ctx->thread_ctx[1], sf2, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));//sf1 should send 10 pkts
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//sf2 should send 10 pkts cause rule3(profile id 0) has 1000 token
@@ -1566,8 +1184,10 @@ TEST(two_sessions, priority_non_block)
while (!TAILQ_EMPTY(&expec_tx_queue1)) {
stub_refresh_token_bucket(0);
stub_refresh_token_bucket(1);
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//two rules per pkt need two polling passes
+
+ for (int i = 0; i < 4; i++) {
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//two rules, and each rule needs two polling passes: request tokens, then send the pkt
+ }
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 remaining 90 pkts
}
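After the token-request change, each rule needs two polling passes per packet (one to request tokens, one to send), which is why these test loops double their polling_entry calls. A hypothetical test helper that would tidy the repetition (poll_n is not part of the patch):

    // Sketch: run n polling passes on one thread context, advancing stub time each pass.
    static void poll_n(struct shaping_thread_ctx *tc, int n)
    {
        for (int i = 0; i < n; i++) {
            polling_entry(tc->sp, tc->stat, tc);
            stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
        }
    }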
@@ -1642,6 +1262,7 @@ TEST(two_sessions, borrow_when_primary_profile_priority_blocked)
while (!TAILQ_EMPTY(&expec_tx_queue1)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//second pass sends the pkt once tokens are granted
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));
}
@@ -1709,6 +1330,7 @@ TEST(two_sessions, primary_profile_priority_blocked_by_borrow_profile)
while (!TAILQ_EMPTY(&expec_tx_queue1)) {
stub_refresh_token_bucket(1);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//second pass sends the pkt once tokens are granted
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);//blocked by priority, sf1 has priority 2 for profile_b(id 1)
stub_curr_time_ns_inc(STUB_TIME_INC_FOR_HMGET);
@@ -1719,6 +1341,7 @@ TEST(two_sessions, primary_profile_priority_blocked_by_borrow_profile)
while (!TAILQ_EMPTY(&expec_tx_queue2)) {
stub_refresh_token_bucket(1);
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);//second pass sends the pkt once tokens are granted
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));
}
@@ -1768,6 +1391,7 @@ TEST(statistics, udp_drop_pkt)
while (!TAILQ_EMPTY(&expec_tx_queue)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//second pass sends the pkt once tokens are granted
stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));
}
@@ -1837,7 +1461,7 @@ TEST(statistics, udp_queueing_pkt)
/***********send stat data here********************/
- shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1);
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1);
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metrics manually
shaper_global_stat_refresh(ctx);
sleep(2);//wait for telegraf to generate metrics
@@ -1856,6 +1480,7 @@ TEST(statistics, udp_queueing_pkt)
while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//second pass sends the pkt once tokens are granted
stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));
}
@@ -1899,6 +1524,6 @@ TEST(statistics, udp_queueing_pkt)
int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
- //testing::GTEST_FLAG(filter) = "two_sessions.primary_profile_priority_blocked_by_borrow_profile";
+ //testing::GTEST_FLAG(filter) = "single_session.udp_tx_in_order";
return RUN_ALL_TESTS();
} \ No newline at end of file
diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp
index 45a2e55..93ce156 100644
--- a/shaping/test/stub.cpp
+++ b/shaping/test/stub.cpp
@@ -3,6 +3,7 @@
#include <MESA/maat.h>
#include <cstdio>
+#include <cstring>
#include <marsio.h>
#include <vector>
#include <stdlib.h>
@@ -194,6 +195,7 @@ void stub_init()
pf_array[i].in_limit_bandwidth = DEFAULT_AVALIABLE_TOKEN_PER_SEC;
pf_array[i].out_limit_bandwidth = DEFAULT_AVALIABLE_TOKEN_PER_SEC;
pf_async_times[i] = 0;
+ memset(profile_priority_len[i], 0, 10 * sizeof(int));
}
return;
}
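The new memset hard-codes 10 elements for profile_priority_len[i]. Assuming that array's extent matches the engine's priority dimension, a sizeof-based form would stay correct if the dimension ever changes; a sketch (assuming profile_priority_len[i] is a fixed-size array, not a pointer):

    // Safer: derive the byte count from the array itself instead of hard-coding 10.
    memset(profile_priority_len[i], 0, sizeof(profile_priority_len[i]));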