summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/src/raw_packet.cpp50
-rw-r--r--common/src/session_table.cpp20
-rw-r--r--conf/zlog.conf4
-rw-r--r--shaping/include/shaper.h3
-rw-r--r--shaping/include/shaper_swarmkv.h3
-rw-r--r--shaping/src/shaper.cpp40
-rw-r--r--shaping/src/shaper_swarmkv.cpp36
-rw-r--r--shaping/test/gtest_shaper.cpp50
-rw-r--r--shaping/test/stub.cpp27
-rw-r--r--shaping/test/stub.h5
-rw-r--r--shaping/test/test_conf/shaping.conf1
11 files changed, 159 insertions, 80 deletions
diff --git a/common/src/raw_packet.cpp b/common/src/raw_packet.cpp
index 54c610a..03be2b1 100644
--- a/common/src/raw_packet.cpp
+++ b/common/src/raw_packet.cpp
@@ -148,7 +148,7 @@ int raw_packet_tcp_payload_len_get(struct raw_pkt_parser *handler)
const struct layer_result *layer = &results->layers[i];
enum layer_type type = layer->type;
- LOG_DEBUG("%s: find most inner tcp, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type));
+ //LOG_DEBUG("%s: find most inner tcp, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type));
// first get L4 layer
if (type & LAYER_TYPE_TCP)
@@ -190,7 +190,7 @@ int raw_packet_parser_get_most_inner_tuple4(struct raw_pkt_parser *handler, stru
const struct layer_result *layer = &results->layers[i];
enum layer_type type = layer->type;
- LOG_DEBUG("%s: find most inner tuple4, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type));
+ //LOG_DEBUG("%s: find most inner tuple4, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type));
// first get L4 layer
if (type & LAYER_TYPE_L4)
@@ -245,7 +245,7 @@ int raw_packet_parser_get_most_outer_tuple4(struct raw_pkt_parser *handler, stru
const struct layer_result *layer = &results->layers[i];
enum layer_type type = layer->type;
- LOG_DEBUG("%s: find most outer tuple4, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type));
+ //LOG_DEBUG("%s: find most outer tuple4, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type));
// first get L3 layer
if (type & LAYER_TYPE_L3)
@@ -297,7 +297,7 @@ int raw_packet_parser_get_most_inner_address(struct raw_pkt_parser *handler, str
const struct layer_result *layer = &results->layers[i];
enum layer_type type = layer->type;
- LOG_DEBUG("%s: find most inner address, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type));
+ //LOG_DEBUG("%s: find most inner address, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type));
if (type & LAYER_TYPE_L3)
{
l3_layer_data = (const char *)handler->ptr_pkt_start + layer->offset;
@@ -322,7 +322,7 @@ int raw_packet_parser_get_most_outer_address(struct raw_pkt_parser *handler, str
const struct layer_result *layer = &results->layers[i];
enum layer_type type = layer->type;
- LOG_DEBUG("%s: find most outer address, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type));
+ //LOG_DEBUG("%s: find most outer address, pkt_trace_id: %lu, layer[%d/%d]: %s", LOG_TAG_RAWPKT, handler->pkt_trace_id, i, results->layers_size, layer_type2str(type));
if (type & LAYER_TYPE_L3)
{
l3_layer_data = (const char *)handler->ptr_pkt_start + layer->offset;
@@ -452,12 +452,12 @@ uint64_t raw_packet_parser_get_hash_value(struct raw_pkt_parser *handler, enum l
return hash_value;
}
- char *inner_addr_str = addr_tuple4_to_str(&inner_addr);
- char *outer_addr_str = addr_tuple4_to_str(&outer_addr);
- LOG_DEBUG("%s: pkt_trace_id: %lu, outer_addr: %s, inner_addr: %s, is_internal: %d, hash_method: %s, hash_value: %lu",
- LOG_TAG_RAWPKT, handler->pkt_trace_id, outer_addr_str, inner_addr_str, dir_is_internal, ldbc_method_to_string(method), hash_value);
- free(inner_addr_str);
- free(outer_addr_str);
+ //char *inner_addr_str = addr_tuple4_to_str(&inner_addr);
+ //char *outer_addr_str = addr_tuple4_to_str(&outer_addr);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, outer_addr: %s, inner_addr: %s, is_internal: %d, hash_method: %s, hash_value: %lu",
+ // LOG_TAG_RAWPKT, handler->pkt_trace_id, outer_addr_str, inner_addr_str, dir_is_internal, ldbc_method_to_string(method), hash_value);
+ //free(inner_addr_str);
+ //free(outer_addr_str);
return hash_value;
}
@@ -664,7 +664,7 @@ static const void *parse_ether(struct raw_pkt_parser *handler, const void *data,
const void *data_next_layer = (const char *)data + hdr_len;
size_t data_next_length = length - hdr_len;
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
switch (next_proto)
{
case ETH_P_8021Q:
@@ -712,7 +712,7 @@ static const void *parse_ipv4(struct raw_pkt_parser *handler, const void *data,
const void *data_next_layer = (const char *)data + hdr_len;
size_t data_next_length = length - hdr_len;
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
switch (next_proto)
{
case IPPROTO_TCP:
@@ -752,7 +752,7 @@ static const void *parse_ipv6(struct raw_pkt_parser *handler, const void *data,
const void *data_next_layer = (const char *)data + hdr_len;
size_t data_next_length = length - hdr_len;
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
switch (next_proto)
{
case IPPROTO_TCP:
@@ -791,7 +791,7 @@ static const void *parse_tcp(struct raw_pkt_parser *handler, const void *data, s
const void *data_next_layer = (const char *)data + hdr_len;
size_t data_next_length = length - hdr_len;
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
return data_next_layer;
}
@@ -814,7 +814,7 @@ static const void *parse_udp(struct raw_pkt_parser *handler, const void *data, s
const void *data_next_layer = (const char *)data + hdr_len;
size_t data_next_length = length - hdr_len;
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
switch (ntohs(hdr->uh_dport))
{
// VXLAN_DPORT
@@ -848,7 +848,7 @@ static const void *parse_pppoe_ses(struct raw_pkt_parser *handler, const void *d
const void *data_next_layer = (const char *)data + hdr_len;
size_t data_next_length = length - hdr_len;
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
switch (next_proto)
{
// PPPOE_TYPE_IPV4
@@ -883,7 +883,7 @@ static const void *parse_vxlan(struct raw_pkt_parser *handler, const void *data,
const void *data_next_layer = (const char *)data + hdr_len;
size_t data_next_length = length - hdr_len;
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
// TESTED
return parse_ether(handler, data_next_layer, data_next_length, LAYER_TYPE_ETHER);
}
@@ -907,7 +907,7 @@ static const void *parse_vlan8021q(struct raw_pkt_parser *handler, const void *d
const void *data_next_layer = (const char *)data + hdr_len;
size_t data_next_length = length - hdr_len;
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
switch (next_proto)
{
case ETH_P_8021Q:
@@ -954,7 +954,7 @@ static const void *parse_gtpv1_u(struct raw_pkt_parser *handler, const void *dat
const void *data_next_layer = (const char *)data + hdr_len;
size_t data_next_length = length - hdr_len;
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
switch (next_proto)
{
case 4:
@@ -1027,32 +1027,32 @@ static const void *parse_mpls(struct raw_pkt_parser *handler, const void *data,
data_next_layer = (const char *)data_next_layer + 4;
data_next_length = data_next_length - 4;
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
// TESTED
return parse_ether(handler, data_next_layer, data_next_length, LAYER_TYPE_ETHER);
}
else if (ip_version == 4)
{
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
// TESTED
return parse_ipv4(handler, data_next_layer, data_next_length, LAYER_TYPE_IPV4);
}
else if (ip_version == 6)
{
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
// TODO
return parse_ipv6(handler, data_next_layer, data_next_length, LAYER_TYPE_IPV6);
}
else
{
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
// TODO
return parse_ether(handler, data_next_layer, data_next_length, LAYER_TYPE_ETHER);
}
}
else
{
- LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
+ //LOG_DEBUG("%s: pkt_trace_id: %lu, this_layer: %s, payload_len: [%lu/%lu]", LOG_TAG_RAWPKT, handler->pkt_trace_id, layer_type2str(this_type), data_next_length, length);
// TESTED
return parse_mpls(handler, data_next_layer, data_next_length, LAYER_TYPE_MPLS);
}
diff --git a/common/src/session_table.cpp b/common/src/session_table.cpp
index 0764e14..b5a2278 100644
--- a/common/src/session_table.cpp
+++ b/common/src/session_table.cpp
@@ -113,7 +113,7 @@ int session_table_insert(struct session_table *table, uint64_t session_id, const
HASH_FIND(hh1, table->root_by_id, &session_id, sizeof(session_id), temp);
if (temp)
{
- LOG_DEBUG("%s: insert: key %lu exists", LOG_TAG_STABLE, session_id);
+ LOG_ERROR("%s: insert: key %lu exists", LOG_TAG_STABLE, session_id);
return -1;
}
@@ -128,7 +128,7 @@ int session_table_insert(struct session_table *table, uint64_t session_id, const
HASH_ADD(hh1, table->root_by_id, session_id, sizeof(temp->session_id), temp);
HASH_ADD(hh2, table->root_by_addr, session_addr, sizeof(temp->session_addr), temp);
- LOG_DEBUG("%s: insert: key %lu success", LOG_TAG_STABLE, session_id);
+ //LOG_DEBUG("%s: insert: key %lu success", LOG_TAG_STABLE, session_id);
table->session_node_count++;
return 0;
@@ -140,7 +140,7 @@ int session_table_delete_by_id(struct session_table *table, uint64_t session_id)
HASH_FIND(hh1, table->root_by_id, &session_id, sizeof(session_id), temp);
if (!temp)
{
- LOG_DEBUG("%s: delete: key %lu not exists", LOG_TAG_STABLE, session_id);
+ LOG_ERROR("%s: delete: key %lu not exists", LOG_TAG_STABLE, session_id);
return -1;
}
@@ -156,7 +156,7 @@ int session_table_delete_by_id(struct session_table *table, uint64_t session_id)
free(temp);
temp = NULL;
- LOG_DEBUG("%s: delete: key %lu success", LOG_TAG_STABLE, session_id);
+ //LOG_DEBUG("%s: delete: key %lu success", LOG_TAG_STABLE, session_id);
table->session_node_count--;
return 0;
@@ -174,7 +174,7 @@ int session_table_delete_by_addr(struct session_table *table, const struct addr_
HASH_FIND(hh2, table->root_by_addr, &reverse_addr, sizeof(struct addr_tuple4), temp);
if (!temp)
{
- LOG_DEBUG("%s: delete: key %s not exists", LOG_TAG_STABLE, addr_str);
+ LOG_ERROR("%s: delete: key %s not exists", LOG_TAG_STABLE, addr_str);
free(addr_str);
return -1;
}
@@ -192,7 +192,7 @@ int session_table_delete_by_addr(struct session_table *table, const struct addr_
free(temp);
temp = NULL;
- LOG_DEBUG("%s: delete: key %s success", LOG_TAG_STABLE, addr_str);
+ //LOG_DEBUG("%s: delete: key %s success", LOG_TAG_STABLE, addr_str);
free(addr_str);
addr_str = NULL;
table->session_node_count--;
@@ -206,11 +206,11 @@ struct session_node *session_table_search_by_id(struct session_table *table, uin
HASH_FIND(hh1, table->root_by_id, &session_id, sizeof(session_id), temp);
if (!temp)
{
- LOG_DEBUG("%s: search: key %lu not exists", LOG_TAG_STABLE, session_id);
+ //LOG_DEBUG("%s: search: key %lu not exists", LOG_TAG_STABLE, session_id);
return NULL;
}
- LOG_DEBUG("%s: search: key %lu success", LOG_TAG_STABLE, session_id);
+ //LOG_DEBUG("%s: search: key %lu success", LOG_TAG_STABLE, session_id);
return temp;
}
@@ -227,14 +227,14 @@ struct session_node *session_table_search_by_addr(struct session_table *table, c
HASH_FIND(hh2, table->root_by_addr, &reverse_addr, sizeof(struct addr_tuple4), temp);
if (!temp)
{
- LOG_DEBUG("%s: search: key %s not exists", LOG_TAG_STABLE, addr_str);
+ //LOG_DEBUG("%s: search: key %s not exists", LOG_TAG_STABLE, addr_str);
free(addr_str);
addr_str = NULL;
return NULL;
}
}
- LOG_DEBUG("%s: search: key %s success", LOG_TAG_STABLE, addr_str);
+ //LOG_DEBUG("%s: search: key %s success", LOG_TAG_STABLE, addr_str);
free(addr_str);
addr_str = NULL;
diff --git a/conf/zlog.conf b/conf/zlog.conf
index 12d1fb8..587b955 100644
--- a/conf/zlog.conf
+++ b/conf/zlog.conf
@@ -8,4 +8,6 @@ FATAL=30
[rules]
log_shaping.fatal "./log/shaping.log.%d(%F)";
-log_shaping.fatal >stdout;
+#log_shaping.fatal >stdout;
+#log_shaping.info "./log/info_shaping.log.%d(%F)";
+#log_shaping.debug "./log/debug_shaping.log.%d(%F)";
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index da32ef8..d130222 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -33,6 +33,7 @@ extern "C" {
struct shaping_system_conf {
unsigned int session_queue_len_max;
unsigned int priority_queue_len_max;
+ unsigned int pkt_max_delay_time_us;
int polling_node_num_max[SHAPING_PRIORITY_NUM_MAX];
int work_thread_num;
int cpu_affinity_enable;
@@ -50,6 +51,8 @@ struct shaping_thread_ctx {
struct shaping_global_stat *global_stat;
struct shaping_marsio_info *marsio_info;
struct swarmkv *swarmkv_db;//handle of swarmkv
+ int swarmkv_aqm_prob;
+ time_t swarmkv_aqm_update_time;
struct shaping_maat_info *maat_info;
struct session_table *session_table;
struct timeouts *expires;
diff --git a/shaping/include/shaper_swarmkv.h b/shaping/include/shaper_swarmkv.h
index e533802..963fff0 100644
--- a/shaping/include/shaper_swarmkv.h
+++ b/shaping/include/shaper_swarmkv.h
@@ -2,4 +2,5 @@
struct swarmkv* shaper_swarmkv_init(int caller_thread_num);
void shaper_swarmkv_destroy(struct swarmkv* swarmkv_db);
-void swarmkv_reload_log_level(); \ No newline at end of file
+void swarmkv_reload_log_level();
+int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx); \ No newline at end of file
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 6792ee2..1013d0f 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -30,10 +30,9 @@ extern "C" {
#define NANO_SECONDS_PER_MILLI_SEC 1000000
#define MILLI_SECONDS_PER_SEC 1000
-#define SHAPING_LATENCY_THRESHOLD 2000000 //2s
-
#define TOKEN_ENLARGE_TIMES 10
#define TOKEN_GET_FAILED_INTERVAL_MS 1
+#define HMGET_REQUEST_INTERVAL_MS 1000
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1"
@@ -79,7 +78,10 @@ enum shaper_token_get_result {
struct shaping_profile_hash_node {
int id;
- unsigned long long last_failed_get_token_ms;
+ long long last_failed_get_token_ms;
+ long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX];
+ unsigned char is_priority_blocked[SHAPING_PRIORITY_NUM_MAX];
+ unsigned char is_invalid;
UT_hash_handle hh;
};
@@ -511,12 +513,13 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg;
- struct shaping_profile_info *s_pf_info = arg->s_pf_info;
+ struct shaping_profile_hash_node *pf_hash_node = arg->s_pf_info->hash_node;
struct shaping_flow *sf = arg->sf;
+ int priority = arg->priority;
shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
- s_pf_info->is_priority_blocked = 0;
+ pf_hash_node->is_priority_blocked[priority] = 0;
if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) {
shaper_global_stat_async_hmget_failed_inc(arg->ctx->global_stat);
@@ -532,19 +535,23 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
char tmp_str[32] = {0};
memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len);
if (strtoll(tmp_str, NULL, 10) > 0) {
- s_pf_info->is_priority_blocked = 1;
+ pf_hash_node->is_priority_blocked[priority] = 1;
break;
}
}
}
END:
+ struct timespec curr_time;
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ pf_hash_node->last_hmget_ms[priority] = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+
shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free
free(cb_arg);
cb_arg = NULL;
}
-static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile)
+static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, long long curr_time_ms)
{
struct shaping_async_cb_arg *arg;
int priority = profile->priority;
@@ -553,6 +560,10 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
return 0;
}
+ if (curr_time_ms - profile->hash_node->last_hmget_ms[priority] < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 1s
+ goto END;
+ }
+
arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg));
arg->ctx = ctx;
arg->s_pf_info = profile;
@@ -564,7 +575,8 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
shaper_global_stat_async_invoke_inc(ctx->global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
- if (profile->is_priority_blocked) {
+END:
+ if (profile->hash_node->is_priority_blocked[priority] == 1) {
return 1;
} else {
return 0;
@@ -610,12 +622,17 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
struct timespec curr_timespec;
clock_gettime(CLOCK_MONOTONIC, &curr_timespec);
- unsigned long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+ long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
if (curr_time_ms - profile->hash_node->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, return failed; for swarmkv can't reproduce token in 1ms
return SHAPER_TOKEN_GET_FAILED;
}
- if (shaper_profile_is_priority_blocked(ctx, sf, profile)) {
+ if (shaper_swarmkv_pending_queue_aqm_drop(ctx) == 1) {
+ profile->hash_node->last_failed_get_token_ms = curr_time_ms;
+ return SHAPER_TOKEN_GET_FAILED;
+ }
+
+ if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_time_ms)) {
return SHAPER_TOKEN_GET_FAILED;
} else {
int req_token_bits = req_token_bytes * 8;
@@ -699,7 +716,7 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
clock_gettime(CLOCK_MONOTONIC, &curr_time);
- if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > SHAPING_LATENCY_THRESHOLD) {
+ if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > ctx->conf.pkt_max_delay_time_us) {
shaper_flow_pop(ctx, sf);
goto DROP;
}
@@ -1196,6 +1213,7 @@ int shaper_global_conf_init(struct shaping_system_conf *conf)
MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PRIORITY_QUEUE_LEN_MAX", &conf->priority_queue_len_max, 1024);
MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "CHECK_RULE_ENABLE_INTERVAL_SEC", &conf->check_rule_enable_interval_sec, 120);
+ MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PKT_MAX_DELAY_TIME_US", &conf->pkt_max_delay_time_us, 2000000);
return 0;
diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp
index 3f6df0c..1fea20c 100644
--- a/shaping/src/shaper_swarmkv.cpp
+++ b/shaping/src/shaper_swarmkv.cpp
@@ -6,6 +6,12 @@
#include "utils.h"
#include "shaper_swarmkv.h"
+#define PROBABILITY_MAX 100
+#define INCREMENT 10
+#define DECREMENT 1
+#define FREEZE_TIME 1 //unit:s
+#define PENDING_QUEUE_LEN_MAX 1500
+
struct shaper_swarmkv_conf
{
char swarmkv_cluster_name[64];
@@ -100,6 +106,36 @@ void swarmkv_reload_log_level()
return;
}
+int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx)
+{
+ long long pending_queue_len = swarmkv_caller_get_pending_commands(ctx->swarmkv_db);
+ time_t now = time(NULL);
+
+ if (now - ctx->swarmkv_aqm_update_time < FREEZE_TIME) {
+ goto END;
+ }
+
+ if (pending_queue_len > PENDING_QUEUE_LEN_MAX) {
+ if (ctx->swarmkv_aqm_prob < PROBABILITY_MAX) {
+ ctx->swarmkv_aqm_prob += INCREMENT;
+ }
+ LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob);
+ } else {
+ if (ctx->swarmkv_aqm_prob >= DECREMENT) {
+ ctx->swarmkv_aqm_prob -= DECREMENT;
+ }
+ LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob);
+ }
+ ctx->swarmkv_aqm_update_time = now;
+
+END:
+ if (rand() % PROBABILITY_MAX < ctx->swarmkv_aqm_prob) {
+ return 1;
+ }
+
+ return 0;
+}
+
struct swarmkv* shaper_swarmkv_init(int caller_thread_num)
{
struct swarmkv_options *swarmkv_opts = NULL;
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index 41520aa..a625068 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -56,7 +56,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
for (int i = 0; i < pkt_num; i++) {
memset(&meta, 0, sizeof(meta));
- time = stub_curr_time_get();
+ time = stub_curr_time_ns_get();
packet = packet_new(time, pkt_len, dir);
if (expec_tx_queue) {
pkt_node = packet_node_new(packet);
@@ -75,7 +75,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
polling_entry(ctx->sp, ctx->stat, ctx);
}
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
return;
@@ -261,7 +261,7 @@ TEST(single_session, udp_tx_in_order)
stub_refresh_token_bucket(0);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -348,7 +348,7 @@ TEST(single_session, tcp_tx_in_order)
stub_refresh_token_bucket(0);
for (int i = 0; i < 10; i++) {//10 pkts which is not pure control
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
@@ -422,7 +422,7 @@ TEST(single_session, udp_diff_direction)
stub_refresh_token_bucket(0);
for (int i = 0; i < 20; i++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
//10 out packets
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
@@ -503,7 +503,7 @@ TEST(single_session, udp_multi_rules)
//there are 3 rules, send one packet need 3 polling process, so 10 packets need 30 polling
//even though invoke polling more than 30 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -577,7 +577,7 @@ TEST(single_session, udp_borrow)
stub_refresh_token_bucket(2);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -651,7 +651,7 @@ TEST(single_session, udp_borrow_same_priority_9)
stub_refresh_token_bucket(3);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -747,7 +747,7 @@ TEST(two_session_diff_priority, udp_in_order)
stub_refresh_token_bucket(1);
for (int i = 0; i < 10; i++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 1, first
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -758,7 +758,7 @@ TEST(two_session_diff_priority, udp_in_order)
stub_refresh_token_bucket(1);
for (int i = 0; i < 10; i++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));//stream1 priority 2
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -864,7 +864,7 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule)
stub_refresh_token_bucket(4);
for (int i = 0; i < 30; i++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -877,7 +877,7 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule)
stub_refresh_token_bucket(4);
for (int i = 0; i < 10; i++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 3
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -965,7 +965,7 @@ TEST(single_session_async, udp_tx_in_order)
stub_refresh_token_bucket(0);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -1103,7 +1103,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
stub_refresh_token_bucket(2);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -1114,7 +1114,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
stub_refresh_token_bucket(2);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -1200,7 +1200,7 @@ TEST(two_session_same_rule, udp_tx_in_order)
stub_refresh_token_bucket(1);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -1280,6 +1280,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中
+ stub_curr_time_s_inc(1);//inc time to refresh hmget interval
for (int i = 0; i < 10; i++) {//线程1中的session优先级为2,被线程0中优先级为1的session阻断
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
@@ -1294,6 +1295,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
}
shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中
+ stub_curr_time_s_inc(1);//inc time to refresh hmget interval
while (!TAILQ_EMPTY(&expec_tx_queue2)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
@@ -1362,15 +1364,16 @@ TEST(two_session_diff_priority_same_profile, session_timer_test)
sleep(3);//wait session timer to expire, to refresh priority queue_len to swarmkv
for (int i = 0; i < 500; i++) {
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer
}
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//timer triggered in polling
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
+ stub_curr_time_s_inc(1);//inc time to refresh hmget interval
for (int i = 0; i < 10; i++) {//线程1中的session优先级为2,被线程0中优先级为1的session阻断
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(-1, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//优先级低,不能发出报文
}
@@ -1378,22 +1381,23 @@ TEST(two_session_diff_priority_same_profile, session_timer_test)
while (!TAILQ_EMPTY(&expec_tx_queue1)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1
}
sleep(3);//wait session timer to expire, to refresh priority queue_len to swarmkv
for (int i = 0; i < 500; i++) {
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer
}
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//timer triggered in polling
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
+ stub_curr_time_s_inc(1);//inc time to refresh hmget interval
while (!TAILQ_EMPTY(&expec_tx_queue2)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2
}
@@ -1444,7 +1448,7 @@ TEST(statistics, udp_drop_pkt)
while (!TAILQ_EMPTY(&expec_tx_queue)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));
}
@@ -1531,7 +1535,7 @@ TEST(statistics, udp_queueing_pkt)
while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ stub_curr_time_ns_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));
}
diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp
index 25d4550..db7ce23 100644
--- a/shaping/test/stub.cpp
+++ b/shaping/test/stub.cpp
@@ -50,7 +50,8 @@ struct stub_matched_rules matched_rules;
struct shaping_profile pf_array[MAX_STUB_PROFILE_NUM];
static int profile_priority_len[MAX_STUB_PROFILE_NUM][10] = {{0}};
-static unsigned long long curr_time = 2000000000;//2s
+static unsigned long long curr_time_ns = 2000000000;//2s
+static unsigned int curr_time_s = 0;
void * stub_get_token_thread_func(void *data)
{
@@ -151,16 +152,23 @@ struct stub_pkt_queue* stub_get_tx_queue()
return &tx_queue;
}
-void stub_curr_time_inc(unsigned long long time_ns)
+void stub_curr_time_ns_inc(unsigned long long time_ns)
{
- curr_time += time_ns;
+ curr_time_ns += time_ns;
return;
}
-unsigned long long stub_curr_time_get()
+void stub_curr_time_s_inc(int time_s)
{
- return curr_time;
+ curr_time_s += time_s;
+
+ return;
+}
+
+unsigned long long stub_curr_time_ns_get()
+{
+ return curr_time_ns;
}
void stub_init()
@@ -193,8 +201,8 @@ void stub_init()
int clock_gettime (clockid_t __clock_id, struct timespec *__tp)
{
- __tp->tv_sec = 0;
- __tp->tv_nsec = curr_time;
+ __tp->tv_sec = curr_time_s;
+ __tp->tv_nsec = curr_time_ns;
return 0;
}
@@ -262,6 +270,11 @@ struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char * cluster_
return db;
}
+long long swarmkv_caller_get_pending_commands(struct swarmkv *db)
+{
+ return 0;
+}
+
int swarmkv_options_set_log_path(struct swarmkv_options *opts, const char *logpath)
{
return 0;
diff --git a/shaping/test/stub.h b/shaping/test/stub.h
index 80e099b..48b9b0b 100644
--- a/shaping/test/stub.h
+++ b/shaping/test/stub.h
@@ -41,8 +41,9 @@ struct stub_pkt_queue* stub_get_tx_queue();
int stub_AQM_drop_packet(int queue_len, unsigned long long income_time);
-void stub_curr_time_inc(unsigned long long time_ns);
-unsigned long long stub_curr_time_get();
+void stub_curr_time_ns_inc(unsigned long long time_ns);
+void stub_curr_time_s_inc(int time_s);
+unsigned long long stub_curr_time_ns_get();
void stub_init();
diff --git a/shaping/test/test_conf/shaping.conf b/shaping/test/test_conf/shaping.conf
index bcba85c..015ab3c 100644
--- a/shaping/test/test_conf/shaping.conf
+++ b/shaping/test/test_conf/shaping.conf
@@ -40,5 +40,6 @@ LINE_PROTOCOL_SERVER_PORT=6667
#PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128
SESSION_QUEUE_LEN_MAX=128
QUEUEING_SESSIONS_PER_PRIORITY_PER_THREAD_MAX=1024
+PKT_MAX_DELAY_TIME_US=999999999
POLLING_NODE_NUM_MAX={"polling_node_num_max":[ 3, 2, 2, 2, 2, 2, 2, 2, 2, 2 ]}