diff options
| -rw-r--r-- | common/src/raw_packet.cpp | 50 | ||||
| -rw-r--r-- | common/src/session_table.cpp | 20 | ||||
| -rw-r--r-- | conf/zlog.conf | 4 | ||||
| -rw-r--r-- | shaping/include/shaper.h | 3 | ||||
| -rw-r--r-- | shaping/include/shaper_swarmkv.h | 3 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 40 | ||||
| -rw-r--r-- | shaping/src/shaper_swarmkv.cpp | 36 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper.cpp | 50 | ||||
| -rw-r--r-- | shaping/test/stub.cpp | 27 | ||||
| -rw-r--r-- | shaping/test/stub.h | 5 | ||||
| -rw-r--r-- | shaping/test/test_conf/shaping.conf | 1 |
11 files changed, 159 insertions(+), 80 deletions(-)
diff --git a/common/src/raw_packet.cpp b/common/src/raw_packet.cpp index 54c610a..03be2b1 100644 --- a/common/src/raw_packet.cpp +++ b/common/src/raw_packet.cpp @@ -148,7 +148,7 @@ int raw_packet_tcp_payload_len_get(struct raw_pkt_parser *handler) const struct layer_result *layer = &results->layers[i]; enum layer_type type = layer->type; - LOG_DEBUG("%s: find most inner tcp, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type)); + //LOG_DEBUG("%s: find most inner tcp, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type)); // first get L4 layer if (type & LAYER_TYPE_TCP) @@ -190,7 +190,7 @@ int raw_packet_parser_get_most_inner_tuple4(struct raw_pkt_parser *handler, stru const struct layer_result *layer = &results->layers[i]; enum layer_type type = layer->type; - LOG_DEBUG("%s: find most inner tuple4, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type)); + //LOG_DEBUG("%s: find most inner tuple4, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type)); // first get L4 layer if (type & LAYER_TYPE_L4) @@ -245,7 +245,7 @@ int raw_packet_parser_get_most_outer_tuple4(struct raw_pkt_parser *handler, stru const struct layer_result *layer = &results->layers[i]; enum layer_type type = layer->type; - LOG_DEBUG("%s: find most outer tuple4, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type)); + //LOG_DEBUG("%s: find most outer tuple4, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type)); // first get L3 layer if (type & LAYER_TYPE_L3) @@ -297,7 +297,7 @@ int raw_packet_parser_get_most_inner_address(struct raw_pkt_parser *handler, str const struct layer_result 
*layer = &results->layers[i]; enum layer_type type = layer->type; - LOG_DEBUG("%s: find most inner address, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type)); + //LOG_DEBUG("%s: find most inner address, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type)); if (type & LAYER_TYPE_L3) { l3_layer_data = (const char *)handler->ptr_pkt_start + layer->offset; @@ -322,7 +322,7 @@ int raw_packet_parser_get_most_outer_address(struct raw_pkt_parser *handler, str const struct layer_result *layer = &results->layers[i]; enum layer_type type = layer->type; - LOG_DEBUG("%s: find most outer address, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type)); + //LOG_DEBUG("%s: find most outer address, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type)); if (type & LAYER_TYPE_L3) { l3_layer_data = (const char *)handler->ptr_pkt_start + layer->offset; @@ -452,12 +452,12 @@ uint64_t raw_packet_parser_get_hash_value(struct raw_pkt_parser *handler, enum l return hash_value; } - char *inner_addr_str = addr_tuple4_to_str(&inner_addr); - char *outer_addr_str = addr_tuple4_to_str(&outer_addr); - LOG_DEBUG("%s: pkt_trace_id: %lu, outer_addr: %s, inner_addr: %s, is_internal: %d, hash_method: %s, hash_value: %lu", - LOG_TAG_RAWPKT, handler->pkt_trace_id, outer_addr_str, inner_addr_str, dir_is_internal, ldbc_method_to_string(method), hash_value); - free(inner_addr_str); - free(outer_addr_str); + //char *inner_addr_str = addr_tuple4_to_str(&inner_addr); + //char *outer_addr_str = addr_tuple4_to_str(&outer_addr); + //LOG_DEBUG("%s: pkt_trace_id: %lu, outer_addr: %s, inner_addr: %s, is_internal: %d, hash_method: %s, hash_value: %lu", + // LOG_TAG_RAWPKT, handler->pkt_trace_id, outer_addr_str, inner_addr_str, 
dir_is_internal, ldbc_method_to_string(method), hash_value); + //free(inner_addr_str); + //free(outer_addr_str); return hash_value; } @@ -664,7 +664,7 @@ static const void *parse_ether(struct raw_pkt_parser *handler, const void *data, const void *data_next_layer = (const char *)data + hdr_len; size_t data_next_length = length - hdr_len; - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); switch (next_proto) { case ETH_P_8021Q: @@ -712,7 +712,7 @@ static const void *parse_ipv4(struct raw_pkt_parser *handler, const void *data, const void *data_next_layer = (const char *)data + hdr_len; size_t data_next_length = length - hdr_len; - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); switch (next_proto) { case IPPROTO_TCP: @@ -752,7 +752,7 @@ static const void *parse_ipv6(struct raw_pkt_parser *handler, const void *data, const void *data_next_layer = (const char *)data + hdr_len; size_t data_next_length = length - hdr_len; - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); switch (next_proto) { case IPPROTO_TCP: @@ -791,7 +791,7 @@ static const void *parse_tcp(struct raw_pkt_parser *handler, const void *data, s const 
void *data_next_layer = (const char *)data + hdr_len; size_t data_next_length = length - hdr_len; - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); return data_next_layer; } @@ -814,7 +814,7 @@ static const void *parse_udp(struct raw_pkt_parser *handler, const void *data, s const void *data_next_layer = (const char *)data + hdr_len; size_t data_next_length = length - hdr_len; - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); switch (ntohs(hdr->uh_dport)) { // VXLAN_DPORT @@ -848,7 +848,7 @@ static const void *parse_pppoe_ses(struct raw_pkt_parser *handler, const void *d const void *data_next_layer = (const char *)data + hdr_len; size_t data_next_length = length - hdr_len; - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); switch (next_proto) { // PPPOE_TYPE_IPV4 @@ -883,7 +883,7 @@ static const void *parse_vxlan(struct raw_pkt_parser *handler, const void *data, const void *data_next_layer = (const char *)data + hdr_len; size_t data_next_length = length - hdr_len; - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), 
data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); // TESTED return parse_ether(handler, data_next_layer, data_next_length, LAYER_TYPE_ETHER); } @@ -907,7 +907,7 @@ static const void *parse_vlan8021q(struct raw_pkt_parser *handler, const void *d const void *data_next_layer = (const char *)data + hdr_len; size_t data_next_length = length - hdr_len; - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); switch (next_proto) { case ETH_P_8021Q: @@ -954,7 +954,7 @@ static const void *parse_gtpv1_u(struct raw_pkt_parser *handler, const void *dat const void *data_next_layer = (const char *)data + hdr_len; size_t data_next_length = length - hdr_len; - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); switch (next_proto) { case 4: @@ -1027,32 +1027,32 @@ static const void *parse_mpls(struct raw_pkt_parser *handler, const void *data, data_next_layer = (const char *)data_next_layer + 4; data_next_length = data_next_length - 4; - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); // 
TESTED return parse_ether(handler, data_next_layer, data_next_length, LAYER_TYPE_ETHER); } else if (ip_version == 4) { - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); // TESTED return parse_ipv4(handler, data_next_layer, data_next_length, LAYER_TYPE_IPV4); } else if (ip_version == 6) { - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); // TODO return parse_ipv6(handler, data_next_layer, data_next_length, LAYER_TYPE_IPV6); } else { - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); // TODO return parse_ether(handler, data_next_layer, data_next_length, LAYER_TYPE_ETHER); } } else { - LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); + //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length); // TESTED return parse_mpls(handler, data_next_layer, data_next_length, LAYER_TYPE_MPLS); } diff --git a/common/src/session_table.cpp b/common/src/session_table.cpp index 0764e14..b5a2278 100644 
--- a/common/src/session_table.cpp +++ b/common/src/session_table.cpp @@ -113,7 +113,7 @@ int session_table_insert(struct session_table *table, uint64_t session_id, const HASH_FIND(hh1, table->root_by_id, &session_id, sizeof(session_id), temp); if (temp) { - LOG_DEBUG("%s: insert: key %lu exists", LOG_TAG_STABLE, session_id); + LOG_ERROR("%s: insert: key %lu exists", LOG_TAG_STABLE, session_id); return -1; } @@ -128,7 +128,7 @@ int session_table_insert(struct session_table *table, uint64_t session_id, const HASH_ADD(hh1, table->root_by_id, session_id, sizeof(temp->session_id), temp); HASH_ADD(hh2, table->root_by_addr, session_addr, sizeof(temp->session_addr), temp); - LOG_DEBUG("%s: insert: key %lu success", LOG_TAG_STABLE, session_id); + //LOG_DEBUG("%s: insert: key %lu success", LOG_TAG_STABLE, session_id); table->session_node_count++; return 0; @@ -140,7 +140,7 @@ int session_table_delete_by_id(struct session_table *table, uint64_t session_id) HASH_FIND(hh1, table->root_by_id, &session_id, sizeof(session_id), temp); if (!temp) { - LOG_DEBUG("%s: delete: key %lu not exists", LOG_TAG_STABLE, session_id); + LOG_ERROR("%s: delete: key %lu not exists", LOG_TAG_STABLE, session_id); return -1; } @@ -156,7 +156,7 @@ int session_table_delete_by_id(struct session_table *table, uint64_t session_id) free(temp); temp = NULL; - LOG_DEBUG("%s: delete: key %lu success", LOG_TAG_STABLE, session_id); + //LOG_DEBUG("%s: delete: key %lu success", LOG_TAG_STABLE, session_id); table->session_node_count--; return 0; @@ -174,7 +174,7 @@ int session_table_delete_by_addr(struct session_table *table, const struct addr_ HASH_FIND(hh2, table->root_by_addr, &reverse_addr, sizeof(struct addr_tuple4), temp); if (!temp) { - LOG_DEBUG("%s: delete: key %s not exists", LOG_TAG_STABLE, addr_str); + LOG_ERROR("%s: delete: key %s not exists", LOG_TAG_STABLE, addr_str); free(addr_str); return -1; } @@ -192,7 +192,7 @@ int session_table_delete_by_addr(struct session_table *table, const struct addr_ 
free(temp); temp = NULL; - LOG_DEBUG("%s: delete: key %s success", LOG_TAG_STABLE, addr_str); + //LOG_DEBUG("%s: delete: key %s success", LOG_TAG_STABLE, addr_str); free(addr_str); addr_str = NULL; table->session_node_count--; @@ -206,11 +206,11 @@ struct session_node *session_table_search_by_id(struct session_table *table, uin HASH_FIND(hh1, table->root_by_id, &session_id, sizeof(session_id), temp); if (!temp) { - LOG_DEBUG("%s: search: key %lu not exists", LOG_TAG_STABLE, session_id); + //LOG_DEBUG("%s: search: key %lu not exists", LOG_TAG_STABLE, session_id); return NULL; } - LOG_DEBUG("%s: search: key %lu success", LOG_TAG_STABLE, session_id); + //LOG_DEBUG("%s: search: key %lu success", LOG_TAG_STABLE, session_id); return temp; } @@ -227,14 +227,14 @@ struct session_node *session_table_search_by_addr(struct session_table *table, c HASH_FIND(hh2, table->root_by_addr, &reverse_addr, sizeof(struct addr_tuple4), temp); if (!temp) { - LOG_DEBUG("%s: search: key %s not exists", LOG_TAG_STABLE, addr_str); + //LOG_DEBUG("%s: search: key %s not exists", LOG_TAG_STABLE, addr_str); free(addr_str); addr_str = NULL; return NULL; } } - LOG_DEBUG("%s: search: key %s success", LOG_TAG_STABLE, addr_str); + //LOG_DEBUG("%s: search: key %s success", LOG_TAG_STABLE, addr_str); free(addr_str); addr_str = NULL; diff --git a/conf/zlog.conf b/conf/zlog.conf index 12d1fb8..587b955 100644 --- a/conf/zlog.conf +++ b/conf/zlog.conf @@ -8,4 +8,6 @@ FATAL=30 [rules] log_shaping.fatal "./log/shaping.log.%d(%F)"; -log_shaping.fatal >stdout; +#log_shaping.fatal >stdout; +#log_shaping.info "./log/info_shaping.log.%d(%F)"; +#log_shaping.debug "./log/debug_shaping.log.%d(%F)"; diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index da32ef8..d130222 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -33,6 +33,7 @@ extern "C" { struct shaping_system_conf { unsigned int session_queue_len_max; unsigned int priority_queue_len_max; + unsigned int 
pkt_max_delay_time_us; int polling_node_num_max[SHAPING_PRIORITY_NUM_MAX]; int work_thread_num; int cpu_affinity_enable; @@ -50,6 +51,8 @@ struct shaping_thread_ctx { struct shaping_global_stat *global_stat; struct shaping_marsio_info *marsio_info; struct swarmkv *swarmkv_db;//handle of swarmkv + int swarmkv_aqm_prob; + time_t swarmkv_aqm_update_time; struct shaping_maat_info *maat_info; struct session_table *session_table; struct timeouts *expires; diff --git a/shaping/include/shaper_swarmkv.h b/shaping/include/shaper_swarmkv.h index e533802..963fff0 100644 --- a/shaping/include/shaper_swarmkv.h +++ b/shaping/include/shaper_swarmkv.h @@ -2,4 +2,5 @@ struct swarmkv* shaper_swarmkv_init(int caller_thread_num); void shaper_swarmkv_destroy(struct swarmkv* swarmkv_db); -void swarmkv_reload_log_level();
\ No newline at end of file +void swarmkv_reload_log_level(); +int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx);
\ No newline at end of file diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 6792ee2..1013d0f 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -30,10 +30,9 @@ extern "C" { #define NANO_SECONDS_PER_MILLI_SEC 1000000 #define MILLI_SECONDS_PER_SEC 1000 -#define SHAPING_LATENCY_THRESHOLD 2000000 //2s - #define TOKEN_ENLARGE_TIMES 10 #define TOKEN_GET_FAILED_INTERVAL_MS 1 +#define HMGET_REQUEST_INTERVAL_MS 1000 #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0" #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1" @@ -79,7 +78,10 @@ enum shaper_token_get_result { struct shaping_profile_hash_node { int id; - unsigned long long last_failed_get_token_ms; + long long last_failed_get_token_ms; + long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX]; + unsigned char is_priority_blocked[SHAPING_PRIORITY_NUM_MAX]; + unsigned char is_invalid; UT_hash_handle hh; }; @@ -511,12 +513,13 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg; - struct shaping_profile_info *s_pf_info = arg->s_pf_info; + struct shaping_profile_hash_node *pf_hash_node = arg->s_pf_info->hash_node; struct shaping_flow *sf = arg->sf; + int priority = arg->priority; shaper_global_stat_async_callback_inc(arg->ctx->global_stat); - s_pf_info->is_priority_blocked = 0; + pf_hash_node->is_priority_blocked[priority] = 0; if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) { shaper_global_stat_async_hmget_failed_inc(arg->ctx->global_stat); @@ -532,19 +535,23 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb char tmp_str[32] = {0}; memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len); if (strtoll(tmp_str, NULL, 10) > 0) { - 
s_pf_info->is_priority_blocked = 1; + pf_hash_node->is_priority_blocked[priority] = 1; break; } } } END: + struct timespec curr_time; + clock_gettime(CLOCK_MONOTONIC, &curr_time); + pf_hash_node->last_hmget_ms[priority] = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free free(cb_arg); cb_arg = NULL; } -static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile) +static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, long long curr_time_ms) { struct shaping_async_cb_arg *arg; int priority = profile->priority; @@ -553,6 +560,10 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st return 0; } + if (curr_time_ms - profile->hash_node->last_hmget_ms[priority] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 1s + goto END; + } + arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg)); arg->ctx = ctx; arg->s_pf_info = profile; @@ -564,7 +575,8 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st shaper_global_stat_async_invoke_inc(ctx->global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); - if (profile->is_priority_blocked) { +END: + if (profile->hash_node->is_priority_blocked[priority] == 1) { return 1; } else { return 0; @@ -610,12 +622,17 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f struct timespec curr_timespec; clock_gettime(CLOCK_MONOTONIC, &curr_timespec); - unsigned long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + 
curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; if (curr_time_ms - profile->hash_node->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, return failed; for swarmkv can't reproduce token in 1ms return SHAPER_TOKEN_GET_FAILED; } - if (shaper_profile_is_priority_blocked(ctx, sf, profile)) { + if (shaper_swarmkv_pending_queue_aqm_drop(ctx) == 1) { + profile->hash_node->last_failed_get_token_ms = curr_time_ms; + return SHAPER_TOKEN_GET_FAILED; + } + + if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_time_ms)) { return SHAPER_TOKEN_GET_FAILED; } else { int req_token_bits = req_token_bytes * 8; @@ -699,7 +716,7 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) { clock_gettime(CLOCK_MONOTONIC, &curr_time); - if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > SHAPING_LATENCY_THRESHOLD) { + if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > ctx->conf.pkt_max_delay_time_us) { shaper_flow_pop(ctx, sf); goto DROP; } @@ -1196,6 +1213,7 @@ int shaper_global_conf_init(struct shaping_system_conf *conf) MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PRIORITY_QUEUE_LEN_MAX", &conf->priority_queue_len_max, 1024); MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "CHECK_RULE_ENABLE_INTERVAL_SEC", &conf->check_rule_enable_interval_sec, 120); + MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PKT_MAX_DELAY_TIME_US", &conf->pkt_max_delay_time_us, 2000000); return 0; diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp index 3f6df0c..1fea20c 100644 --- a/shaping/src/shaper_swarmkv.cpp +++ b/shaping/src/shaper_swarmkv.cpp @@ -6,6 +6,12 @@ #include "utils.h" #include "shaper_swarmkv.h" +#define PROBABILITY_MAX 100 +#define INCREMENT 10 +#define DECREMENT 1 +#define FREEZE_TIME 1 //unit:s +#define PENDING_QUEUE_LEN_MAX 1500 
+ struct shaper_swarmkv_conf { char swarmkv_cluster_name[64]; @@ -100,6 +106,36 @@ void swarmkv_reload_log_level() return; } +int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx) +{ + long long pending_queue_len = swarmkv_caller_get_pending_commands(ctx->swarmkv_db); + time_t now = time(NULL); + + if (now - ctx->swarmkv_aqm_update_time < FREEZE_TIME) { + goto END; + } + + if (pending_queue_len > PENDING_QUEUE_LEN_MAX) { + if (ctx->swarmkv_aqm_prob < PROBABILITY_MAX) { + ctx->swarmkv_aqm_prob += INCREMENT; + } + LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob); + } else { + if (ctx->swarmkv_aqm_prob >= DECREMENT) { + ctx->swarmkv_aqm_prob -= DECREMENT; + } + LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob); + } + ctx->swarmkv_aqm_update_time = now; + +END: + if (rand() % PROBABILITY_MAX < ctx->swarmkv_aqm_prob) { + return 1; + } + + return 0; +} + struct swarmkv* shaper_swarmkv_init(int caller_thread_num) { struct swarmkv_options *swarmkv_opts = NULL; diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index 41520aa..a625068 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -56,7 +56,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf for (int i = 0; i < pkt_num; i++) { memset(&meta, 0, sizeof(meta)); - time = stub_curr_time_get(); + time = stub_curr_time_ns_get(); packet = packet_new(time, pkt_len, dir); if (expec_tx_queue) { pkt_node = packet_node_new(packet); @@ -75,7 +75,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf polling_entry(ctx->sp, ctx->stat, ctx); } - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } return; @@ -261,7 +261,7 @@ TEST(single_session, udp_tx_in_order) stub_refresh_token_bucket(0); for (int i = 0; i < 20; 
i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -348,7 +348,7 @@ TEST(single_session, tcp_tx_in_order) stub_refresh_token_bucket(0); for (int i = 0; i < 10; i++) {//10 pkts which is not pure control polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); @@ -422,7 +422,7 @@ TEST(single_session, udp_diff_direction) stub_refresh_token_bucket(0); for (int i = 0; i < 20; i++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } //10 out packets ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); @@ -503,7 +503,7 @@ TEST(single_session, udp_multi_rules) //there are 3 rules, send one packet need 3 polling process, so 10 packets need 30 polling //even though invoke polling more than 30 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -577,7 +577,7 @@ TEST(single_session, udp_borrow) stub_refresh_token_bucket(2); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, 
&ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -651,7 +651,7 @@ TEST(single_session, udp_borrow_same_priority_9) stub_refresh_token_bucket(3); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -747,7 +747,7 @@ TEST(two_session_diff_priority, udp_in_order) stub_refresh_token_bucket(1); for (int i = 0; i < 10; i++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 1, first ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -758,7 +758,7 @@ TEST(two_session_diff_priority, udp_in_order) stub_refresh_token_bucket(1); for (int i = 0; i < 10; i++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));//stream1 priority 2 ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -864,7 +864,7 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule) stub_refresh_token_bucket(4); for (int i = 0; i < 30; i++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, 
judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -877,7 +877,7 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule) stub_refresh_token_bucket(4); for (int i = 0; i < 10; i++) { polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 3 ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -965,7 +965,7 @@ TEST(single_session_async, udp_tx_in_order) stub_refresh_token_bucket(0); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -1103,7 +1103,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) stub_refresh_token_bucket(2); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -1114,7 +1114,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) stub_refresh_token_bucket(2); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + 
stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -1200,7 +1200,7 @@ TEST(two_session_same_rule, udp_tx_in_order) stub_refresh_token_bucket(1); for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); } ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -1280,6 +1280,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中 + stub_curr_time_s_inc(1);//inc time to refresh hmget interval for (int i = 0; i < 10; i++) {//线程1中的session优先级为2,被线程0中优先级为1的session阻断 stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); @@ -1294,6 +1295,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) } shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中 + stub_curr_time_s_inc(1);//inc time to refresh hmget interval while (!TAILQ_EMPTY(&expec_tx_queue2)) { stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); @@ -1362,15 +1364,16 @@ TEST(two_session_diff_priority_same_profile, session_timer_test) sleep(3);//wait session timer to expire, to refresh priority queue_len to swarmkv for (int i = 0; i < 500; i++) { - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer } polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//timer 
triggered in polling polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); + stub_curr_time_s_inc(1);//inc time to refresh hmget interval for (int i = 0; i < 10; i++) {//线程1中的session优先级为2,被线程0中优先级为1的session阻断 stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(-1, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//优先级低,不能发出报文 } @@ -1378,22 +1381,23 @@ TEST(two_session_diff_priority_same_profile, session_timer_test) while (!TAILQ_EMPTY(&expec_tx_queue1)) { stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1 } sleep(3);//wait session timer to expire, to refresh priority queue_len to swarmkv for (int i = 0; i < 500; i++) { - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer } polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//timer triggered in polling polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); + stub_curr_time_s_inc(1);//inc time to refresh hmget interval while (!TAILQ_EMPTY(&expec_tx_queue2)) { stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2 } @@ -1444,7 +1448,7 @@ TEST(statistics, udp_drop_pkt) while (!TAILQ_EMPTY(&expec_tx_queue)) { stub_refresh_token_bucket(0); 
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1)); } @@ -1531,7 +1535,7 @@ TEST(statistics, udp_queueing_pkt) while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1)); } diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp index 25d4550..db7ce23 100644 --- a/shaping/test/stub.cpp +++ b/shaping/test/stub.cpp @@ -50,7 +50,8 @@ struct stub_matched_rules matched_rules; struct shaping_profile pf_array[MAX_STUB_PROFILE_NUM]; static int profile_priority_len[MAX_STUB_PROFILE_NUM][10] = {{0}}; -static unsigned long long curr_time = 2000000000;//2s +static unsigned long long curr_time_ns = 2000000000;//2s +static unsigned int curr_time_s = 0; void * stub_get_token_thread_func(void *data) { @@ -151,16 +152,23 @@ struct stub_pkt_queue* stub_get_tx_queue() return &tx_queue; } -void stub_curr_time_inc(unsigned long long time_ns) +void stub_curr_time_ns_inc(unsigned long long time_ns) { - curr_time += time_ns; + curr_time_ns += time_ns; return; } -unsigned long long stub_curr_time_get() +void stub_curr_time_s_inc(int time_s) { - return curr_time; + curr_time_s += time_s; + + return; +} + +unsigned long long stub_curr_time_ns_get() +{ + return curr_time_ns; } void stub_init() @@ -193,8 +201,8 @@ void stub_init() int clock_gettime (clockid_t __clock_id, struct timespec *__tp) { - __tp->tv_sec = 0; - __tp->tv_nsec = curr_time; + __tp->tv_sec = curr_time_s; + __tp->tv_nsec = curr_time_ns; return 0; } @@ -262,6 +270,11 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char * cluster_ 
return db; } +long long swarmkv_caller_get_pending_commands(struct swarmkv *db) +{ + return 0; +} + int swarmkv_options_set_log_path(struct swarmkv_options *opts, const char *logpath) { return 0; diff --git a/shaping/test/stub.h b/shaping/test/stub.h index 80e099b..48b9b0b 100644 --- a/shaping/test/stub.h +++ b/shaping/test/stub.h @@ -41,8 +41,9 @@ struct stub_pkt_queue* stub_get_tx_queue(); int stub_AQM_drop_packet(int queue_len, unsigned long long income_time); -void stub_curr_time_inc(unsigned long long time_ns); -unsigned long long stub_curr_time_get(); +void stub_curr_time_ns_inc(unsigned long long time_ns); +void stub_curr_time_s_inc(int time_s); +unsigned long long stub_curr_time_ns_get(); void stub_init(); diff --git a/shaping/test/test_conf/shaping.conf b/shaping/test/test_conf/shaping.conf index bcba85c..015ab3c 100644 --- a/shaping/test/test_conf/shaping.conf +++ b/shaping/test/test_conf/shaping.conf @@ -40,5 +40,6 @@ LINE_PROTOCOL_SERVER_PORT=6667 #PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128 SESSION_QUEUE_LEN_MAX=128 QUEUEING_SESSIONS_PER_PRIORITY_PER_THREAD_MAX=1024 +PKT_MAX_DELAY_TIME_US=999999999 POLLING_NODE_NUM_MAX={"polling_node_num_max":[ 3, 2, 2, 2, 2, 2, 2, 2, 2, 2 ]} |
