Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r-- shaping/src/shaper.cpp | 112
1 file changed, 50 insertions(+), 62 deletions(-)
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index f8dbe6e..ca2893b 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -27,6 +27,8 @@ extern "C" {
#define MICRO_SECONDS_PER_SEC 1000000
#define NANO_SECONDS_PER_SEC 1000000000
+#define SHAPING_LATENCY_THRESHOLD 2000000 // 2s, in microseconds
+
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2"
@@ -169,9 +171,10 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
return;
}
-static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct timespec *income_time, struct metadata *meta)
+static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta)
{
struct shaping_packet_wrapper *s_pkt = NULL;
+ struct timespec curr_time;
if (sf->queue_len == ctx->session_queue_len_max) {
return -1;
@@ -182,12 +185,14 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
return -1;
}
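+ /* take the enqueue timestamp here instead of receiving it from the caller */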
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+
s_pkt->pkt_buff = pkt_buff;
s_pkt->direction = meta->dir;
s_pkt->length = meta->raw_len;
s_pkt->tcp_pure_contorl = meta->is_tcp_pure_ctrl;
- s_pkt->income_time_ns = income_time->tv_sec * NANO_SECONDS_PER_SEC + income_time->tv_nsec;
- s_pkt->enqueue_time_us = income_time->tv_sec * MICRO_SECONDS_PER_SEC + income_time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+ s_pkt->income_time_ns = curr_time.tv_sec * NANO_SECONDS_PER_SEC + curr_time.tv_nsec;
+ s_pkt->enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node);
sf->queue_len++;
@@ -307,7 +312,7 @@ END:
return ret;
}
-static unsigned long long shaper_pkt_latency_calculate(struct shaping_profile_info *profile, struct timespec *time)
+static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time)
{
unsigned long long enqueue_time = profile->enqueue_time_us;
unsigned long long curr_time = time->tv_sec * MICRO_SECONDS_PER_SEC + time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
@@ -359,7 +364,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
}
END:
- latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time);
+ latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, &curr_time);
shaper_stat_max_latency_update(&s_rule_info->primary.stat, pkt_wrapper->direction, latency, ctx->thread_index);
return;
@@ -665,6 +670,13 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
return SHAPING_FORWARD;
}
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
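+ /* head-of-line latency guard: if the first profile has been queued longer than SHAPING_LATENCY_THRESHOLD, pop the flow and drop this packet */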
+ if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > SHAPING_LATENCY_THRESHOLD) {
+ shaper_flow_pop(ctx, sf);
+ goto DROP;
+ }
+ /* TODO: AQM not implemented yet */
+
for (int i = 0; i < profile_num; i++) {
profile = pf_container[i].pf_info;
profile_type = pf_container[i].pf_type;
@@ -691,55 +703,60 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi
if (0 == shaper_flow_push(ctx, sf, enqueue_time)) {
return SHAPING_QUEUED;
} else {
- rule = &sf->matched_rule_infos[sf->anchor];
- shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index);
- sf->anchor = 0;
- return SHAPING_DROP;
+ goto DROP;
}
+
+DROP:
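+ /* shared drop path: charge the drop to the currently anchored rule, then reset the anchor */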
+ rule = &sf->matched_rule_infos[sf->anchor];
+ shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index);
+ sf->anchor = 0;
+ return SHAPING_DROP;
}
-static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile)
+static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct metadata *meta, struct shaping_profile_info *profile, marsio_buff_t *rx_buff)
{
int profile_type = PROFILE_IN_RULE_TYPE_PRIMARY;
- struct shaping_rule_info *rule = NULL;
- struct shaping_packet_wrapper *pkt_wrapper = NULL;
struct timespec curr_time;
unsigned long long enqueue_time;
- rule = &sf->matched_rule_infos[sf->anchor];
- pkt_wrapper = shaper_first_pkt_get(sf);
- assert(pkt_wrapper != NULL);
-
- if (pkt_wrapper->tcp_pure_contorl) {
- shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
+ if (meta->is_tcp_pure_ctrl) {
+ shaper_stat_forward_all_rule_inc(ctx->stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
return SHAPING_FORWARD;
}
- if (0 == shaper_token_consume(ctx, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) {
- shaper_stat_forward_inc(&profile->stat, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
+ if (0 == shaper_token_consume(ctx, sf, meta->raw_len, profile, profile_type, meta->dir)) {
+ shaper_stat_forward_inc(&profile->stat, meta->dir, meta->raw_len, ctx->thread_index);
- sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction);
+ sf->anchor = shaper_next_anchor_get(sf, meta->dir);
if (sf->anchor == 0) {//no next rule
return SHAPING_FORWARD;
} else {
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
- enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
goto FLOW_PUSH;
}
- } else {
- enqueue_time = pkt_wrapper->enqueue_time_us;
- goto FLOW_PUSH;
}
FLOW_PUSH:
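+ /* the packet is now enqueued here; previously the caller enqueued it before deciding the action */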
+ if (shaper_packet_enqueue(ctx, sf, rx_buff, meta) != 0) {
+ char *addr_str = addr_tuple4_to_str(&sf->tuple4);
+ LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_str);
+ if (addr_str) {
+ free(addr_str);
+ }
+ goto DROP;
+ }
+
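+ /* take the flow-push timestamp only after the enqueue has succeeded */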
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
if (0 == shaper_flow_push(ctx, sf, enqueue_time)) {
return SHAPING_QUEUED;
} else {
- rule = &sf->matched_rule_infos[sf->anchor];
- shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index);
- sf->anchor = 0;
- return SHAPING_DROP;
+ goto DROP;
}
+
+DROP:
+ shaper_stat_drop_inc(&sf->matched_rule_infos[sf->anchor].primary.stat, meta->dir, ctx->thread_index);
+ sf->anchor = 0;
+ return SHAPING_DROP;
}
static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority,
@@ -753,17 +770,6 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
-#if 0
- //AQM not implement yet
- if (stub_AQM_drop_packet(sf->queue_len, pkt_wrapper->enqueue_time_us)) {
- rule = &sf->matched_rule_infos[sf->anchor];
- shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length);
-
- shaping_ret = SHAPING_DROP;
- } else {
- shaping_ret = shaper_pkt_action_decide(g_swarmkv_db, sf, sp, priority, stat_hashtbl, 1);
- }
-#endif
shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority);
switch (shaping_ret) {
@@ -831,7 +837,6 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
struct shaping_rule_info *s_rule;
struct shaping_stat *stat = ctx->stat;
struct shaping_marsio_info *marsio_info = ctx->marsio_info;
- struct timespec curr_time;
sf->processed_pkts++;
@@ -843,10 +848,9 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
goto END;//for tcp pure control pkt, transmit it directly
}
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly
s_rule = &sf->matched_rule_infos[0];
- if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) {
+ if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta)) {
shaper_stat_queueing_pkt_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index);
shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len);
} else {
@@ -856,37 +860,21 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
}
- } else {
- if (0 != shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) {
- marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
- shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
- shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
-
- char *addr_str = addr_tuple4_to_str(&sf->tuple4);
- LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_str);
- if (addr_str) {
- free(addr_str);
- }
-
- goto END;
- }
-
+ } else {//no queueing pkt, decide action
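+ /* rx_buff ownership: kept in the queue on SHAPING_QUEUED, freed below on SHAPING_DROP, transmitted on SHAPING_FORWARD */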
sf->anchor = 0;
- shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, &sf->matched_rule_infos[sf->anchor].primary);
+ shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff);
switch (shaping_ret) {
case SHAPING_QUEUED:
shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len);
break;
case SHAPING_DROP:
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
- shaper_packet_dequeue(sf);
shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
break;
case SHAPING_FORWARD:
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
- shaper_packet_dequeue(sf);
shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
break;