| author | root <[email protected]> | 2024-06-04 07:29:04 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2024-06-04 07:30:44 +0000 |
| commit | ddd6873dab598216449de3aec2bf55c40cb6b619 | |
| tree | ee7fb72dcea6fc7bcc3d02f902922963d6fcae3c | |
| parent | dbb0f42537aee97e8f393f9f8a84856fb8aec5af | |
Split each session's cached-packet queue into two direction queues: in and out
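In other words, every queueing-related field that used to exist once per flow now exists once per direction. The sketch below condenses the new layout from the shaper.h hunks; it is a compilable miniature of the real structs, not the full definitions, and the SHAPING_DIR_* values are assumed to be a plain 0/1/2 enumeration:

```c
#include <sys/queue.h>

enum shaping_packet_dir { SHAPING_DIR_IN = 0, SHAPING_DIR_OUT = 1, SHAPING_DIR_MAX = 2 };

struct shaping_packet_wrapper {
    unsigned int length;
    int rule_anchor;   /* rule index the packet was queued under */
    TAILQ_ENTRY(shaping_packet_wrapper) node;
    /* note: the per-packet `direction` field is gone; the queue a
     * packet sits in now implies its direction */
};
TAILQ_HEAD(delay_queue, shaping_packet_wrapper);

/* Reduced shaping_flow: queueing state is indexed by direction. */
struct shaping_flow_sketch {
    struct delay_queue packet_queue[SHAPING_DIR_MAX]; /* one FIFO per direction */
    int anchor[SHAPING_DIR_MAX];                      /* per-direction rule cursor */
    unsigned int queue_len;                           /* still shared by both queues */
    unsigned int flag;                                /* carries SESSION_CLOSE */
};
```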
| -rw-r--r-- | shaping/include/shaper.h | 16 |
| -rw-r--r-- | shaping/src/shaper.cpp | 290 |
| -rw-r--r-- | shaping/src/shaper_session.cpp | 5 |
| -rw-r--r-- | shaping/src/shaper_swarmkv.cpp | 2 |
| -rw-r--r-- | shaping/test/gtest_shaper.cpp | 140 |
| -rw-r--r-- | shaping/test/stub.cpp | 5 |
6 files changed, 230 insertions, 228 deletions
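The API ripple is visible in the shaper.h hunk below: shaper_queue_empty() and shaper_queue_clear() now take a direction, while shaper_packet_dequeue(), shaper_first_pkt_get() and shaper_flow_in_order_get() leave the public header and become static. A minimal usage sketch of the new surface, mirroring the shaper_session.cpp hunks below (the helper name session_teardown is hypothetical):

```c
#include "shaper.h" /* shaper_queue_clear(), shaper_queue_empty(), shaping_flow_free() */

/* Hypothetical helper: dropping a session's buffered packets now touches each
 * direction explicitly, and the flow is freed only once both queues drain. */
static void session_teardown(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
{
    shaper_queue_clear(sf, ctx, SHAPING_DIR_IN);
    shaper_queue_clear(sf, ctx, SHAPING_DIR_OUT);

    if (shaper_queue_empty(sf, SHAPING_DIR_IN) &&
        shaper_queue_empty(sf, SHAPING_DIR_OUT)) {
        shaping_flow_free(ctx, sf);
    }
}
```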
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 8aabc42..9a1dca6 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -146,7 +146,7 @@ struct shaping_profile_info {
     long long out_deposit_token_bits;
     long long bidirection_deposit_token_bits;
     long long last_failed_get_token_ms[SHAPING_DIR_MAX];
-    unsigned long long enqueue_time_us;//to calculate max latency
+    unsigned long long enqueue_time_us[SHAPING_DIR_MAX];//to calculate max latency
     struct shaping_stat_for_profile stat;
     struct shaping_profile_hash_node *hash_node;
 };
@@ -168,7 +168,6 @@ struct shaping_packet_wrapper {
     unsigned int length;
     int rule_anchor;
     int aqm_processed_pf_ids[SHAPING_REF_PROFILE_NUM_MAX];
-    unsigned char direction;
     TAILQ_ENTRY(shaping_packet_wrapper) node;
 };
 TAILQ_HEAD(delay_queue, shaping_packet_wrapper);
@@ -178,7 +177,7 @@ struct metadata
     uint64_t session_id;
     char *raw_data;
     int raw_len;
-    int dir;
+    enum shaping_packet_dir dir;
     int is_tcp_pure_ctrl;
     int is_ctrl_pkt;
     uint16_t l7_offset; // only control packet set l7_offset
@@ -189,14 +188,14 @@
 struct shaping_flow {
     char *src_ip_str;
     size_t src_ip_str_len;
-    struct delay_queue packet_queue;
+    struct delay_queue packet_queue[SHAPING_DIR_MAX];
     int rule_num;
     struct shaping_rule_info matched_rule_infos[SHAPING_RULE_NUM_MAX];
     int priority;
     unsigned char dscp_enable;
     unsigned char dscp_value;
     struct addr_tuple4 tuple4;
-    int anchor;//rule_idx
+    int anchor[SHAPING_DIR_MAX];//rule_idx
     int ref_cnt;
     unsigned int queue_len;
     unsigned int flag;
@@ -241,12 +240,9 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf);
 
 struct shaper* shaper_new(unsigned int priority_queue_len_max);
 void shaper_free(struct shaper *sp);
-bool shaper_queue_empty(struct shaping_flow *sf);
-void shaper_packet_dequeue(struct shaping_flow *sf);
-struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf);
-void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx);
+bool shaper_queue_empty(struct shaping_flow *sf, enum shaping_packet_dir dir);
+void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx, enum shaping_packet_dir dir);
 
-int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num);
 void shaper_profile_hash_node_set(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile);
 
 int shaper_global_conf_init(struct shaping_system_conf *conf);
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 4accf9c..b55807e 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -38,12 +38,12 @@ extern "C" {
 #define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9"
 
 struct shaper {//trees in one thread
-    struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority
+    struct avl_tree *priority_trees[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority
 };
 
 struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree
     struct shaping_flow shaping_flow;
-    struct avl_node *avl_node[SHAPING_PRIORITY_NUM_MAX];
+    struct avl_node *avl_node[SHAPING_DIR_MAX][SHAPING_PRIORITY_NUM_MAX];
 };
 
 struct shaping_profile_container {
@@ -63,18 +63,18 @@
 thread_local static int thread_swarmkv_cb_cnt = 0;
 
 struct shaper* shaper_new(unsigned int priority_queue_len_max)
 {
     struct shaper *sp = NULL;
-    int i;
 
     sp = (struct shaper*)calloc(1, sizeof(struct shaper));
     if (!sp) {
         goto ERROR;
     }
-
-    for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
-        sp->priority_trees[i] = avl_tree_init(priority_queue_len_max);
-        if (!sp->priority_trees[i]) {
-            goto ERROR;
+    for (int i = 0; i < SHAPING_DIR_MAX; i++) {
+        for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+            sp->priority_trees[i][j] = avl_tree_init(priority_queue_len_max);
+            if (!sp->priority_trees[i][j]) {
+                goto ERROR;
+            }
         }
     }
@@ -87,12 +87,12 @@ ERROR:
 
 void shaper_free(struct shaper *sp)
 {
-    int i;
-
     if (sp) {
-        for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
-            if (sp->priority_trees[i]) {
-                avl_tree_destroy(sp->priority_trees[i]);
+        for (int i = 0; i < SHAPING_DIR_MAX; i++) {
+            for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+                if (sp->priority_trees[i][j]) {
+                    avl_tree_destroy(sp->priority_trees[i][j]);
+                }
             }
         }
         free(sp);
@@ -103,12 +103,12 @@ void shaper_free(struct shaper *sp)
 
 static void shaping_node_free(struct shaping_node *s_node)
 {
-    int i;
-
     if (s_node) {
-        for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
-            if (s_node->avl_node[i]) {
-                avl_tree_node_free(s_node->avl_node[i]);
+        for (int i = 0; i < SHAPING_DIR_MAX; i++) {
+            for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+                if (s_node->avl_node[i][j]) {
+                    avl_tree_node_free(s_node->avl_node[i][j]);
+                }
             }
         }
 
@@ -129,23 +129,25 @@ static void shaping_node_free(struct shaping_node *s_node)
 struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
 {
     struct shaping_node *s_node = NULL;
-    int i;
 
     s_node = (struct shaping_node*)calloc(1, sizeof(struct shaping_node));
     if (!s_node) {
         goto ERROR;
     }
 
-    for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
-        s_node->avl_node[i] = avl_tree_node_new(0, &s_node->shaping_flow, NULL);
-        if (!s_node->avl_node[i]) {
-            goto ERROR;
+    for (int i = 0; i < SHAPING_DIR_MAX; i++) {
+        for (int j = 0; j < SHAPING_PRIORITY_NUM_MAX; j++) {
+            s_node->avl_node[i][j] = avl_tree_node_new(0, &s_node->shaping_flow, NULL);
+            if (!s_node->avl_node[i][j]) {
+                goto ERROR;
+            }
         }
     }
 
-    TAILQ_INIT(&s_node->shaping_flow.packet_queue);
-    s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
+    TAILQ_INIT(&s_node->shaping_flow.packet_queue[SHAPING_DIR_IN]);
+    TAILQ_INIT(&s_node->shaping_flow.packet_queue[SHAPING_DIR_OUT]);
+    s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
     s_node->shaping_flow.ref_cnt = 1;
 
     return &s_node->shaping_flow;
@@ -195,34 +197,33 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
     }
 
     s_pkt->pkt_buff = pkt_buff;
-    s_pkt->direction = meta->dir;
     s_pkt->length = meta->raw_len;
-    s_pkt->rule_anchor = sf->anchor;
+    s_pkt->rule_anchor = sf->anchor[meta->dir];
     s_pkt->income_time_ns = curr_time->tv_sec * NANO_SECONDS_PER_SEC + curr_time->tv_nsec;
     s_pkt->enqueue_time_us = curr_time->tv_sec * MICRO_SECONDS_PER_SEC + curr_time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
 
-    TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node);
+    TAILQ_INSERT_TAIL(&sf->packet_queue[meta->dir], s_pkt, node);
     sf->queue_len++;
 
     return 0;
 }
 
-bool shaper_queue_empty(struct shaping_flow *sf)
+bool shaper_queue_empty(struct shaping_flow *sf, enum shaping_packet_dir dir)
 {
-    return TAILQ_EMPTY(&sf->packet_queue);
+    return TAILQ_EMPTY(&sf->packet_queue[dir]);
 }
 
-struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf)
+static struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf, enum shaping_packet_dir dir)
 {
-    return TAILQ_FIRST(&sf->packet_queue);
+    return TAILQ_FIRST(&sf->packet_queue[dir]);
 }
 
-void shaper_packet_dequeue(struct shaping_flow *sf)
+static void shaper_packet_dequeue(struct shaping_flow *sf, enum shaping_packet_dir dir)
 {
     struct shaping_packet_wrapper *s_pkt;
 
-    s_pkt = TAILQ_FIRST(&sf->packet_queue);
+    s_pkt = TAILQ_FIRST(&sf->packet_queue[dir]);
     if (s_pkt) {
-        TAILQ_REMOVE(&sf->packet_queue, s_pkt, node);
+        TAILQ_REMOVE(&sf->packet_queue[dir], s_pkt, node);
         sf->queue_len--;
         free(s_pkt);
     }
@@ -230,43 +231,43 @@ void shaper_packet_dequeue(struct shaping_flow *sf)
     return;
 }
 
-void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
+void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx, enum shaping_packet_dir dir)
 {
     struct shaping_packet_wrapper *pkt_wrapper;
     struct shaping_rule_info *rule = &sf->matched_rule_infos[0];
 
-    while (!shaper_queue_empty(sf)) {
-        pkt_wrapper = shaper_first_pkt_get(sf);
+    while (!shaper_queue_empty(sf, dir)) {
+        pkt_wrapper = shaper_first_pkt_get(sf, dir);
 
-        shaper_stat_queueing_pkt_dec(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index);
+        shaper_stat_queueing_pkt_dec(&rule->primary.stat, dir, ctx->thread_index);
         shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index);
         shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length);
         shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
         shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
 
         marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
-        shaper_packet_dequeue(sf);
+        shaper_packet_dequeue(sf, dir);
     }
 
     return;
 }
 
 //return success(0) while any avl tree insert success
-static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time_us)
+static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, unsigned long long enqueue_time_us)
 {
     struct shaping_node *s_node = (struct shaping_node*)sf;
-    struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+    struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor[dir]];
     struct shaper *sp = ctx->sp;
     struct shaping_packet_wrapper *pkt_wrapper = NULL;
     int priority;
     int i;
 
-    pkt_wrapper = shaper_first_pkt_get(sf);
+    pkt_wrapper = shaper_first_pkt_get(sf, dir);
     assert(pkt_wrapper != NULL);
 
     priority = s_rule_info->primary.priority;
-    avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
-    if (0 != avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {//primary profile failed means flow push failed, ignore borrow profile
+    avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns);
+    if (0 != avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {//primary profile failed means flow push failed, ignore borrow profile
         return -1;
     }
@@ -276,44 +277,44 @@ static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow
 
     for (i = 0; i < s_rule_info->borrowing_num; i++) {
         priority = s_rule_info->borrowing[i].priority;
-        avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
-        if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
-            shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
+        avl_tree_node_key_set(s_node->avl_node[dir][priority], pkt_wrapper->income_time_ns);
+        if (0 == avl_tree_node_insert(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority])) {
+            shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
         }
     }
 
 END:
-    s_rule_info->primary.enqueue_time_us = enqueue_time_us;
-    shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
+    s_rule_info->primary.enqueue_time_us[dir] = enqueue_time_us;
+    shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, dir, ctx->thread_index);
 
     return 0;
 }
 
-static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time)
+static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time, enum shaping_packet_dir dir)
 {
-    unsigned long long enqueue_time = profile->enqueue_time_us;
+    unsigned long long enqueue_time = profile->enqueue_time_us[dir];
     unsigned long long curr_time = time->tv_sec * MICRO_SECONDS_PER_SEC + time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
 
     assert(curr_time >= enqueue_time);
 
     return (curr_time - enqueue_time);
 }
 
-static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct timespec *curr_time)
+static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, struct timespec *curr_time)
 {
     struct shaping_node *s_node = (struct shaping_node*)sf;
-    struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+    struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor[dir]];
     struct shaper *sp = ctx->sp;
     struct shaping_packet_wrapper *pkt_wrapper = NULL;
     unsigned long long latency;
     int priority;
     int i;
 
-    pkt_wrapper = shaper_first_pkt_get(sf);
+    pkt_wrapper = shaper_first_pkt_get(sf, dir);
     assert(pkt_wrapper != NULL);
 
     priority = s_rule_info->primary.priority;
-    if (avl_node_in_tree(s_node->avl_node[priority])) {
-        avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
+    if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
+        avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
     }
 
     if (s_rule_info->borrowing_num == 0) {
@@ -322,35 +323,35 @@ static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow
 
     for (i = 0; i < s_rule_info->borrowing_num; i++) {
         priority = s_rule_info->borrowing[i].priority;
-        if (avl_node_in_tree(s_node->avl_node[priority])) {
-            avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
-            shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
+        if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
+            avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+            shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
        }
     }
 
 END:
-    latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, curr_time);
-    shaper_stat_max_latency_update(&s_rule_info->primary.stat, pkt_wrapper->direction, latency, ctx->thread_index);
-    shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
+    latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, curr_time, dir);
+    shaper_stat_max_latency_update(&s_rule_info->primary.stat, dir, latency, ctx->thread_index);
+    shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, dir, ctx->thread_index);
 
     return;
 }
 
-static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority)
+static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, int priority)
 {
     struct shaping_node *s_node = (struct shaping_node*)sf;
-    struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+    struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor[dir]];
     struct shaper *sp = ctx->sp;
     struct shaping_packet_wrapper *pkt_wrapper = NULL;
 
-    pkt_wrapper = shaper_first_pkt_get(sf);
+    pkt_wrapper = shaper_first_pkt_get(sf, dir);
     assert(pkt_wrapper != NULL);
 
     for (int i = 0; i < s_rule_info->borrowing_num; i++) {
         if (priority == s_rule_info->borrowing[i].priority) {
-            if (avl_node_in_tree(s_node->avl_node[priority])) {
-                avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
-                shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
+            if (avl_node_in_tree(s_node->avl_node[dir][priority])) {
+                avl_tree_node_remove(sp->priority_trees[dir][priority], s_node->avl_node[dir][priority]);
+                shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, dir, ctx->thread_index);
             }
         }
     }
@@ -358,7 +359,7 @@ static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *
     return;
 }
 
-int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num)
+static int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], enum shaping_packet_dir dir, int priority, int max_sf_num)
 {
     struct avl_node *avl_node = NULL;
     int count = 0;
@@ -367,7 +368,7 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i
         return 0;
     }
 
-    avl_node = avl_tree_minimum_node_get(sp->priority_trees[priority]);
+    avl_node = avl_tree_minimum_node_get(sp->priority_trees[dir][priority]);
     while(avl_node) {
         sf_ins[count].sf = (struct shaping_flow*)avl_tree_node_data_get(avl_node);
         sf_ins[count].priority = priority;
@@ -601,7 +602,7 @@ static int shaper_deposit_token_get(struct shaping_profile_info *profile, int re
         return 0;
     }
 
-    if (*deposit_token < req_token_bits) {
+    if (*deposit_token <= req_token_bits) {
         *need_get_token = 1;
     }
@@ -635,7 +636,7 @@ static void shaper_profile_hash_node_refresh(struct shaping_thread_ctx *ctx, str
     return;
 }
 
-static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int req_token_bits, unsigned char direction, struct timespec *curr_timespec)
+static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int req_token_bits, enum shaping_packet_dir dir, struct timespec *curr_timespec)
 {
     struct shaping_tconsume_cb_arg *arg = NULL;
     struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node;
@@ -650,14 +651,14 @@ static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
     if (pf_hash_node->limit_direction == PROFILE_LIMIT_DIRECTION_BIDIRECTION) {
         snprintf(key, sizeof(key), "tsg-shaping-%d-bidirectional", pf_info->id);
     } else {
-        snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
+        snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, dir == SHAPING_DIR_OUT ? "outgoing" : "incoming");
     }
 
     arg = (struct shaping_tconsume_cb_arg *)calloc(1, sizeof(struct shaping_tconsume_cb_arg));
     arg->ctx = ctx;
     arg->profile = pf_info;
     arg->sf = sf;
-    arg->direction = direction;
+    arg->direction = dir;
     arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
 
     shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
@@ -672,7 +673,7 @@ static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
             break;
         case PROFILE_TYPE_HOST_FARINESS:
         case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
-            swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, shaper_token_get_cb, arg);
+            swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor[dir]].fair_factor, req_token_bits, shaper_token_get_cb, arg);
             //TODO: ftconsume with flexible
             //swarmkv_async_command(ctx->swarmkv_db, shaper_token_get_cb, arg, "FTCONSUME %s %s %d %d %s", key, sf->src_ip_str, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, "FLEXIBLE");
             break;
@@ -826,9 +827,9 @@ static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct
     }
 }
 
-static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, unsigned char direction, struct timespec *curr_timespec)
+static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, enum shaping_packet_dir direction, struct timespec *curr_timespec)
 {
-    struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor];
+    struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor[direction]];
     int need_get_token = 0;
     int ret = SHAPER_TOKEN_GET_FAILED;
 
@@ -917,9 +918,9 @@ int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, stru
     return num;
 }
 
-static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char direction)
+static int shaper_next_anchor_get(struct shaping_flow *sf, enum shaping_packet_dir dir)
 {
-    int anchor = sf->anchor + 1;
+    int anchor = sf->anchor[dir] + 1;
 
     if (anchor > sf->rule_num - 1) {
         return 0;
@@ -928,7 +929,7 @@ static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char directi
     return anchor;
 }
 
-static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority)
+static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, enum shaping_packet_dir dir, int priority)
 {
     struct shaping_rule_info *rule = NULL;
     struct shaping_profile_info *profile = NULL;
@@ -941,18 +942,18 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
     int get_token_success = 0;
     int profile_num;
 
-    rule = &sf->matched_rule_infos[sf->anchor];
+    rule = &sf->matched_rule_infos[sf->anchor[dir]];
     profile_num = shaper_profile_get(rule, priority, pf_container);
     assert(profile_num > 0);
 
-    pkt_wrapper = shaper_first_pkt_get(sf);
+    pkt_wrapper = shaper_first_pkt_get(sf, dir);
     assert(pkt_wrapper != NULL);
 
     clock_gettime(CLOCK_MONOTONIC, &curr_time);
-    latency_us = shaper_pkt_latency_us_calculate(&rule->primary, &curr_time);
+    latency_us = shaper_pkt_latency_us_calculate(&rule->primary, &curr_time, dir);
     if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
         if (latency_us > ctx->conf.pkt_max_delay_time_us) {
-            shaper_flow_pop(ctx, sf, &curr_time);
+            shaper_flow_pop(ctx, sf, dir, &curr_time);
             goto DROP;
         }
     }
@@ -964,18 +965,18 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
         /*AQM process, if aqm not pass, for primary profile drop packet, for borrow profile just don't give token to this packet*/
         if (shaper_aqm_need_drop(profile, pkt_wrapper, &curr_time, latency_us)) {
             if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
-                shaper_flow_pop(ctx, sf, &curr_time);
+                shaper_flow_pop(ctx, sf, dir, &curr_time);
                 goto DROP;
             } else {
-                shaper_flow_specific_borrow_priority_pop(ctx, sf, priority);
+                shaper_flow_specific_borrow_priority_pop(ctx, sf, dir, priority);
                 continue;
             }
         }
 
-        int ret = shaper_token_consume(ctx, sf, profile, profile_type, pkt_wrapper->length, pkt_wrapper->direction, &curr_time);
+        int ret = shaper_token_consume(ctx, sf, profile, profile_type, pkt_wrapper->length, dir, &curr_time);
         if (ret >= SHAPER_TOKEN_GET_SUCCESS) {
             if (ret == SHAPER_TOKEN_GET_SUCCESS) {
-                shaper_stat_forward_inc(&profile->stat, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
+                shaper_stat_forward_inc(&profile->stat, dir, pkt_wrapper->length, ctx->thread_index);
             }
             get_token_success = 1;
             break;
@@ -986,24 +987,24 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
         return SHAPING_QUEUED;
     }
 
-    shaper_flow_pop(ctx, sf, &curr_time);
-    sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction);
-    if (sf->anchor == 0) {//no next rule
+    shaper_flow_pop(ctx, sf, dir, &curr_time);
+    sf->anchor[dir] = shaper_next_anchor_get(sf, dir);
+    if (sf->anchor[dir] == 0) {//no next rule
        return SHAPING_FORWARD;
    }
 
     //push sf for next rule
     enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
-    if (0 == shaper_flow_push(ctx, sf, enqueue_time_us)) {
+    if (0 == shaper_flow_push(ctx, sf, dir, enqueue_time_us)) {
         return SHAPING_QUEUED;
     } else {
         goto DROP;
     }
 
 DROP:
-    rule = &sf->matched_rule_infos[sf->anchor];
-    shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index);
-    sf->anchor = 0;
+    rule = &sf->matched_rule_infos[sf->anchor[dir]];
+    shaper_stat_drop_inc(&rule->primary.stat, dir, ctx->thread_index);
+    sf->anchor[dir] = 0;
 
     return SHAPING_DROP;
 }
@@ -1021,8 +1022,8 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi
             shaper_stat_forward_inc(&profile->stat, meta->dir, meta->raw_len, ctx->thread_index);
         }
 
-        sf->anchor = shaper_next_anchor_get(sf, meta->dir);
-        if (sf->anchor == 0) {//no next rule
+        sf->anchor[meta->dir] = shaper_next_anchor_get(sf, meta->dir);
+        if (sf->anchor[meta->dir] == 0) {//no next rule
             return SHAPING_FORWARD;
         }
     }
@@ -1040,7 +1041,7 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi
     }
     enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
-    if (0 == shaper_flow_push(ctx, sf, enqueue_time_us)) {
+    if (0 == shaper_flow_push(ctx, sf, meta->dir, enqueue_time_us)) {
         return SHAPING_QUEUED;
     } else {
         goto DROP;
     }
@@ -1048,30 +1049,30 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi
 DROP:
     if (enqueue_success) {
-        shaper_packet_dequeue(sf);
+        shaper_packet_dequeue(sf, meta->dir);
     }
 
-    struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor].primary;
+    struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor[meta->dir]].primary;
     shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index);
-    sf->anchor = 0;
+    sf->anchor[meta->dir] = 0;
 
     return SHAPING_DROP;
 }
 
-static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority,
-    struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
+static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, enum shaping_packet_dir dir,
+    int priority, struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
 {
     struct shaping_packet_wrapper *pkt_wrapper;
-    int old_anchor = sf->anchor;
+    int old_anchor = sf->anchor[dir];
     int shaping_ret;
 
-    pkt_wrapper = shaper_first_pkt_get(sf);
+    pkt_wrapper = shaper_first_pkt_get(sf, dir);
     assert(pkt_wrapper != NULL);
 
-    shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority);
+    shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, dir, priority);
     switch (shaping_ret) {
     case SHAPING_QUEUED:
-        if (old_anchor == sf->anchor) {//didn't get token
+        if (old_anchor == sf->anchor[dir]) {//didn't get token
             return -1;
         } else {//got token for one rule and waiting get token for next rule
             return 0;
@@ -1083,7 +1084,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
         shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
         marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
 
-        shaper_packet_dequeue(sf);
+        shaper_packet_dequeue(sf, dir);
         break;
     case SHAPING_FORWARD:
         shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length);
@@ -1091,7 +1092,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
         shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length);
         marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1);
 
-        shaper_packet_dequeue(sf);
+        shaper_packet_dequeue(sf, dir);
         break;
     default:
         assert(0);//impossible path
@@ -1100,20 +1101,21 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
 
     shaper_stat_refresh(ctx, sf, 0);
 
-    if (shaper_queue_empty(sf)) {
-        if (sf->flag & SESSION_CLOSE) {
+    enum shaping_packet_dir dir_opposite = (dir == SHAPING_DIR_IN) ? SHAPING_DIR_OUT : SHAPING_DIR_IN;
+    if (shaper_queue_empty(sf, dir)) {
+        if (shaper_queue_empty(sf, dir_opposite) && sf->flag & SESSION_CLOSE) {
             sf->flag &= (~SESSION_CLOSE);
             shaping_flow_free(ctx, sf);
        }
         return 0;
     } else {
-        pkt_wrapper = shaper_first_pkt_get(sf);
-        shaper_stat_queueing_pkt_dec_for_rule(&sf->matched_rule_infos[pkt_wrapper->rule_anchor], pkt_wrapper->direction, ctx->thread_index);
+        pkt_wrapper = shaper_first_pkt_get(sf, dir);
+        shaper_stat_queueing_pkt_dec_for_rule(&sf->matched_rule_infos[pkt_wrapper->rule_anchor], dir, ctx->thread_index);
 
-        sf->anchor = 0;
-        if (shaper_flow_push(ctx, sf, pkt_wrapper->enqueue_time_us) != 0) {
-            shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail
-            if (sf->flag & SESSION_CLOSE) {
+        sf->anchor[dir] = 0;
+        if (shaper_flow_push(ctx, sf, dir, pkt_wrapper->enqueue_time_us) != 0) {
+            shaper_queue_clear(sf, ctx, dir);//first packet fail, then every packet will fail
+            if (shaper_queue_empty(sf, dir_opposite) && sf->flag & SESSION_CLOSE) {
                 sf->flag &= (~SESSION_CLOSE);
                 shaping_flow_free(ctx, sf);
             }
@@ -1162,11 +1164,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
         goto END;//for tcp pure control pkt, transmit it directly
     }
 
-    if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly
+    if (!shaper_queue_empty(sf, meta->dir)) {//already have queueing pkt, enqueue directly
         struct timespec curr_time;
         clock_gettime(CLOCK_MONOTONIC, &curr_time);
 
-        s_rule = &sf->matched_rule_infos[sf->anchor];
+        s_rule = &sf->matched_rule_infos[sf->anchor[meta->dir]];
         if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta, &curr_time)) {
             shaper_stat_queueing_pkt_inc_for_rule(s_rule, meta->dir, ctx->thread_index);
             shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
@@ -1180,9 +1182,9 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
         }
 
     } else {//no queueing pkt, decide action
-        sf->anchor = 0;
+        sf->anchor[meta->dir] = 0;
 
-        shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff);
+        shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor[meta->dir]].primary, rx_buff);
         switch (shaping_ret) {
         case SHAPING_QUEUED:
             shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
@@ -1207,7 +1209,7 @@ END:
     shaper_stat_refresh(ctx, sf, 0);
 
     if(sf->flag & SESSION_CLOSE) {
-        if (shaper_queue_empty(sf)) {
+        if (shaper_queue_empty(sf, SHAPING_DIR_IN) && shaper_queue_empty(sf, SHAPING_DIR_OUT)) {
             char *addr_str = addr_tuple4_to_str(&sf->tuple4);
 
             LOG_DEBUG("%s: shaping free a shaping_flow for session: %s", LOG_TAG_SHAPING, addr_str);
@@ -1223,6 +1225,29 @@ END:
     return;
 }
 
+static void shaper_polling_token_get(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx, enum shaping_packet_dir dir)
+{
+    struct shaper_flow_instance sf_ins[SHAPER_FLOW_POP_NUM_MAX];
+    int sf_num;
+    int ret;
+
+    for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+        sf_num = shaper_flow_in_order_get(sp, sf_ins, dir, i, ctx->conf.polling_node_num_max[i]);
+        if (sf_num == 0) {
+            continue;
+        }
+
+        for (int j = 0; j < sf_num; j++) {
+            ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, dir, sf_ins[j].priority, stat, ctx);
+            if (ret == 0) {
+                return;
+            }
+        }
+    }
+
+    return;
+}
+
 void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
 {
     static thread_local int swarmkv_caller_loop_divisor = SWARMKV_CALLER_LOOP_DIVISOR_MIN;
@@ -1265,23 +1290,8 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
         return;
     }
 
-    struct shaper_flow_instance sf_ins[SHAPER_FLOW_POP_NUM_MAX];
-    int sf_num;
-    int ret;
-
-    for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
-        sf_num = shaper_flow_in_order_get(sp, sf_ins, i, ctx->conf.polling_node_num_max[i]);
-        if (sf_num == 0) {
-            continue;
-        }
-
-        for (int j = 0; j < sf_num; j++) {
-            ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, stat, ctx);
-            if (ret == 0) {
-                return;
-            }
-        }
-    }
+    shaper_polling_token_get(sp, stat, ctx, SHAPING_DIR_IN);
+    shaper_polling_token_get(sp, stat, ctx, SHAPING_DIR_OUT);
 
     return;
 }
diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp
index 494950b..f58d55f 100644
--- a/shaping/src/shaper_session.cpp
+++ b/shaping/src/shaper_session.cpp
@@ -195,7 +195,7 @@ struct shaping_flow* shaper_session_close(struct shaping_thread_ctx *ctx, struct
 
     sf = (struct shaping_flow *)session_node->val_data;
 
-    if (shaper_queue_empty(sf)) {
+    if (shaper_queue_empty(sf, SHAPING_DIR_IN) && shaper_queue_empty(sf, SHAPING_DIR_OUT)) {
         shaping_flow_free(ctx, sf);
     } else {
         sf->flag |= SESSION_CLOSE;
@@ -245,7 +245,8 @@ void shaper_session_data_free_cb(void *session_data, void *data)
     struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data;
 
     if (sf) {
-        shaper_queue_clear(sf, ctx);
+        shaper_queue_clear(sf, ctx, SHAPING_DIR_IN);
+        shaper_queue_clear(sf, ctx, SHAPING_DIR_OUT);
         shaping_flow_free(ctx, sf);
     }
 
diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp
index 6ac1db5..069cf33 100644
--- a/shaping/src/shaper_swarmkv.cpp
+++ b/shaping/src/shaper_swarmkv.cpp
@@ -139,6 +139,8 @@ struct swarmkv* shaper_swarmkv_init(int caller_thread_num)
         return NULL;
     }
 
+    swarmkv_register_thread(swarmkv_db);
+
     LOG_DEBUG("%s: shaping open swarmkv: %s", LOG_TAG_SWARMKV, conf.swarmkv_cluster_name);
 
     char cmd[256] = {0};//automatically run heal once after a restart
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index 905a4f8..78b8247 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -45,7 +45,7 @@ static struct stub_packet_node* packet_node_new(stub_packet *packet)
     return pkt_node;
 }
 
-static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int pkt_num, int pkt_len, unsigned char dir,
+static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int pkt_num, int pkt_len, enum shaping_packet_dir dir,
     struct stub_pkt_queue *expec_tx_queue, int polling_times, int is_tcp_pure_control)
 {
     struct stub_packet_node *pkt_node;
@@ -250,12 +250,11 @@ TEST(single_session, udp_tx_in_order)
 
     /**********send packets*********************/
-    send_packets(&ctx->thread_ctx[0], sf, 101, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
     /*******************************************/
 
     //first 10 packets
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));//async pass 1 packet
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
@@ -288,7 +287,7 @@ TEST(single_session, udp_tx_in_order)
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
-    shaping_stat_judge(line, 0, 0, 1, 101, 10100, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
+    shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
     fclose(stat_file);
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
     fclose(stat_file);
@@ -297,7 +296,7 @@ TEST(single_session, udp_tx_in_order)
     stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
-    shaping_global_stat_judge(line, 101, 10100, 0, 0, 0, 0);
+    shaping_global_stat_judge(line, 100, 10000, 0, 0, 0, 0);
     fclose(stat_file);
 }
 
@@ -306,7 +305,8 @@
   profile: bidirectional limit 1000*/
 TEST(bidirectional, udp_tx_in_order)
 {
-    struct stub_pkt_queue expec_tx_queue;
+    struct stub_pkt_queue expec_tx_queue_in;
+    struct stub_pkt_queue expec_tx_queue_out;
     struct stub_pkt_queue *actual_tx_queue;
     struct shaping_ctx *ctx = NULL;
     struct shaping_flow *sf = NULL;
@@ -315,7 +315,8 @@ TEST(bidirectional, udp_tx_in_order)
     int profile_num[] = {1};
     int profile_id[][MAX_REF_PROFILE] = {{0}};
 
-    TAILQ_INIT(&expec_tx_queue);
+    TAILQ_INIT(&expec_tx_queue_in);
+    TAILQ_INIT(&expec_tx_queue_out);
     stub_init();
     ctx = shaping_engine_init();
     ASSERT_TRUE(ctx != NULL);
@@ -328,32 +329,32 @@ TEST(bidirectional, udp_tx_in_order)
     shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
 
     /*******send packets***********/
-    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
-    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_IN, &expec_tx_queue, 1, 0);
-    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue_out, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_IN, &expec_tx_queue_in, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue_out, 1, 0);
 
     //first 10 out packets
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
+    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue_out, actual_tx_queue, 10));
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     stub_refresh_token_bucket(0);
-    for (int i = 0; i < 11; i++) {//first polling just request token and don't send pkt
-        polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
-        stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
-    }
-    //10 out packets
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
+    polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue_out, actual_tx_queue, 1));
 
-    stub_refresh_token_bucket(0);
-    for (int i = 0; i < 11; i++) {//first polling just request token and don't send pkt
+    while(!TAILQ_EMPTY(&expec_tx_queue_out)) {
+        stub_refresh_token_bucket(0);
         polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
-        stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
+
+        ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue_in, actual_tx_queue, 1));
+        ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue_out, actual_tx_queue, 1));
     }
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
+
+    polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue_in, actual_tx_queue, 1));
+    ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
-    ASSERT_TRUE(TAILQ_EMPTY(&expec_tx_queue));
+    ASSERT_TRUE(TAILQ_EMPTY(&expec_tx_queue_in));
 
     shaping_flow_free(&ctx->thread_ctx[0], sf);
     fieldstat_global_disable_prometheus_endpoint();
@@ -393,12 +394,11 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order)
 
     /**********send packets*********************/
-    send_packets(&ctx->thread_ctx[0], sf, 101, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
     /*******************************************/
 
     //first 10 packets
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));//async pass 1 packet
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
@@ -431,7 +431,7 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order)
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
-    shaping_stat_judge(line, 0, 0, 1, 101, 10100, 0, 0, 172000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
+    shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
     fclose(stat_file);
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
     fclose(stat_file);
@@ -440,7 +440,7 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order)
     stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
-    shaping_global_stat_judge(line, 101, 10100, 0, 0, 0, 0);
+    shaping_global_stat_judge(line, 100, 10000, 0, 0, 0, 0);
     fclose(stat_file);
 }
 
@@ -473,12 +473,11 @@ TEST(single_session, tcp_tx_in_order)
     shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
 
     /*******send packets***********/
-    send_packets(&ctx->thread_ctx[0], sf, 21, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 20, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
     send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_pure_ctl_tx_queue, 1, 1);
 
     //first 10 packets
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));//async pass 1 packet
     //10 pure ctrl pkts
     ASSERT_EQ(0, judge_packet_eq(&expec_pure_ctl_tx_queue, actual_tx_queue, 10));
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -522,7 +521,7 @@ TEST(single_session, tcp_tx_in_order)
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
-    shaping_stat_judge(line, 0, 0, 1, 21, 2100, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary);
+    shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary);
 
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
     shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 31000, SHAPING_DIR_OUT, profile_type_primary);
@@ -538,7 +537,8 @@
   direction OUT*/
 TEST(single_session, udp_diff_direction)
 {
-    struct stub_pkt_queue expec_tx_queue;
+    struct stub_pkt_queue expec_tx_queue_in;
+    struct stub_pkt_queue expec_tx_queue_out;
     struct stub_pkt_queue *actual_tx_queue;
     struct shaping_ctx *ctx = NULL;
     struct shaping_flow *sf = NULL;
@@ -547,7 +547,8 @@ TEST(single_session, udp_diff_direction)
     int profile_num[] = {1};
     int profile_id[][MAX_REF_PROFILE] = {{0}};
 
-    TAILQ_INIT(&expec_tx_queue);
+    TAILQ_INIT(&expec_tx_queue_in);
+    TAILQ_INIT(&expec_tx_queue_out);
     stub_init();
     ctx = shaping_engine_init();
     ASSERT_TRUE(ctx != NULL);
@@ -560,16 +561,17 @@ TEST(single_session, udp_diff_direction)
     shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
 
     /*******send packets***********/
-    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
-    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_IN, &expec_tx_queue, 1, 0);
-    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
-    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_IN, &expec_tx_queue, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue_out, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_IN, &expec_tx_queue_in, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue_out, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_IN, &expec_tx_queue_in, 1, 0);
 
     //first 10 out packets
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    //10 in pcakets without consume token
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
+    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue_out, actual_tx_queue, 10));
+    //20 in packets without consume token
+    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue_in, actual_tx_queue, 20));
+    ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     stub_refresh_token_bucket(0);
     for (int i = 0; i < 22; i++) {//first polling just request token and don't send pkt
@@ -577,9 +579,7 @@ TEST(single_session, udp_diff_direction)
         stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
     }
     //10 out packets
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    //10 in packtets
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
+    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue_out, actual_tx_queue, 10));
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     shaping_flow_free(&ctx->thread_ctx[0], sf);
@@ -600,9 +600,9 @@ TEST(single_session, udp_diff_direction)
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
-    shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20000, SHAPING_DIR_OUT, profile_type_primary);
+    shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 21000, SHAPING_DIR_OUT, profile_type_primary);
 
-    shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20000, SHAPING_DIR_IN, profile_type_primary);
+    shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 0, SHAPING_DIR_IN, profile_type_primary);
     fclose(stat_file);
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
     fclose(stat_file);
@@ -641,12 +641,11 @@ TEST(single_session, udp_multi_rules)
     shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 3);
 
     /*******send packets***********/
-    send_packets(&ctx->thread_ctx[0], sf, 101, 100, SHAPING_DIR_OUT, &expec_tx_queue, 5, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 5, 0);
 
     //first 10 packets
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));//async pass 1 packet
 
     while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
@@ -679,13 +678,13 @@ TEST(single_session, udp_multi_rules)
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0
-    shaping_stat_judge(line, 0, 0, 1, 101, 10100, 0, 0, 507000, SHAPING_DIR_OUT, profile_type_primary);
+    shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 507000, SHAPING_DIR_OUT, profile_type_primary);
 
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1
-    shaping_stat_judge(line, 1, 1, 1, 101, 10100, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);
+    shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);
 
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2
-    shaping_stat_judge(line, 2, 2, 1, 101, 10100, 0, 0, 91000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt
+    shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 91000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt
     fclose(stat_file);
 
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -721,12 +720,11 @@ TEST(single_session, udp_borrow)
     shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
 
     /*******send packets***********/
-    send_packets(&ctx->thread_ctx[0], sf, 101, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
 
     //first 10 packets
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));//async pass 1 packet
 
     while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
         stub_refresh_token_bucket(2);
@@ -758,7 +756,7 @@ TEST(single_session, udp_borrow)
     shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 171000, SHAPING_DIR_OUT, profile_type_primary);
 
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
-    shaping_stat_judge(line, 1, 2, 2, 101, 10100, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
+    shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
     fclose(stat_file);
 
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -797,12 +795,11 @@ TEST(single_session, udp_borrow_same_priority_9)
     shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
 
     /*******send packets***********/
-    send_packets(&ctx->thread_ctx[0], sf, 101, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
 
     //first 10 packets
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));//async pass 1 packet
 
     while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
         stub_refresh_token_bucket(3);
@@ -839,7 +836,7 @@ TEST(single_session, udp_borrow_same_priority_9)
 #endif
 
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 3, borrow
-    shaping_stat_judge(line, 1, 3, 9, 101, 10100, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
+    shaping_stat_judge(line, 1, 3, 9, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
     fclose(stat_file);
 
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -945,13 +942,12 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
 
     /*******send packets***********/
-    send_packets(&ctx->thread_ctx[0], sf1, 101, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
-    send_packets(&ctx->thread_ctx[0], sf2, 101, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
 
     //first 10 packets
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//async pass 1 packet
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     stub_refresh_token_bucket(2);
@@ -960,7 +956,6 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
         stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
     }
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//async pass 1 packet
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     while (!TAILQ_EMPTY(&expec_tx_queue2)) {
@@ -1003,13 +998,13 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
-    shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 1472000, SHAPING_DIR_OUT, profile_type_primary);
+    shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 1471000, SHAPING_DIR_OUT, profile_type_primary);
 
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
-    shaping_stat_judge(line, 1, 2, 2, 101, 10100, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
+    shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
 
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, primary
-    shaping_stat_judge(line, 2, 2, 1, 101, 10100, 0, 0, 191000, SHAPING_DIR_OUT, profile_type_primary);
+    shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 191000, SHAPING_DIR_OUT, profile_type_primary);
     fclose(stat_file);
 
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -1064,7 +1059,6 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
     send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
     send_packets(&ctx->thread_ctx[1], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//async pass 1 packet
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 1);//refresh thread 0's priority queue length into swarmkv
@@ -1151,7 +1145,6 @@ TEST(two_session_diff_priority_same_profile, profile_timer_test)
     send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
     send_packets(&ctx->thread_ctx[1], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//async pass 1 packet
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     sleep(3);//wait profile timer to expire, to refresh priority queue_len to swarmkv
@@ -1259,7 +1252,6 @@ TEST(two_sessions, priority_non_block)
     send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 3, 0);//sf1 blocked by rule2(profile id 1), while rule3(profile id 0) still has 1000 token
     send_packets(&ctx->thread_ctx[1], sf2, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));//sf1 should send 10 pkts
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 async pass 1 pkts
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//sf2 should send 10 pkts cause rule3(profile id 0) has 1000 token
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
@@ -1334,7 +1326,6 @@ TEST(two_sessions, borrow_when_primary_profile_priority_blocked)
     send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
     send_packets(&ctx->thread_ctx[1], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//async pass 1 packet
 
     while (!TAILQ_EMPTY(&expec_tx_queue2)) {
         stub_refresh_token_bucket(1);
@@ -1407,11 +1398,10 @@ TEST(two_sessions, primary_profile_priority_blocked_by_borrow_profile)
     shaper_rules_update(&ctx->thread_ctx[1], sf2, rule_id2, 1);
 
     /*******send packets***********/
-    send_packets(&ctx->thread_ctx[0], sf1, 101, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
     send_packets(&ctx->thread_ctx[1], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//async pass 1 packet
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     while (!TAILQ_EMPTY(&expec_tx_queue1)) {
@@ -1468,12 +1458,11 @@ TEST(statistics, udp_drop_pkt)
     shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
 
     /*******send packets***********/
-    send_packets(&ctx->thread_ctx[0], sf, SHAPING_SESSION_QUEUE_LEN + 10 + 1, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, SHAPING_SESSION_QUEUE_LEN + 10, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
     send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, NULL, 1, 0);//these 100 pkts will be dropped
 
     //first 10 packets
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));//async pass 1 packet
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     while (!TAILQ_EMPTY(&expec_tx_queue)) {
@@ -1503,7 +1492,7 @@ TEST(statistics, udp_drop_pkt)
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
-    shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10+1, (SHAPING_SESSION_QUEUE_LEN+10+1)*100, 100, 0, 228000, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max
+    shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228000, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max
     fclose(stat_file);
     stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
     fclose(stat_file);
@@ -1512,7 +1501,7 @@ TEST(statistics, udp_drop_pkt)
     stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
-    shaping_global_stat_judge(line, SHAPING_SESSION_QUEUE_LEN+10+1, (SHAPING_SESSION_QUEUE_LEN+10+1)*100, 100, 10000, 0, 0);
+    shaping_global_stat_judge(line, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 10000, 0, 0);
     fclose(stat_file);
 }
 
@@ -1545,7 +1534,7 @@ TEST(statistics, udp_queueing_pkt)
     shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
 
     /*******send packets***********/
-    send_packets(&ctx->thread_ctx[0], sf, 101, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
+    send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
 
     /***********send stat data here********************/
@@ -1558,12 +1547,11 @@ TEST(statistics, udp_queueing_pkt)
     stat_file = fopen(SHAPING_GLOBAL_STAT_FILE_NAME, "r");
     memset(line, 0, sizeof(line));
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
-    shaping_global_stat_judge(line, 11, 1100, 0, 0, 90, 9000);
+    shaping_global_stat_judge(line, 10, 1000, 0, 0, 90, 9000);
     fclose(stat_file);
 
     //first 10 packets
     ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));//async pass 1 packet
     ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
 
     while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
@@ -1593,7 +1581,7 @@ TEST(statistics, udp_queueing_pkt)
     memset(line, 0, sizeof(line));
 
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data first sent
-    shaping_stat_judge(line, 0, 0, 1, 11, 1100, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary);
+    shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary);
 
     ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data last sent
     shaping_stat_judge(line, 0, 0, 1, 90, 9000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);
diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp
index 02b43ea..9285120 100644
--- a/shaping/test/stub.cpp
+++ b/shaping/test/stub.cpp
@@ -232,6 +232,11 @@ int clock_gettime (clockid_t __clock_id, struct timespec *__tp)
 
 /**************stub of swarmkv*****************/
 
+void swarmkv_register_thread(struct swarmkv *db)
+{
+    return;
+}
+
 struct swarmkv_options* swarmkv_options_new(void)
 {
     return NULL;
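Beyond the mechanical re-indexing, two behavioural changes ride along: shaper_deposit_token_get() now requests a refill when the deposit is merely equal to the requested bits (`<` became `<=`), and a flow flagged SESSION_CLOSE is released only once both direction queues have drained. A sketch of that release guard follows; the factored-out helper name try_free_closed_flow is hypothetical, as the diff inlines this logic twice in shaper_polling_first_pkt_token_get():

```c
/* Hypothetical refactoring of the release guard: a closed flow may only be
 * freed once the opposite direction's queue has drained as well, otherwise
 * the packets still buffered in the other direction would be leaked. */
static void try_free_closed_flow(struct shaping_thread_ctx *ctx,
                                 struct shaping_flow *sf,
                                 enum shaping_packet_dir dir)
{
    enum shaping_packet_dir dir_opposite =
        (dir == SHAPING_DIR_IN) ? SHAPING_DIR_OUT : SHAPING_DIR_IN;

    if (shaper_queue_empty(sf, dir) &&
        shaper_queue_empty(sf, dir_opposite) &&
        (sf->flag & SESSION_CLOSE)) {
        sf->flag &= ~SESSION_CLOSE;
        shaping_flow_free(ctx, sf);
    }
}
```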
