diff options
| author | 刘畅 <[email protected]> | 2024-02-07 01:59:40 +0000 |
|---|---|---|
| committer | 刘畅 <[email protected]> | 2024-02-07 01:59:40 +0000 |
| commit | bad1c91e472c30e348eb0eb8c0cdb504bc6f928e (patch) | |
| tree | 95da51ac9b71f85c7591bc4155b3d69bbdd676cf | |
| parent | 008d4b3906cc84e007f4519f901a288dd968a14e (diff) | |
| parent | 1795b5592e6b397d83e96c54088470d158ebaac9 (diff) | |
Merge branch 'feature_aqm_blue' into 'rel'
TSG-19341: add Feature aqm blue and some performance optimize
See merge request tango/shaping-engine!75
| -rw-r--r-- | common/src/session_table.cpp | 8 | ||||
| -rw-r--r-- | shaping/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | shaping/include/shaper.h | 64 | ||||
| -rw-r--r-- | shaping/include/shaper_aqm.h | 2 | ||||
| -rw-r--r-- | shaping/include/shaper_global_stat.h | 3 | ||||
| -rw-r--r-- | shaping/include/shaper_stat.h | 4 | ||||
| -rw-r--r-- | shaping/include/shaper_swarmkv.h | 3 | ||||
| -rw-r--r-- | shaping/src/main.cpp | 4 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 312 | ||||
| -rw-r--r-- | shaping/src/shaper_aqm.cpp | 82 | ||||
| -rw-r--r-- | shaping/src/shaper_global_stat.cpp | 9 | ||||
| -rw-r--r-- | shaping/src/shaper_maat.cpp | 33 | ||||
| -rw-r--r-- | shaping/src/shaper_session.cpp | 9 | ||||
| -rw-r--r-- | shaping/src/shaper_stat.cpp | 101 | ||||
| -rw-r--r-- | shaping/src/shaper_swarmkv.cpp | 32 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper.cpp | 455 | ||||
| -rw-r--r-- | shaping/test/stub.cpp | 2 |
17 files changed, 481 insertions, 648 deletions
diff --git a/common/src/session_table.cpp b/common/src/session_table.cpp index b5a2278..e3dcf92 100644 --- a/common/src/session_table.cpp +++ b/common/src/session_table.cpp @@ -204,11 +204,11 @@ struct session_node *session_table_search_by_id(struct session_table *table, uin { struct session_node *temp = NULL; HASH_FIND(hh1, table->root_by_id, &session_id, sizeof(session_id), temp); - if (!temp) - { + //if (!temp) + //{ //LOG_DEBUG("%s: search: key %lu not exists", LOG_TAG_STABLE, session_id); - return NULL; - } + // return NULL; + //} //LOG_DEBUG("%s: search: key %lu success", LOG_TAG_STABLE, session_id); diff --git a/shaping/CMakeLists.txt b/shaping/CMakeLists.txt index cc82405..c85bbba 100644 --- a/shaping/CMakeLists.txt +++ b/shaping/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp ${PROJECT_SOURCE_DIR}/deps/timeout/timeout.c) +add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp src/shaper_aqm.cpp ${PROJECT_SOURCE_DIR}/deps/timeout/timeout.c) target_link_libraries(shaper PUBLIC common) target_link_libraries(shaper PUBLIC avl_tree) target_link_libraries(shaper PUBLIC cjson) @@ -17,4 +17,8 @@ target_link_libraries(shaping_engine PUBLIC jemalloc) install(TARGETS shaping_engine RUNTIME DESTINATION bin COMPONENT Program) +# 在安装时创建一个子目录log +install(DIRECTORY DESTINATION ${CMAKE_INSTALL_PREFIX}/log) + + add_subdirectory(test)
\ No newline at end of file diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 6168191..83f93f1 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -42,6 +42,8 @@ struct shaping_system_conf { unsigned int session_queue_len_max; unsigned int priority_queue_len_max; unsigned int pkt_max_delay_time_us; + int token_multiple_min; + int token_multiple_max; int polling_node_num_max[SHAPING_PRIORITY_NUM_MAX]; int work_thread_num; int cpu_affinity_enable; @@ -59,8 +61,6 @@ struct shaping_thread_ctx { struct shaping_stat *stat; struct shaping_marsio_info *marsio_info; struct swarmkv *swarmkv_db;//handle of swarmkv - int swarmkv_aqm_prob; - time_t swarmkv_aqm_update_time; struct shaping_maat_info *maat_info; struct session_table *session_table; struct timeouts *expires; @@ -97,12 +97,51 @@ enum shaping_profile_type { PROFILE_TYPE_SPLIT_BY_LOCAL_HOST }; +enum shaper_aqm_type { + AQM_TYPE_NONE = 0, + AQM_TYPE_BLUE, + AQM_TYPE_CODEL, + AQM_TYPE_MAX +}; + +struct shaper_aqm_blue_para { + time_t update_time; + int probability; +}; + +struct shaper_token_multiple { + int token_get_multiple; + unsigned char has_drop_by_queue_full; + unsigned char has_failed_get_token; + time_t token_multiple_update_time_s; +}; + +struct shaping_profile_hash_node { + int id; + enum shaper_aqm_type aqm_type; + long long in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; + long long out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; + long long last_failed_get_token_ms; + long long last_hmget_ms; + long long queue_len[SHAPING_PRIORITY_NUM_MAX]; + long long local_queue_len[SHAPING_PRIORITY_NUM_MAX]; + long long local_queue_len_update_time_us[SHAPING_PRIORITY_NUM_MAX]; + long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX]; + int hmget_ref_cnt; + int tconsume_ref_cnt; + struct shaper_token_multiple token_multiple; + struct shaper_aqm_blue_para aqm_blue_para; + unsigned char is_invalid; + struct timeout timeout_handle; + UT_hash_handle hh; +}; + struct shaping_profile_info { int id;//profile_id enum shaping_profile_type type; int priority; - int in_deposit_token_bits; - int out_deposit_token_bits; + long long in_deposit_token_bits; + long long out_deposit_token_bits; long long last_failed_get_token_ms; unsigned long long enqueue_time_us;//to calculate max latency struct shaping_stat_for_profile stat; @@ -114,7 +153,7 @@ struct shaping_rule_info { int id;//rule_id int fair_factor; struct shaping_profile_info primary; - struct shaping_profile_info borrowing[SHAPING_REF_PROFILE_NUM_MAX]; + struct shaping_profile_info borrowing[SHAPING_REF_PROFILE_NUM_MAX - 1]; int borrowing_num; int is_enabled; }; @@ -125,6 +164,7 @@ struct shaping_packet_wrapper { unsigned long long enqueue_time_us;//first enqueue time unsigned int length; int rule_anchor; + int aqm_processed_pf_ids[SHAPING_REF_PROFILE_NUM_MAX]; unsigned char direction; TAILQ_ENTRY(shaping_packet_wrapper) node; }; @@ -157,10 +197,8 @@ struct shaping_flow { unsigned int flag; struct metadata ctrl_meta; unsigned long long processed_pkts; - struct timespec stat_update_time; + unsigned long long stat_update_time_us; time_t check_rule_time; - struct timeout timeout_handle; - time_t last_update_timeout_sec; }; struct shaper_flow_instance { @@ -173,13 +211,13 @@ struct shaping_tconsume_cb_arg { struct shaping_profile_info *profile; struct shaping_flow *sf; unsigned char direction; + unsigned char is_primary_pf; long long start_time_us; }; struct shaping_hmget_cb_arg { struct shaping_thread_ctx *ctx; struct shaping_profile_hash_node *pf_hash_node; - int priority; long long start_time_us; }; @@ -202,13 +240,9 @@ bool shaper_queue_empty(struct shaping_flow *sf); void shaper_packet_dequeue(struct shaping_flow *sf); struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf); void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx); -/*return value: 0 for success, -1 for failed*/ -int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time); -/*return num of sf_ins*/ -void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf); -int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num); -//enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue); +int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num); +void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile); int shaper_global_conf_init(struct shaping_system_conf *conf); diff --git a/shaping/include/shaper_aqm.h b/shaping/include/shaper_aqm.h new file mode 100644 index 0000000..794c74d --- /dev/null +++ b/shaping/include/shaper_aqm.h @@ -0,0 +1,2 @@ + +int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper);
\ No newline at end of file diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h index 940745e..4c100a7 100644 --- a/shaping/include/shaper_global_stat.h +++ b/shaping/include/shaper_global_stat.h @@ -38,6 +38,7 @@ enum shaping_global_stat_column_index { HIT_POLICY_RX_BYTES_IDX, HIT_POLICY_TX_PKTS_IDX, HIT_POLICY_TX_BYTES_IDX, + HIT_POLICY_TX_SYN_ACK_PKTS_IDX, HIT_POLICY_DROP_PKTS_IDX, HIT_POLICY_DROP_BYTES_IDX, @@ -49,6 +50,7 @@ struct shaping_global_stat_traffic_data { long long rx_bytes; long long tx_pkts; long long tx_bytes; + long long tx_syn_ack_pkts; long long drop_pkts; long long drop_bytes; }; @@ -127,6 +129,7 @@ void shaper_global_stat_throughput_tx_inc(struct shaping_global_stat_data *threa void shaper_global_stat_hit_policy_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); +void shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(struct shaping_global_stat_data *thread_global_stat); void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); void shaper_global_stat_refresh(struct shaping_ctx *ctx);
\ No newline at end of file diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h index d9ca979..5c720a3 100644 --- a/shaping/include/shaper_stat.h +++ b/shaping/include/shaper_stat.h @@ -37,6 +37,7 @@ struct shaping_stat_for_profile_dir { struct shaping_stat_for_profile { struct shaping_stat_for_profile_dir in; struct shaping_stat_for_profile_dir out; + long long priority_queue_len; }; struct shaping_stat { @@ -57,4 +58,5 @@ void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_ void shaper_stat_drop_inc(struct shaping_stat_for_profile *profile_stat, unsigned char direction, int thread_id); void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_stat, unsigned char direction, unsigned long long latency, int thread_id); -void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force);
\ No newline at end of file +void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int force); +void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node);
\ No newline at end of file diff --git a/shaping/include/shaper_swarmkv.h b/shaping/include/shaper_swarmkv.h index 963fff0..e533802 100644 --- a/shaping/include/shaper_swarmkv.h +++ b/shaping/include/shaper_swarmkv.h @@ -2,5 +2,4 @@ struct swarmkv* shaper_swarmkv_init(int caller_thread_num); void shaper_swarmkv_destroy(struct swarmkv* swarmkv_db); -void swarmkv_reload_log_level(); -int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx);
\ No newline at end of file +void swarmkv_reload_log_level();
\ No newline at end of file diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp index d31f768..54e2f60 100644 --- a/shaping/src/main.cpp +++ b/shaping/src/main.cpp @@ -58,7 +58,7 @@ static void sig_handler(int signo) swarmkv_reload_log_level(); } - if (signo == SIGQUIT) { + if (signo == SIGQUIT || signo == SIGTERM) { quit = 1; } @@ -82,7 +82,7 @@ int main(int argc, char **argv) return -1; } - if (signal(SIGQUIT, sig_handler) == SIG_ERR) + if (signal(SIGQUIT, sig_handler) == SIG_ERR || signal(SIGTERM, sig_handler) == SIG_ERR) { LOG_ERROR("%s: unable to register SIGQUIT signal handler, error %d: %s", LOG_TAG_SHAPING, errno, strerror(errno)); LOG_CLOSE(); diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 6457025..32b41cb 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -22,25 +22,15 @@ extern "C" { #include "shaper_swarmkv.h" #include "shaper_maat.h" #include "shaper_global_stat.h" +#include "shaper_aqm.h" -#define TOKEN_ENLARGE_TIMES 10 +#define TOKEN_MULTIPLE_UPDATE_INTERVAL_S 1 +#define TOKEN_MULTIPLE_DEFAULT 10 #define TOKEN_GET_FAILED_INTERVAL_MS 1 #define HMGET_REQUEST_INTERVAL_MS 10 #define PRIORITY_BLOCK_MIN_TIME_MS 500 -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 " priority-3" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 " priority-4" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 " priority-5" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 " priority-6" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 " priority-7" -#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 " priority-8" - -const char *swarmkv_queue_len_get_cmd[] = {"", SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3, - SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6, - SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9}; +#define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9" struct shaper {//trees in one thread struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority @@ -62,17 +52,6 @@ enum shaper_token_get_result { SHAPER_TOKEN_GET_PASS = 1,//don't need to get token, regard as success }; -struct shaping_profile_hash_node { - int id; - int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; - int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; - long long last_failed_get_token_ms; - long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX]; - long long priority_blocked_time_ms[SHAPING_PRIORITY_NUM_MAX]; - unsigned char is_invalid; - UT_hash_handle hh; -}; - thread_local struct shaping_profile_hash_node *thread_sp_hashtbl = NULL; struct shaper* shaper_new(unsigned int priority_queue_len_max) @@ -160,8 +139,6 @@ struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx) TAILQ_INIT(&s_node->shaping_flow.packet_queue); s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1; - timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS); - timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); s_node->shaping_flow.ref_cnt = 1; @@ -181,8 +158,7 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) struct shaping_node *s_node = (struct shaping_node*)sf; - timeouts_del(ctx->expires, &sf->timeout_handle); - shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); + shaper_stat_refresh(ctx, sf, 1); shaping_node_free(s_node); return; @@ -199,10 +175,9 @@ void shaper_thread_resource_clear() } } -static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta) +static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta, struct timespec *curr_time) { struct shaping_packet_wrapper *s_pkt = NULL; - struct timespec curr_time; if (sf->queue_len == ctx->conf.session_queue_len_max) { return -1; @@ -213,14 +188,12 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ return -1; } - clock_gettime(CLOCK_MONOTONIC, &curr_time); - s_pkt->pkt_buff = pkt_buff; s_pkt->direction = meta->dir; s_pkt->length = meta->raw_len; s_pkt->rule_anchor = sf->anchor; - s_pkt->income_time_ns = curr_time.tv_sec * NANO_SECONDS_PER_SEC + curr_time.tv_nsec; - s_pkt->enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + s_pkt->income_time_ns = curr_time->tv_sec * NANO_SECONDS_PER_SEC + curr_time->tv_nsec; + s_pkt->enqueue_time_us = curr_time->tv_sec * MICRO_SECONDS_PER_SEC + curr_time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node); sf->queue_len++; @@ -273,14 +246,13 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) } //return success(0) while any avl tree insert success -int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) +static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; int priority; - int ret = -1; int i; pkt_wrapper = shaper_first_pkt_get(sf); @@ -288,8 +260,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un priority = s_rule_info->primary.priority; avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); - if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { - ret = 0; + if (0 != avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {//primary profile failed means flow push failed, ignore borrow profile + return -1; } if (s_rule_info->borrowing_num == 0) {// no borrow profile @@ -300,18 +272,15 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un priority = s_rule_info->borrowing[i].priority; avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { - ret = 0; shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); } } END: - if (ret == 0) {//all avl tree success - s_rule_info->primary.enqueue_time_us = enqueue_time; - shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); - } + s_rule_info->primary.enqueue_time_us = enqueue_time; + shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); - return ret; + return 0; } static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time) @@ -323,13 +292,12 @@ static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile return (curr_time - enqueue_time); } -void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) +static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct timespec *curr_time) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; - struct timespec curr_time; unsigned long long latency; int priority; int i; @@ -337,8 +305,6 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); - clock_gettime(CLOCK_MONOTONIC, &curr_time); - priority = s_rule_info->primary.priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); @@ -357,13 +323,34 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) } END: - latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, &curr_time); + latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, curr_time); shaper_stat_max_latency_update(&s_rule_info->primary.stat, pkt_wrapper->direction, latency, ctx->thread_index); shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); return; } +static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority) +{ + struct shaping_node *s_node = (struct shaping_node*)sf; + struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; + struct shaper *sp = ctx->sp; + struct shaping_packet_wrapper *pkt_wrapper = NULL; + + pkt_wrapper = shaper_first_pkt_get(sf); + assert(pkt_wrapper != NULL); + + for (int i = 0; i < s_rule_info->borrowing_num; i++) { + priority = s_rule_info->borrowing[i].priority; + if (avl_node_in_tree(s_node->avl_node[priority])) { + avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); + shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); + } + } + + return; +} + int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num) { struct avl_node *avl_node = NULL; @@ -391,7 +378,7 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i static void shaper_deposit_token_add(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority) { - int *deposit_token; + long long *deposit_token; struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; switch (profile->type) { @@ -419,6 +406,42 @@ static void shaper_deposit_token_add(struct shaping_profile_info *profile, int r *deposit_token += req_token_bits; } +static void shaper_token_multiple_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile) +{ + if (profile->type != PROFILE_TYPE_GENERIC) { + return; + } + + struct shaper_token_multiple *token_multiple = &profile->hash_node->token_multiple; + int curr_multiple = token_multiple->token_get_multiple; + time_t curr_time_s = time(NULL); + int token_multiple_min = ctx->conf.token_multiple_min; + int token_multiple_max = ctx->conf.token_multiple_max; + + if (curr_time_s - token_multiple->token_multiple_update_time_s < TOKEN_MULTIPLE_UPDATE_INTERVAL_S) { + return; + } + + token_multiple->token_multiple_update_time_s = curr_time_s; + + if (token_multiple->has_failed_get_token) { + token_multiple->token_get_multiple = (curr_multiple - 1) < token_multiple_min ? token_multiple_min : (curr_multiple - 1); + goto END; + } + + if (token_multiple->has_drop_by_queue_full) { + token_multiple->token_get_multiple = (curr_multiple + 1) > token_multiple_max ? token_multiple_max : (curr_multiple + 1); + goto END; + } + +END: + LOG_INFO("%s: profile id %d, token_get_multiple %d, has_failed_get_token %d, has_drop_by_queue_full %d", LOG_TAG_SHAPING, profile->id, token_multiple->token_get_multiple, token_multiple->has_failed_get_token, token_multiple->has_drop_by_queue_full); + token_multiple->has_failed_get_token = 0; + token_multiple->has_drop_by_queue_full = 0; + + return; +} + static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_tconsume_cb_arg *arg = (struct shaping_tconsume_cb_arg*)cb_arg; @@ -436,7 +459,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat); - LOG_INFO("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer); + LOG_DEBUG("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer); if (reply->type != SWARMKV_REPLY_INTEGER) { shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat); @@ -451,16 +474,26 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg } if (reply->integer > 0) { - sf->flag &= (~SESSION_BORROW); shaper_deposit_token_add(profile, reply->integer, arg->direction, profile->priority);//deposit tokens to profile - } else { - sf->flag |= SESSION_BORROW; + } + + if (arg->is_primary_pf) { + if (reply->integer > 0) { + sf->flag &= (~SESSION_BORROW); + } else { + sf->flag |= SESSION_BORROW; + } + } + + if (reply->integer == 0 && profile->type == PROFILE_TYPE_GENERIC) { + pf_hash_node->token_multiple.has_failed_get_token = 1; + shaper_token_multiple_update(ctx, profile); } END: + pf_hash_node->tconsume_ref_cnt--; + if (reply->type != SWARMKV_REPLY_INTEGER || reply->integer == 0) { - struct timespec curr_time; - clock_gettime(CLOCK_MONOTONIC, &curr_time); switch (profile->type) { case PROFILE_TYPE_GENERIC: pf_hash_node->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; @@ -483,7 +516,7 @@ END: static void shaper_deposit_token_sub(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority) { - int *deposit_token; + long long *deposit_token; struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; switch (profile->type) { @@ -550,31 +583,41 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_info *profile, static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec) { struct shaping_tconsume_cb_arg *arg = NULL; + struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node; char key[32] = {0}; + if (pf_hash_node->tconsume_ref_cnt > 0) { + return SHAPER_TOKEN_GET_FAILED; + } + snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming"); + arg = (struct shaping_tconsume_cb_arg *)calloc(1, sizeof(struct shaping_tconsume_cb_arg)); arg->ctx = ctx; arg->profile = pf_info; arg->sf = sf; arg->direction = direction; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) { + arg->is_primary_pf = 1; + } shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat); sf->ref_cnt++; + pf_hash_node->tconsume_ref_cnt++; switch (pf_info->type) { case PROFILE_TYPE_GENERIC: - swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); + swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * pf_hash_node->token_multiple.token_get_multiple, shaper_token_get_cb, arg); break; case PROFILE_TYPE_HOST_FARINESS: case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: - swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); + swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_MULTIPLE_DEFAULT, shaper_token_get_cb, arg); break; case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: - swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); + swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_MULTIPLE_DEFAULT, shaper_token_get_cb, arg); break; default: if (arg) { @@ -583,21 +626,6 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct break; } - swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL); - - if (pf_info->hash_node->is_invalid) { - if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token - return SHAPER_TOKEN_GET_SUCCESS; - } else {//for borrowing, means this profile has no token to borrow - return SHAPER_TOKEN_GET_FAILED; - } - } - - if (shaper_deposit_token_is_enough(pf_info, req_token_bits, direction, pf_info->priority)) { - shaper_deposit_token_sub(pf_info, req_token_bits, direction, pf_info->priority); - return SHAPER_TOKEN_GET_SUCCESS; - } - return SHAPER_TOKEN_GET_FAILED; } @@ -606,7 +634,6 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb struct shaping_hmget_cb_arg *arg = (struct shaping_hmget_cb_arg *)cb_arg; struct shaping_thread_ctx *ctx = arg->ctx; struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node; - int priority = arg->priority; struct timespec curr_time; long long curr_time_us; long long curr_time_ms; @@ -632,15 +659,15 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb if (reply->elements[i]->type == SWARMKV_REPLY_STRING) { char tmp_str[32] = {0}; memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len); - if (strtoll(tmp_str, NULL, 10) > 0) { - pf_hash_node->priority_blocked_time_ms[priority] = curr_time_ms; - break; - } + pf_hash_node->queue_len[i] = strtoll(tmp_str, NULL, 10); + } else { + pf_hash_node->queue_len[i] = 0; } } END: - pf_hash_node->last_hmget_ms[priority] = curr_time_ms; + pf_hash_node->last_hmget_ms = curr_time_ms; + pf_hash_node->hmget_ref_cnt--; free(cb_arg); cb_arg = NULL; @@ -659,19 +686,31 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st return 0; } - if (curr_time_ms - profile->hash_node->last_hmget_ms[priority] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms + if (profile->hash_node->hmget_ref_cnt > 0) {//if hmget command is pending, don't send hmget command again + goto END; + } + + if (curr_time_ms - profile->hash_node->last_hmget_ms < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms goto END; } arg = (struct shaping_hmget_cb_arg *)calloc(1, sizeof(struct shaping_hmget_cb_arg)); arg->ctx = ctx; arg->pf_hash_node = profile->hash_node; - arg->priority = priority; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + profile->hash_node->hmget_ref_cnt++; + shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat); - swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); + swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_QUEUE_LEN_GET_CMD, profile->id); + + for (int i = 0; i < priority; i++) { + if (profile->hash_node->queue_len[i] > 0) { + profile->hash_node->priority_blocked_time_ms[priority] = curr_time_ms; + goto END; + } + } END: if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority] < PRIORITY_BLOCK_MIN_TIME_MS) { @@ -682,7 +721,7 @@ END: } } -static void shaper_profile_hash_node_update(struct shaping_profile_info *profile) +void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile) { if (profile->hash_node == NULL) { struct shaping_profile_hash_node *hash_node = NULL; @@ -692,7 +731,10 @@ static void shaper_profile_hash_node_update(struct shaping_profile_info *profile } else { profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node)); profile->hash_node->id = profile->id; + profile->hash_node->token_multiple.token_get_multiple = TOKEN_MULTIPLE_DEFAULT; HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node); + timeout_init(&profile->hash_node->timeout_handle, TIMEOUT_ABS); + timeouts_add(ctx->expires, &profile->hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); } } @@ -701,7 +743,7 @@ static void shaper_profile_hash_node_update(struct shaping_profile_info *profile static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct shaping_profile_info *profile) { - long long last_failed_ms; + long long last_failed_ms = 0; switch (profile->type) { case PROFILE_TYPE_GENERIC: @@ -721,10 +763,9 @@ static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct } } -static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int req_token_bytes, - struct shaping_profile_info *profile, int profile_type, unsigned char direction) +static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, unsigned char direction, struct timespec *curr_timespec) { - if (profile_type == PROFILE_IN_RULE_TYPE_BORROW && !(sf->flag & SESSION_BORROW)) { + if (profile_type == PROFILE_IN_RULE_TYPE_BORROW && !(sf->flag & SESSION_BORROW)) {//TODO: 会减慢swarmkv请求速度 return SHAPER_TOKEN_GET_FAILED; } @@ -745,31 +786,22 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet } - shaper_profile_hash_node_update(profile); - if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction, profile->priority)) { shaper_deposit_token_sub(profile, req_token_bytes * 8, direction, profile->priority); return SHAPER_TOKEN_GET_SUCCESS; } - struct timespec curr_timespec; - clock_gettime(CLOCK_MONOTONIC, &curr_timespec); - long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + long long curr_time_ms = curr_timespec->tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MILLI_SEC; if (shaping_swarmkv_is_too_short_interval(curr_time_ms, profile)) { return SHAPER_TOKEN_GET_FAILED; } - if (shaper_swarmkv_pending_queue_aqm_drop(ctx) == 1) { - profile->hash_node->last_failed_get_token_ms = curr_time_ms; - return SHAPER_TOKEN_GET_FAILED; - } - - if (shaper_profile_is_priority_blocked(ctx, sf, profile, &curr_timespec, curr_time_ms)) { + if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_timespec, curr_time_ms)) { return SHAPER_TOKEN_GET_FAILED; } else { int req_token_bits = req_token_bytes * 8; - return shaper_token_get_from_profile(ctx, sf, profile, profile_type, req_token_bits, direction, &curr_timespec); + return shaper_token_get_from_profile(ctx, sf, profile, profile_type, req_token_bits, direction, curr_timespec); } } @@ -841,19 +873,31 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); + clock_gettime(CLOCK_MONOTONIC, &curr_time); + if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) { - clock_gettime(CLOCK_MONOTONIC, &curr_time); if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > ctx->conf.pkt_max_delay_time_us) { - shaper_flow_pop(ctx, sf); + shaper_flow_pop(ctx, sf, &curr_time); goto DROP; } } - /*todo: AQM, just for primary profile*/ for (int i = 0; i < profile_num; i++) { profile = pf_container[i].pf_info; profile_type = pf_container[i].pf_type; - int ret = shaper_token_consume(ctx, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction); + + /*AQM process, if aqm not pass, for primary profile drop packet, for borrow profile just don't give token to this packet*/ + if (shaper_aqm_need_drop(profile, pkt_wrapper)) { + if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) { + shaper_flow_pop(ctx, sf, &curr_time); + goto DROP; + } else { + shaper_flow_specific_borrow_priority_pop(ctx, sf, priority); + continue; + } + } + + int ret = shaper_token_consume(ctx, sf, profile, profile_type, pkt_wrapper->length, pkt_wrapper->direction, &curr_time); if (ret >= SHAPER_TOKEN_GET_SUCCESS) { if (ret == SHAPER_TOKEN_GET_SUCCESS) { shaper_stat_forward_inc(&profile->stat, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); @@ -867,14 +911,13 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi return SHAPING_QUEUED; } - shaper_flow_pop(ctx, sf); + shaper_flow_pop(ctx, sf, &curr_time); sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction); if (sf->anchor == 0) {//no next rule return SHAPING_FORWARD; } //push sf for next rule - clock_gettime(CLOCK_MONOTONIC, &curr_time); enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; if (0 == shaper_flow_push(ctx, sf, enqueue_time)) { return SHAPING_QUEUED; @@ -896,7 +939,8 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi unsigned long long enqueue_time; int enqueue_success = 0; - int ret = shaper_token_consume(ctx, sf, meta->raw_len, profile, profile_type, meta->dir); + clock_gettime(CLOCK_MONOTONIC, &curr_time); + int ret = shaper_token_consume(ctx, sf, profile, profile_type, meta->raw_len, meta->dir, &curr_time); if (ret >= SHAPER_TOKEN_GET_SUCCESS) { if (ret == SHAPER_TOKEN_GET_SUCCESS) { shaper_stat_forward_inc(&profile->stat, meta->dir, meta->raw_len, ctx->thread_index); @@ -905,13 +949,11 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi sf->anchor = shaper_next_anchor_get(sf, meta->dir); if (sf->anchor == 0) {//no next rule return SHAPING_FORWARD; - } else { - goto FLOW_PUSH; } } -FLOW_PUSH: - if (shaper_packet_enqueue(ctx, sf, rx_buff, meta) == 0) { + //get token failed, or have multiple rules, enqueue packet and push sf + if (shaper_packet_enqueue(ctx, sf, rx_buff, meta, &curr_time) == 0) { enqueue_success = 1; } else { char *addr_str = addr_tuple4_to_str(&sf->tuple4); @@ -922,7 +964,6 @@ FLOW_PUSH: goto DROP; } - clock_gettime(CLOCK_MONOTONIC, &curr_time); enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; if (0 == shaper_flow_push(ctx, sf, enqueue_time)) { return SHAPING_QUEUED; @@ -934,8 +975,12 @@ DROP: if (enqueue_success) { shaper_packet_dequeue(sf); } - shaper_stat_drop_inc(&sf->matched_rule_infos[sf->anchor].primary.stat, meta->dir, ctx->thread_index); + struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor].primary; + shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index); sf->anchor = 0; + + pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1; + shaper_token_multiple_update(ctx, pf_info); return SHAPING_DROP; } @@ -980,7 +1025,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ break; } - shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); + shaper_stat_refresh(ctx, sf, 0); if (shaper_queue_empty(sf)) { if (sf->flag & SESSION_CLOSE) { @@ -1010,7 +1055,6 @@ static void shaper_token_consume_force(struct shaping_flow *sf, struct metadata for (int i = 0; i < sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; - shaper_profile_hash_node_update(&rule->primary); shaper_deposit_token_sub(&rule->primary, meta->raw_len * 8, meta->dir, rule->primary.priority); } @@ -1031,17 +1075,25 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); + shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(&ctx->thread_global_stat); shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index); goto END;//for tcp pure control pkt, transmit it directly } if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly + struct timespec curr_time; + clock_gettime(CLOCK_MONOTONIC, &curr_time); + s_rule = &sf->matched_rule_infos[sf->anchor]; - if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta)) { + if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta, &curr_time)) { shaper_stat_queueing_pkt_inc_for_rule(s_rule, meta->dir, ctx->thread_index); shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); } else { - shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); + struct shaping_profile_info *pf_info = &s_rule->primary; + pf_info->hash_node->token_multiple.has_drop_by_queue_full = 1; + shaper_token_multiple_update(ctx, pf_info); + + shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index); shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len); shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); @@ -1072,12 +1124,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu } END: - shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); - time_t curr_time = time(NULL); - if (curr_time > sf->last_update_timeout_sec) { - timeouts_add(ctx->expires, &sf->timeout_handle, curr_time + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_add will delete if sf exist, then add - sf->last_update_timeout_sec = curr_time; - } + shaper_stat_refresh(ctx, sf, 0); if(sf->flag & SESSION_CLOSE) { if (shaper_queue_empty(sf)) { @@ -1100,7 +1147,7 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_ { swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL); - struct shaping_flow *sf = NULL; + struct shaping_profile_hash_node *hash_node = NULL; time_t curr_time = time(NULL); int cnt = 0; @@ -1116,9 +1163,9 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_ break; } - sf = container_of(t, struct shaping_flow, timeout_handle); - shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); - timeouts_add(ctx->expires, &sf->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_get will delete sf from queue, add it back + hash_node = container_of(t, struct shaping_profile_hash_node, timeout_handle); + shaper_stat_priority_queue_len_refresh_all(ctx, hash_node); + timeouts_add(ctx->expires, &hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_get will delete item from queue, add it back cnt++; } @@ -1228,7 +1275,6 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) return; } - prefetch(rx_buff[0]); for (int i = 0; i < rx_num; i++) { if (marsio_buff_is_ctrlbuf(rx_buff[i])) { sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta); @@ -1245,8 +1291,6 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) } polling_entry(ctx->sp, ctx->stat, ctx); - - prefetch(rx_buff[i+1]); } return; @@ -1352,6 +1396,8 @@ int shaper_global_conf_init(struct shaping_system_conf *conf) MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "CHECK_RULE_ENABLE_INTERVAL_SEC", &conf->check_rule_enable_interval_sec, 120); MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PKT_MAX_DELAY_TIME_US", &conf->pkt_max_delay_time_us, 2000000); + MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "TOKEN_MULTIPLE_MIN", &conf->token_multiple_min, 10); + MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "TOKEN_MULTIPLE_MAX", &conf->token_multiple_max, 50); return 0; @@ -1451,4 +1497,4 @@ struct shaping_ctx *shaping_engine_init() ERROR: shaping_engine_destroy(ctx); return NULL; -}
\ No newline at end of file +} diff --git a/shaping/src/shaper_aqm.cpp b/shaping/src/shaper_aqm.cpp new file mode 100644 index 0000000..20c4190 --- /dev/null +++ b/shaping/src/shaper_aqm.cpp @@ -0,0 +1,82 @@ +#include <time.h> +#include "shaper.h" +#include "shaper_aqm.h" + +#define PROBABILITY_MAX 100 +#define INCREMENT 10 +#define DECREMENT 1 +#define FREEZE_TIME 2 //unit:s +#define QUEUE_LEN_MAX 100 +static int shaper_aqm_blue_need_drop(struct shaping_packet_wrapper *pkt_wrapper, struct shaper_aqm_blue_para *para, int curr_queue_len) +{ + time_t curr_time; + + if (time(&curr_time) - para->update_time >= FREEZE_TIME) { + para->update_time = curr_time; + if (curr_queue_len >= QUEUE_LEN_MAX) { + para->probability = (para->probability + INCREMENT) > PROBABILITY_MAX ? PROBABILITY_MAX : (para->probability + INCREMENT); + } else if (curr_queue_len == 0) { + para->probability = (para->probability - DECREMENT) >= 0 ? (para->probability - DECREMENT) : 0; + } + } + + if (rand() % PROBABILITY_MAX < para->probability) { + return 1; + } + + return 0; +} + +static int shaper_aqm_have_processed(struct shaping_packet_wrapper *pkt_wrapper, int profile_id) +{ + int i = 0; + + for (i = 0; i < SHAPING_REF_PROFILE_NUM_MAX; i++) { + if (pkt_wrapper->aqm_processed_pf_ids[i] == profile_id) { + return 1; + } else if (pkt_wrapper->aqm_processed_pf_ids[i] == 0) { + break; + } + } + + return 0; +} + +static void shaper_aqm_mark_processed(struct shaping_packet_wrapper *pkt_wrapper, int profile_id) +{ + int i = 0; + + for (i = 0; i < SHAPING_REF_PROFILE_NUM_MAX; i++) { + if (pkt_wrapper->aqm_processed_pf_ids[i] == 0) { + pkt_wrapper->aqm_processed_pf_ids[i] = profile_id; + break; + } + } +} + +int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper) +{ + int ret = 0; + + if (profile->hash_node->aqm_type == AQM_TYPE_NONE) { + return 0; + } + + if (shaper_aqm_have_processed(pkt_wrapper, profile->id)) { + return 0; + } + + switch (profile->hash_node->aqm_type) { + case AQM_TYPE_BLUE: + ret = shaper_aqm_blue_need_drop(pkt_wrapper, &profile->hash_node->aqm_blue_para, profile->hash_node->queue_len[profile->priority]); + break; + case AQM_TYPE_CODEL: + break; + default: + break; + } + + shaper_aqm_mark_processed(pkt_wrapper, profile->id); + + return ret; +}
\ No newline at end of file diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp index 7144658..d59b290 100644 --- a/shaping/src/shaper_global_stat.cpp +++ b/shaping/src/shaper_global_stat.cpp @@ -63,6 +63,7 @@ static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat) stat->column_ids[HIT_POLICY_RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_rx_bytes", NULL, 0); stat->column_ids[HIT_POLICY_TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_pkts", NULL, 0); stat->column_ids[HIT_POLICY_TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_bytes", NULL, 0); + stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_syn_ack_pkts", NULL, 0); stat->column_ids[HIT_POLICY_DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_pkts", NULL, 0); stat->column_ids[HIT_POLICY_DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_bytes", NULL, 0); @@ -344,6 +345,12 @@ void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_ data->tx_bytes += pkt_len; } +void shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(struct shaping_global_stat_data *thread_global_stat) +{ + struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic; + data->tx_syn_ack_pkts++; +} + void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len) { struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic; @@ -402,6 +409,7 @@ void shaper_global_stat_refresh(struct shaping_ctx *ctx) sum.hit_policy_traffic.rx_bytes += stat_data[i].hit_policy_traffic.rx_bytes; sum.hit_policy_traffic.tx_pkts += stat_data[i].hit_policy_traffic.tx_pkts; sum.hit_policy_traffic.tx_bytes += stat_data[i].hit_policy_traffic.tx_bytes; + sum.hit_policy_traffic.tx_syn_ack_pkts += stat_data[i].hit_policy_traffic.tx_syn_ack_pkts; sum.hit_policy_traffic.drop_pkts += stat_data[i].hit_policy_traffic.drop_pkts; sum.hit_policy_traffic.drop_bytes += stat_data[i].hit_policy_traffic.drop_bytes; } @@ -446,6 +454,7 @@ void shaper_global_stat_refresh(struct shaping_ctx *ctx) fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes); fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts); fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX], hit_policy_traffic_data->tx_syn_ack_pkts); fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts); fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes); diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp index f159c2f..8a0fa88 100644 --- a/shaping/src/shaper_maat.cpp +++ b/shaping/src/shaper_maat.cpp @@ -253,15 +253,16 @@ void shaper_profile_ex_free(int table_id, void **ad, long argl, void *argp) return; } -void shaper_profile_update(struct shaping_profile_info *s_pf_info, struct shaping_profile *s_pf_ex) +void shaper_profile_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *s_pf_info, struct shaping_profile *s_pf_ex) { s_pf_info->id = s_pf_ex->id; s_pf_info->type = s_pf_ex->type; + shaper_profile_hash_node_update(ctx, s_pf_info); return; } -static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_rule_info *s_rule_info, long long rule_compile_id, int *priority_changed) +static int shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_rule_info *s_rule_info, long long rule_compile_id, int *priority_changed) { struct shaping_rule *s_rule = NULL; struct shaping_profile *s_pf = NULL; @@ -270,7 +271,7 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl s_rule = (struct shaping_rule*)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->rule_table_id, (char *)&rule_compile_id, sizeof(rule_compile_id)); if (!s_rule) { LOG_ERROR("%s maat_plugin_table_get_ex_data get rule failed for compile id %lld", LOG_TAG_MAAT, rule_compile_id); - goto END; + return -1; } s_rule_info->id = s_rule->id; s_rule_info->fair_factor = s_rule->fair_factor; @@ -282,9 +283,9 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl s_pf = (struct shaping_profile *)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->profile_table_id, pf_id_key, strlen(pf_id_key)); if (!s_pf) { LOG_ERROR("%s maat_plugin_table_get_ex_data get profile failed for key %s", LOG_TAG_MAAT, pf_id_key); - goto END; + return -1; } - shaper_profile_update(&s_rule_info->primary, s_pf); + shaper_profile_update(ctx, &s_rule_info->primary, s_pf); if (sf->processed_pkts <= CONFIRM_PRIORITY_PKTS) { if (sf->priority > s_rule->priority) { @@ -294,7 +295,7 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl } if (s_rule->borrow_pf_num == 0) { - goto END; + return 0; } for (int i = 0; i < s_rule->borrow_pf_num; i++) { @@ -303,15 +304,14 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl s_pf = (struct shaping_profile *)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->profile_table_id, pf_id_key, strlen(pf_id_key)); if (!s_pf) { LOG_ERROR("%s maat_plugin_table_get_ex_data get profile failed for key %s", LOG_TAG_MAAT, pf_id_key); - goto END; + return -1; } - shaper_profile_update(&s_rule_info->borrowing[i], s_pf); + shaper_profile_update(ctx, &s_rule_info->borrowing[i], s_pf); s_rule_info->borrowing_num++; } -END: - return; + return 0; } static void shaper_profiles_priority_update(struct shaping_flow *sf) @@ -360,10 +360,10 @@ static int shaper_rules_dup_remove(struct shaping_flow *sf, long long *rule_comp void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, long long *rule_compile_ids, int rule_num) { - int i, j; int priority_changed = 0; long long rule_ids_remove_dup[SHAPING_RULE_NUM_MAX] = {0}; int rule_num_remove_dup = 0; + int old_rule_num = sf->rule_num; if (rule_num > SHAPING_RULE_NUM_MAX) { char *addr_str = addr_tuple4_to_str(&sf->tuple4); @@ -388,15 +388,16 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf return; } - for (i = sf->rule_num, j = 0; i < sf->rule_num + rule_num_remove_dup; i++, j++) { - shaper_rule_update(ctx, sf, &sf->matched_rule_infos[i], rule_ids_remove_dup[j], &priority_changed); + for (int i = 0; i < rule_num_remove_dup; i++) { + if (shaper_rule_update(ctx, sf, &sf->matched_rule_infos[sf->rule_num], rule_ids_remove_dup[i], &priority_changed) == 0) { + sf->rule_num++; + } } - if (sf->rule_num > 0 && priority_changed) { - shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); + if (old_rule_num > 0 && priority_changed) { + shaper_stat_refresh(ctx, sf, 1); } - sf->rule_num += rule_num_remove_dup; shaper_profiles_priority_update(sf); return; diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp index 4a42391..4384a06 100644 --- a/shaping/src/shaper_session.cpp +++ b/shaping/src/shaper_session.cpp @@ -122,11 +122,16 @@ static void shaper_session_log_send(struct shaping_thread_ctx *ctx, struct shapi int pkt_header_len = sf->ctrl_meta.l7_offset; struct metadata *ctrl_meta = &sf->ctrl_meta; struct sids sids; + char *dst = NULL; char *addr_str = addr_tuple4_to_str(&sf->tuple4); - marsio_buff_malloc_global(ctx->marsio_info->instance, &tx_buff, 1, 0, ctx->thread_index); + if (marsio_buff_malloc_global(ctx->marsio_info->instance, &tx_buff, 1, 0, ctx->thread_index) < 0) { + LOG_ERROR("%s: marsio_buff_malloc_global failed for session %s", LOG_TAG_SHAPING, addr_str); + goto END; + } + marsio_buff_set_ctrlbuf(tx_buff); - char *dst = marsio_buff_append(tx_buff, pkt_header_len + mpack_size); + dst = marsio_buff_append(tx_buff, pkt_header_len + mpack_size); memcpy(dst, pkt_header_data, pkt_header_len); memcpy(dst + pkt_header_len, mpack_data, mpack_size); diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index 735e154..808abd1 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -14,7 +14,7 @@ #define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits" -#define SHAPER_STAT_REFRESH_TIME_NS 10000000 //10 ms +#define SHAPER_STAT_REFRESH_TIME_US 10000 //10 ms struct shaper_stat_conf { int enable_backgroud_thread; @@ -150,10 +150,11 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo if (reply->type != SWARMKV_REPLY_INTEGER) { shaper_global_stat_async_hincrby_failed_inc(&ctx->thread_global_stat); + arg->start_time_us = curr_time_us; shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);//hincrby failed, retry shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat); - LOG_INFO("%s: shaping stat hincrby failed, retry for profile id %d priority %d, operate queue_len %lld", LOG_TAG_STAT, arg->profile_id, arg->priority, arg->queue_len); + LOG_DEBUG("%s: shaping stat hincrby failed, retry for profile id %d priority %d, operate queue_len %lld", LOG_TAG_STAT, arg->profile_id, arg->priority, arg->queue_len); swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len); return; @@ -164,13 +165,67 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo return; } -static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage) +static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node, int priority, long long curr_time_us) +{ + if (profile_hash_node->local_queue_len[priority] == 0) { + return; + } + + if (curr_time_us - profile_hash_node->local_queue_len_update_time_us[priority] < SHAPER_STAT_REFRESH_TIME_US) { + return; + } + + struct shaping_hincrby_cb_arg *arg = (struct shaping_hincrby_cb_arg *)calloc(1, sizeof(struct shaping_hincrby_cb_arg)); + + arg->ctx = ctx; + arg->start_time_us = curr_time_us; + arg->profile_id = profile_hash_node->id; + arg->priority = priority; + arg->queue_len = profile_hash_node->local_queue_len[priority]; + shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); + shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat); + swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len); + + profile_hash_node->local_queue_len_update_time_us[priority] = curr_time_us; + profile_hash_node->local_queue_len[priority] = 0; + + return; +} + +void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node) +{ + struct timespec curr_time; + long long curr_time_us; + + clock_gettime(CLOCK_MONOTONIC_COARSE, &curr_time); + curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + + for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { + shaper_stat_priority_queue_len_refresh(ctx, profile_hash_node, i, curr_time_us); + } + + return; +} + +static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, struct shaping_rule_info *rule, struct shaping_profile_info *profile, int profile_type, int need_refresh_stat, int need_update_guage, long long curr_time_us) { struct shaping_stat_for_profile *profile_stat = &profile->stat; struct shaping_stat *stat = ctx->stat; + int priority = profile->priority; + int thread_id = ctx->thread_index; unsigned long long old_latency; + + if (need_update_guage) { + profile->hash_node->local_queue_len[priority] += profile_stat->priority_queue_len; + profile_stat->priority_queue_len = 0; + shaper_stat_priority_queue_len_refresh(ctx, profile->hash_node, priority, curr_time_us); + } + + if (!need_refresh_stat) { + return; + } - shaper_stat_tags_build(vsys_id, rule_id, profile->id, profile->priority, profile_type); + shaper_stat_tags_build(rule->vsys_id, rule->id, profile->id, priority, profile_type); fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_DROP_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.drop_pkts, tags, TAG_IDX_MAX, thread_id); fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.pkts, tags, TAG_IDX_MAX, thread_id); fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_BYTES_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.bytes, tags, TAG_IDX_MAX, thread_id); @@ -195,19 +250,6 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, i fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id); } - struct shaping_hincrby_cb_arg *arg = (struct shaping_hincrby_cb_arg *)calloc(1, sizeof(struct shaping_hincrby_cb_arg)); - struct timespec curr_time; - - clock_gettime(CLOCK_MONOTONIC, &curr_time); - arg->ctx = ctx; - arg->start_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - arg->profile_id = profile->id; - arg->priority = profile->priority; - arg->queue_len = profile_stat->in.queue_len + profile_stat->out.queue_len; - shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); - shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat); - swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len); - memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile)); } else { profile_stat->in.pkts = 0; @@ -224,34 +266,37 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, i return; } -void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force) +void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int force) { struct shaping_rule_info *rule; struct timespec curr_time; int need_refresh = 0; + long long curr_time_us; + + clock_gettime(CLOCK_MONOTONIC_COARSE, &curr_time); + curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; if (force) { need_refresh = 1; } else { - clock_gettime(CLOCK_MONOTONIC, &curr_time); - if (curr_time.tv_sec - sf->stat_update_time.tv_sec > 0 || curr_time.tv_nsec - sf->stat_update_time.tv_nsec >= SHAPER_STAT_REFRESH_TIME_NS) { + if (curr_time_us - sf->stat_update_time_us >= SHAPER_STAT_REFRESH_TIME_US) { need_refresh = 1; - memcpy(&sf->stat_update_time, &curr_time, sizeof(struct timespec)); + sf->stat_update_time_us = curr_time_us; } } - if (!need_refresh) { + int need_update_guage = sf->processed_pkts > CONFIRM_PRIORITY_PKTS ? 1 : 0; + + if (!need_refresh && !need_update_guage) { return; } - int need_update_guage = sf->processed_pkts > CONFIRM_PRIORITY_PKTS ? 1 : 0; - for (int i = 0; i < sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; - shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage); + shaper_stat_profile_metirc_refresh(ctx, rule, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_refresh, need_update_guage, curr_time_us); for (int j = 0; j < rule->borrowing_num; j++) { - shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage); + shaper_stat_profile_metirc_refresh(ctx, rule, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_refresh, need_update_guage, curr_time_us); } } @@ -303,6 +348,8 @@ void shaper_stat_queueing_pkt_inc(struct shaping_stat_for_profile *profile_stat, profile_stat->out.queue_len++; } + profile_stat->priority_queue_len++; + return; } @@ -314,6 +361,8 @@ void shaper_stat_queueing_pkt_dec(struct shaping_stat_for_profile *profile_stat, profile_stat->out.queue_len--; } + profile_stat->priority_queue_len--; + return; } diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp index 81d8ac9..6ac1db5 100644 --- a/shaping/src/shaper_swarmkv.cpp +++ b/shaping/src/shaper_swarmkv.cpp @@ -6,7 +6,7 @@ #include "utils.h" #include "shaper_swarmkv.h" -#define PROBABILITY_MAX 100 +#define PROBABILITY_MAX 500 #define INCREMENT 10 #define DECREMENT 1 #define FREEZE_TIME 1 //unit:s @@ -106,36 +106,6 @@ void swarmkv_reload_log_level() return; } -int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx) -{ - long long pending_queue_len = swarmkv_caller_get_pending_commands(ctx->swarmkv_db); - time_t now = time(NULL); - - if (now - ctx->swarmkv_aqm_update_time < FREEZE_TIME) { - goto END; - } - - if (pending_queue_len > PENDING_QUEUE_LEN_MAX) { - if (ctx->swarmkv_aqm_prob < PROBABILITY_MAX) { - ctx->swarmkv_aqm_prob += INCREMENT; - } - LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob); - } else { - if (ctx->swarmkv_aqm_prob >= DECREMENT) { - ctx->swarmkv_aqm_prob -= DECREMENT; - } - LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob); - } - ctx->swarmkv_aqm_update_time = now; - -END: - if (rand() % PROBABILITY_MAX < ctx->swarmkv_aqm_prob) { - return 1; - } - - return 0; -} - struct swarmkv* shaper_swarmkv_init(int caller_thread_num) { struct swarmkv_options *swarmkv_opts = NULL; diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index f262412..2b8f296 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -287,7 +287,7 @@ TEST(single_session, udp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts + shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); @@ -366,7 +366,7 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts + shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); @@ -419,7 +419,7 @@ TEST(single_session, tcp_tx_in_order) /***********send stat data here********************/ - shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1); + shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1); fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy sleep(2);//wait telegraf generate metric @@ -432,7 +432,7 @@ TEST(single_session, tcp_tx_in_order) ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));//pure ctrl pkts force consume 1000 tokens, current token: -1000--->0, so no pkt can be sent stub_refresh_token_bucket(0); - for (int i = 0; i < 10; i++) {//10 pkts which is not pure control + for (int i = 0; i < 11; i++) {//10 pkts which is not pure control, first polling request 10 times token, then 10 loops send 10 pkts polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } @@ -459,7 +459,7 @@ TEST(single_session, tcp_tx_in_order) shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 30000, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 31000, SHAPING_DIR_OUT, profile_type_primary); fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file @@ -506,7 +506,7 @@ TEST(single_session, udp_diff_direction) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); stub_refresh_token_bucket(0); - for (int i = 0; i < 20; i++) { + for (int i = 0; i < 22; i++) {//first polling just request token and don't send pkt polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } @@ -534,9 +534,9 @@ TEST(single_session, udp_diff_direction) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20000, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 21000, SHAPING_DIR_OUT, profile_type_primary); - shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20000, SHAPING_DIR_IN, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 22000, SHAPING_DIR_IN, profile_type_primary); fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); @@ -575,7 +575,7 @@ TEST(single_session, udp_multi_rules) shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 3); /*******send packets***********/ - send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 2, 0); + send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 5, 0); //first 10 packets @@ -612,13 +612,13 @@ TEST(single_session, udp_multi_rules) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0 - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 506000, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 507000, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1 - shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//latency of every queued pkt is 1 + shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 2000, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2 - shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt + shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 91000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file @@ -687,7 +687,7 @@ TEST(single_session, udp_borrow) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); @@ -762,7 +762,7 @@ TEST(single_session, udp_borrow_same_priority_9) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary); #if 0 //fieldstat don't output a row when all values is zero ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow @@ -777,312 +777,6 @@ TEST(single_session, udp_borrow_same_priority_9) fclose(stat_file); } -/*session1 match rule1, session2 match rule2 - rule1: - priority:2 - profile1: limit 1000 - rule2: - priority:1 - profile2: limit 1000*/ -TEST(two_session_diff_priority, udp_in_order) -{ - struct stub_pkt_queue expec_tx_queue1; - struct stub_pkt_queue expec_tx_queue2; - struct stub_pkt_queue *actual_tx_queue; - struct shaping_ctx *ctx = NULL; - struct shaping_flow *sf1 = NULL; - struct shaping_flow *sf2 = NULL; - long long rule_ids[] = {0, 1}; - long long rule_id1[] = {0}; - long long rule_id2[] = {1}; - int profile_nums[] = {1, 1}; - int prioritys[] = {2, 1}; - int profile_ids[][MAX_REF_PROFILE] = {{0}, {1}}; - - stub_init(); - TAILQ_INIT(&expec_tx_queue1); - TAILQ_INIT(&expec_tx_queue2); - ctx = shaping_engine_init(); - ASSERT_TRUE(ctx != NULL); - sf1 = shaping_flow_new(&ctx->thread_ctx[0]); - ASSERT_TRUE(sf1 != NULL); - sf2 = shaping_flow_new(&ctx->thread_ctx[0]); - ASSERT_TRUE(sf2 != NULL); - - stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_ids); - stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT); - stub_set_token_bucket_avl_per_sec(1, 1000, SHAPING_DIR_OUT); - - actual_tx_queue = stub_get_tx_queue(); - shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1); - shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id2, 1); - - - /*******send packets***********/ - send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0); - send_packets(&ctx->thread_ctx[0], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0); - - - //first 10 packets for stream1 - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); - - //first 10 packets for stream2 - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10)); - - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - - while (!TAILQ_EMPTY(&expec_tx_queue2)) {//last 90 delay packets - stub_refresh_token_bucket(0); - stub_refresh_token_bucket(1); - for (int i = 0; i < 10; i++) { - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); - } - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 1, first - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - } - - while (!TAILQ_EMPTY(&expec_tx_queue1)) {//last 90 delay packets - stub_refresh_token_bucket(0); - stub_refresh_token_bucket(1); - for (int i = 0; i < 10; i++) { - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); - } - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));//stream1 priority 2 - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - } - - shaping_flow_free(&ctx->thread_ctx[0], sf1); - shaping_flow_free(&ctx->thread_ctx[0], sf2); - fieldstat_global_disable_prometheus_endpoint(); - - /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - - shaper_thread_resource_clear(); - shaping_engine_destroy(ctx); - stub_clear_matched_shaping_rules(); - - /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0 - shaping_stat_judge(line, 0, 0, 2, 100, 10000, 0, 0, 280000, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts - - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1 - shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts - - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); -} - -/*session1 match rule1,rule2,rule4; session2 match rule3 - rule1: - priority:1 - profile1: limit 1000 - rule2: - priority:2 - profile2: limit 1000 - rule3: - priority:3 - profile3: limit 1000 - rule4: - priority:4 - profile4: limit 1000*/ -TEST(two_session_diff_priority, udp_in_order_multi_rule) -{ - struct stub_pkt_queue expec_tx_queue1; - struct stub_pkt_queue expec_tx_queue2; - struct stub_pkt_queue *actual_tx_queue; - struct shaping_ctx *ctx = NULL; - struct shaping_flow *sf1 = NULL; - struct shaping_flow *sf2 = NULL; - long long rule_ids[] = {1, 2, 3, 4}; - long long rule_id1[] = {1, 2, 4}; - long long rule_id2[] = {3}; - int profile_nums[] = {1, 1, 1, 1}; - int prioritys[] = {1, 2, 3, 4}; - int profile_ids[][MAX_REF_PROFILE] = {{1}, {2}, {3}, {4}}; - - - TAILQ_INIT(&expec_tx_queue1); - TAILQ_INIT(&expec_tx_queue2); - stub_init(); - - ctx = shaping_engine_init(); - ASSERT_TRUE(ctx != NULL); - sf1 = shaping_flow_new(&ctx->thread_ctx[0]); - ASSERT_TRUE(sf1 != NULL); - sf2 = shaping_flow_new(&ctx->thread_ctx[0]); - ASSERT_TRUE(sf2 != NULL); - - stub_set_matched_shaping_rules(4, rule_ids, prioritys, profile_nums, profile_ids); - - stub_set_token_bucket_avl_per_sec(1, 1000, SHAPING_DIR_OUT); - stub_set_token_bucket_avl_per_sec(2, 1000, SHAPING_DIR_OUT); - stub_set_token_bucket_avl_per_sec(3, 1000, SHAPING_DIR_OUT); - stub_set_token_bucket_avl_per_sec(4, 1000, SHAPING_DIR_OUT); - - actual_tx_queue = stub_get_tx_queue(); - shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 3); - shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id2, 1); - - - /*******send packets***********/ - send_packets(&ctx->thread_ctx[0], sf1, 20, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 2, 0); - send_packets(&ctx->thread_ctx[0], sf2, 20, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0); - - - //first 10 packets for stream1 - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); - - //first 10 packets for stream2 - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10)); - - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - - while (!TAILQ_EMPTY(&expec_tx_queue1)) {//stream1, priority 1, last 10 packets - stub_refresh_token_bucket(1); - stub_refresh_token_bucket(2); - stub_refresh_token_bucket(3); - stub_refresh_token_bucket(4); - for (int i = 0; i < 30; i++) { - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); - } - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - } - - while (!TAILQ_EMPTY(&expec_tx_queue2)) { - stub_refresh_token_bucket(1); - stub_refresh_token_bucket(2); - stub_refresh_token_bucket(3); - stub_refresh_token_bucket(4); - for (int i = 0; i < 10; i++) { - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); - } - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 3 - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - } - - shaping_flow_free(&ctx->thread_ctx[0], sf1); - shaping_flow_free(&ctx->thread_ctx[0], sf2); - fieldstat_global_disable_prometheus_endpoint(); - - /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - - shaper_thread_resource_clear(); - shaping_engine_destroy(ctx); - stub_clear_matched_shaping_rules(); - - /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 1, 1, 1, 20, 2000, 0, 0, 48000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 1, max latency is last pkt - - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 2, 2, 1, 20, 2000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 2, evevy queued pkt's latency is 1 - - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 4, 4, 1, 20, 2000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 4, max latency is first queued pkt - - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 3, 3, 3, 20, 2000, 0, 0, 40000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 3, every queued pkt's latency is 40 - - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); -} - -/*session1 match rule1 - rule1: - priority:1 - profile1: limit 1000, first 20 pkts async, then sync -*/ -TEST(single_session_async, udp_tx_in_order) -{ - struct stub_pkt_queue expec_tx_queue; - struct stub_pkt_queue *actual_tx_queue; - struct shaping_ctx *ctx = NULL; - struct shaping_flow *sf = NULL; - long long rule_id[] = {0}; - int priority[] = {1}; - int profile_num[] = {1}; - int profile_id[][MAX_REF_PROFILE] = {{0}}; - - - TAILQ_INIT(&expec_tx_queue); - stub_init(); - ctx = shaping_engine_init(); - ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(&ctx->thread_ctx[0]); - ASSERT_TRUE(sf != NULL); - - stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); - stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT); - stub_set_async_token_get_times(0, 20); - actual_tx_queue = stub_get_tx_queue(); - shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1); - - - /*******packets, OP_STATE_DATA***********/ - send_packets(&ctx->thread_ctx[0], sf, 20, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0); - - sleep(1);//wait stub async thread complete - - send_packets(&ctx->thread_ctx[0], sf, 80, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0); - - - //first 10 packets, got token - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - - while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets - stub_refresh_token_bucket(0); - for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); - } - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - } - - shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); - - /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - - shaper_thread_resource_clear(); - shaping_engine_destroy(ctx); - stub_clear_matched_shaping_rules(); - - /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); -} - /*session1 match rule1 rule1: priority:1 @@ -1199,7 +893,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } - shaper_stat_refresh(&ctx->thread_ctx[0], sf2, 0, 1);//refresh stat, to ensure priority queue_len in swarmkv is correct + shaper_stat_refresh(&ctx->thread_ctx[0], sf2, 1);//refresh stat, to ensure priority queue_len in swarmkv is correct stub_curr_time_s_inc(1);//inc time to refresh hmget interval while (!TAILQ_EMPTY(&expec_tx_queue1)) {//last 90 delay packets @@ -1230,99 +924,19 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 1470000, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 1471000, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, primary - shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 190000, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 191000, SHAPING_DIR_OUT, profile_type_primary); fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); } -/*session1 match rule1; session2 match rule1 - rule1: - priority:1 - primary profile_a: - -profile_a: limit 1000 -*/ -TEST(two_session_same_rule, udp_tx_in_order) -{ - struct stub_pkt_queue expec_tx_queue; - struct stub_pkt_queue *actual_tx_queue; - struct shaping_ctx *ctx = NULL; - struct shaping_flow *sf1 = NULL; - struct shaping_flow *sf2 = NULL; - long long rule_id[] = {1}; - int profile_num[] = {1}; - int priority[] = {1}; - int profile_id[][MAX_REF_PROFILE] = {{1}}; - - - TAILQ_INIT(&expec_tx_queue); - stub_init(); - - ctx = shaping_engine_init(); - ASSERT_TRUE(ctx != NULL); - sf1 = shaping_flow_new(&ctx->thread_ctx[0]); - ASSERT_TRUE(sf1 != NULL); - sf2 = shaping_flow_new(&ctx->thread_ctx[0]); - ASSERT_TRUE(sf2 != NULL); - - stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); - - stub_set_token_bucket_avl_per_sec(1, 1000, SHAPING_DIR_OUT); - actual_tx_queue = stub_get_tx_queue(); - - shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id, 1); - shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id, 1); - - /*******packets, OP_STATE_DATA***********/ - send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0); - send_packets(&ctx->thread_ctx[0], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0); - - //first 10 packets - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - - while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 190 delay packets - stub_refresh_token_bucket(1); - for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); - } - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - } - - shaping_flow_free(&ctx->thread_ctx[0], sf1); - shaping_flow_free(&ctx->thread_ctx[0], sf2); - fieldstat_global_disable_prometheus_endpoint(); - - /***********send stat data here********************/ - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - - shaper_thread_resource_clear(); - shaping_engine_destroy(ctx); - stub_clear_matched_shaping_rules(); - - /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 1, 1, 1, 200, 20000, 0, 0, 370000, SHAPING_DIR_OUT, profile_type_primary); - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); -} - /*session1 match rule1; session2 match rule2 rule1: priority:1 @@ -1373,7 +987,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中 + shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 1);//刷新线程0中的优先级队列长度到swarmkv中 stub_curr_time_s_inc(1);//inc time to refresh hmget interval for (int i = 0; i < 10; i++) {//线程1中的session优先级为2,被线程0中优先级为1的session阻断 stub_refresh_token_bucket(0); @@ -1383,16 +997,18 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) while (!TAILQ_EMPTY(&expec_tx_queue1)) { stub_refresh_token_bucket(0); - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//require tokens + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//send pkt ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1 } - shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中 stub_curr_time_s_inc(1);//inc time to refresh hmget interval + shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 1);//刷新线程0中的优先级队列长度到swarmkv中 while (!TAILQ_EMPTY(&expec_tx_queue2)) { stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); + polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2 } @@ -1417,7 +1033,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) profile_a: limit 1000 */ -TEST(two_session_diff_priority_same_profile, session_timer_test) +TEST(two_session_diff_priority_same_profile, profile_timer_test) { struct stub_pkt_queue expec_tx_queue1; struct stub_pkt_queue expec_tx_queue2; @@ -1457,7 +1073,7 @@ TEST(two_session_diff_priority_same_profile, session_timer_test) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - sleep(3);//wait session timer to expire, to refresh priority queue_len to swarmkv + sleep(3);//wait profile timer to expire, to refresh priority queue_len to swarmkv for (int i = 0; i < 500; i++) { stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer } @@ -1475,7 +1091,8 @@ TEST(two_session_diff_priority_same_profile, session_timer_test) while (!TAILQ_EMPTY(&expec_tx_queue1)) { stub_refresh_token_bucket(0); - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//first polling request token + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//then send pkt stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1 @@ -1491,7 +1108,8 @@ TEST(two_session_diff_priority_same_profile, session_timer_test) stub_curr_time_s_inc(1);//inc time to refresh hmget interval while (!TAILQ_EMPTY(&expec_tx_queue2)) { stub_refresh_token_bucket(0); - polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); + polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);//first polling request token + polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);//then send pkt stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2 @@ -1557,7 +1175,7 @@ TEST(two_sessions, priority_non_block) shaper_rules_update(&ctx->thread_ctx[1], sf2, rule_id2, 1); /*******send packets***********/ - send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 2, 0);//sf1 blocked by rule2(profile id 1), while rule3(profile id 0) still has 1000 token + send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 3, 0);//sf1 blocked by rule2(profile id 1), while rule3(profile id 0) still has 1000 token send_packets(&ctx->thread_ctx[1], sf2, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));//sf1 should send 10 pkts ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//sf2 should send 10 pkts cause rule3(profile id 0) has 1000 token @@ -1566,8 +1184,10 @@ TEST(two_sessions, priority_non_block) while (!TAILQ_EMPTY(&expec_tx_queue1)) { stub_refresh_token_bucket(0); stub_refresh_token_bucket(1); - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//tow rules per pkt need two polling + + for (int i = 0; i < 4; i++) { + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//two rules, one rule need two polling, request token and send pkt + } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 remaining 90 pkts } @@ -1642,6 +1262,7 @@ TEST(two_sessions, borrow_when_primary_profile_priority_blocked) while (!TAILQ_EMPTY(&expec_tx_queue1)) { stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1)); } @@ -1709,6 +1330,7 @@ TEST(two_sessions, primary_profile_priority_blocked_by_borrow_profile) while (!TAILQ_EMPTY(&expec_tx_queue1)) { stub_refresh_token_bucket(1); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);//blocked by priority, sf1 has priority 2 for profile_b(id 1) stub_curr_time_ns_inc(STUB_TIME_INC_FOR_HMGET); @@ -1719,6 +1341,7 @@ TEST(two_sessions, primary_profile_priority_blocked_by_borrow_profile) while (!TAILQ_EMPTY(&expec_tx_queue2)) { stub_refresh_token_bucket(1); polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); + polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1)); } @@ -1768,6 +1391,7 @@ TEST(statistics, udp_drop_pkt) while (!TAILQ_EMPTY(&expec_tx_queue)) { stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1)); } @@ -1837,7 +1461,7 @@ TEST(statistics, udp_queueing_pkt) /***********send stat data here********************/ - shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1); + shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1); fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaper_global_stat_refresh(ctx); sleep(2);//wait telegraf generate metric @@ -1856,6 +1480,7 @@ TEST(statistics, udp_queueing_pkt) while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1)); } @@ -1899,6 +1524,6 @@ TEST(statistics, udp_queueing_pkt) int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); - //testing::GTEST_FLAG(filter) = "two_sessions.primary_profile_priority_blocked_by_borrow_profile"; + //testing::GTEST_FLAG(filter) = "single_session.udp_tx_in_order"; return RUN_ALL_TESTS(); }
\ No newline at end of file diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp index 45a2e55..93ce156 100644 --- a/shaping/test/stub.cpp +++ b/shaping/test/stub.cpp @@ -3,6 +3,7 @@ #include <MESA/maat.h> #include <cstdio> +#include <cstring> #include <marsio.h> #include <vector> #include <stdlib.h> @@ -194,6 +195,7 @@ void stub_init() pf_array[i].in_limit_bandwidth = DEFAULT_AVALIABLE_TOKEN_PER_SEC; pf_array[i].out_limit_bandwidth = DEFAULT_AVALIABLE_TOKEN_PER_SEC; pf_async_times[i] = 0; + memset(profile_priority_len[i], 0, 10 * sizeof(int)); } return; } |
