summaryrefslogtreecommitdiff
path: root/shaping/src/shaper.cpp
diff options
context:
space:
mode:
author刘畅 <[email protected]>2023-03-28 10:35:04 +0000
committer刘畅 <[email protected]>2023-03-28 10:35:04 +0000
commit54d7bdfd760309474ed2bd13f7940d96fc007451 (patch)
tree955c23b1cd46b6837a2a6cb69b1a411e396b22ee /shaping/src/shaper.cpp
parentdaac158a2890ab58a826ae973a2e775625d6af07 (diff)
parentd6e6708cbeefcb90e09bb5fa76133d8f56a7f6ad (diff)
Merge branch 'shaping_fieldstat3' into 'rel'v1.1.0
Shaping fieldstat3 See merge request liuchang/shaping-engine!1
Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r--shaping/src/shaper.cpp297
1 files changed, 178 insertions, 119 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index cdc3657..6090a15 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -1,7 +1,4 @@
-#include "log.h"
-#include "session_table.h"
#include <MESA/swarmkv.h>
-#include <cstring>
#include <marsio.h>
#include <cjson/cJSON.h>
#include <MESA/MESA_prof_load.h>
@@ -13,6 +10,8 @@
extern "C" {
#include "libavl.h"
}
+#include "log.h"
+#include "session_table.h"
#include "addr_tuple4.h"
#include "raw_packet.h"
#include "shaper.h"
@@ -42,6 +41,11 @@ struct shaping_async_cb_arg {
unsigned char direction;
};
+struct shaping_profile_container {
+ struct shaping_profile_info *pf_info;
+ int pf_type;
+};
+
struct shaper* shaper_new(unsigned int priority_queue_len_max)
{
struct shaper *sp = NULL;
@@ -186,18 +190,19 @@ void shaper_packet_dequeue(struct shaping_flow *sf)
return;
}
-void shaper_queue_clear(struct shaping_flow *sf, struct shaping_stat_data **stat_hashtbl, struct shaping_thread_ctx *ctx)
+void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
{
struct shaping_packet_wrapper *pkt_wrapper;
+ struct shaping_stat *stat = ctx->stat;
struct shaping_rule_info *rule = &sf->matched_rule_infos[0];
while (!shaper_queue_empty(sf)) {
pkt_wrapper = shaper_first_pkt_get(sf);
- shaper_stat_queueing_pkt_dec(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority,
- pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY);
- shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority,
- pkt_wrapper->direction, pkt_wrapper->length);
+ shaper_stat_queueing_pkt_dec(stat, rule->id, rule->primary.id, rule->primary.priority,
+ pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
+ shaper_stat_drop_inc(stat, rule->id, rule->primary.id, rule->primary.priority,
+ pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
@@ -207,11 +212,11 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_stat_data **stat
}
//return success(0) while any avl tree insert success
-int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp,
- struct shaping_stat_data **stat_hashtbl, unsigned long long enqueue_time)
+int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+ struct shaper *sp = ctx->sp;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
int priority;
int ret = -1;
@@ -224,9 +229,9 @@ int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp,
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- shaper_stat_queueing_pkt_inc(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id,
- priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY);
- shaper_stat_queueing_session_inc(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id,
+ priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
+ shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
s_rule_info->primary.enqueue_time_us = enqueue_time;
}
@@ -238,9 +243,9 @@ int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp,
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- shaper_stat_queueing_pkt_inc(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id,
- priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW);
- shaper_stat_queueing_session_inc(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW);
+ shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id,
+ priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
+ shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
s_rule_info->borrowing[i].enqueue_time_us = enqueue_time;
}
}
@@ -258,10 +263,11 @@ static unsigned long long shaper_pkt_latency_calculate(struct shaping_profile_in
return (curr_time - enqueue_time);
}
-static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper *sp, struct shaping_stat_data **stat_hashtbl)
+void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+ struct shaper *sp = ctx->sp;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
struct timespec curr_time;
unsigned long long latency;
@@ -276,13 +282,13 @@ static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper
priority = s_rule_info->primary.priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- shaper_stat_queueing_pkt_dec(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id,
- priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY);
- shaper_stat_queueing_session_dec(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id,
+ priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
+ shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time);
- shaper_stat_max_latency_update(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id,
- priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_max_latency_update(ctx->stat, s_rule_info->id, s_rule_info->primary.id,
+ priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
}
if (s_rule_info->borrowing_num == 0) {
@@ -293,13 +299,13 @@ static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper
priority = s_rule_info->borrowing[i].priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- shaper_stat_queueing_pkt_dec(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id,
- priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW);
- shaper_stat_queueing_session_dec(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW);
+ shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id,
+ priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
+ shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
- latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time);
- shaper_stat_max_latency_update(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id,
- priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_BORROW);
+ latency = shaper_pkt_latency_calculate(&s_rule_info->borrowing[i], &curr_time);
+ shaper_stat_max_latency_update(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id,
+ priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
}
}
@@ -331,13 +337,6 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i
return count;
}
-void shaper_flow_pop(struct shaper *sp, struct shaping_flow *sf, struct shaping_stat_data **stat_hashtbl)
-{
- shaping_flow_remove_from_pool(sf, sp, stat_hashtbl);
-
- return;
-}
-
static void shaper_deposit_token_add(struct shaping_profile_info *pf_info, int req_token, unsigned char direction)
{
if (direction == SHAPING_DIR_IN) {
@@ -486,23 +485,43 @@ enum shaping_packet_action shaper_pkt_action_decide(struct shaping_flow *sf, str
}
#endif
-static struct shaping_profile_info * shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, int *profile_type)
+int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, struct shaping_profile_container pf_container[])
{
- int i;
+ int num = 0;
- if (s_rule_info->primary.priority == priority) {
- *profile_type = SHAPING_PROFILE_TYPE_PRIMARY;
- return &s_rule_info->primary;
- }
+ if (priority == SHAPING_PRIORITY_NUM_MAX - 1) {//priority 9 allow multi profiles for one priority
+ if (s_rule_info->primary.priority == priority) {
+ pf_container[num].pf_type = SHAPING_PROFILE_TYPE_PRIMARY;
+ pf_container[num].pf_info = &s_rule_info->primary;
+ num++;
+ }
- for (i = 0; i < s_rule_info->borrowing_num; i++) {
- if (s_rule_info->borrowing[i].priority == priority) {
- *profile_type = SHAPING_PROFILE_TYPE_BORROW;
- return &s_rule_info->borrowing[i];
+ for (int i = 0; i < s_rule_info->borrowing_num; i++) {
+ if (s_rule_info->borrowing[i].priority == priority) {
+ pf_container[num].pf_type = SHAPING_PROFILE_TYPE_BORROW;
+ pf_container[num].pf_info = &s_rule_info->borrowing[i];
+ num++;
+ }
+ }
+
+ return num;
+ } else {
+ if (s_rule_info->primary.priority == priority) {
+ pf_container[0].pf_type = SHAPING_PROFILE_TYPE_PRIMARY;
+ pf_container[0].pf_info = &s_rule_info->primary;
+ return 1;
+ }
+
+ for (int i = 0; i < s_rule_info->borrowing_num; i++) {
+ if (s_rule_info->borrowing[i].priority == priority) {
+ pf_container[0].pf_type = SHAPING_PROFILE_TYPE_BORROW;
+ pf_container[0].pf_info = &s_rule_info->borrowing[i];
+ return 1;
+ }
}
}
- return NULL;
+ return num;
}
static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char direction)
@@ -516,37 +535,85 @@ static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char directi
return anchor;
}
-enum shaping_packet_action shaper_pkt_action_decide(struct swarmkv *db, struct shaping_flow *sf, struct shaper *sp,
- int priority, struct shaping_stat_data **stat_hashtbl, int sf_in_queue)
+static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority)
{
- int profile_type = 0;
- struct shaping_profile_info *profile = NULL;
struct shaping_rule_info *rule = NULL;
+ struct shaping_profile_info *profile = NULL;
+ int profile_type;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
+ struct shaping_profile_container pf_container[SHAPING_PRIORITY_NUM_MAX];
struct timespec curr_time;
unsigned long long enqueue_time;
+ int get_token_success = 0;
+ int profile_num;
rule = &sf->matched_rule_infos[sf->anchor];
- profile = shaper_profile_get(rule, priority, &profile_type);
- assert(profile != NULL);
+ profile_num = shaper_profile_get(rule, priority, pf_container);
+ assert(profile_num > 0);
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
if (pkt_wrapper->tcp_pure_contorl) {
- if (sf_in_queue) {
- shaper_flow_pop(sp, sf, stat_hashtbl);
- }
- shaper_stat_forward_all_rule_inc(stat_hashtbl, sf, pkt_wrapper->direction, pkt_wrapper->length);
+ shaper_flow_pop(ctx, sf);
+ shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
return SHAPING_FORWARD;
}
- if (0 == shaper_token_consume(db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) {
- shaper_stat_forward_inc(stat_hashtbl, rule->id, profile->id, profile->priority,
- pkt_wrapper->direction, pkt_wrapper->length, profile_type);
-
- if (sf_in_queue) {
- shaper_flow_pop(sp, sf, stat_hashtbl);
+ for (int i = 0; i < profile_num; i++) {
+ profile = pf_container[i].pf_info;
+ profile_type = pf_container[i].pf_type;
+ if (0 == shaper_token_consume(ctx->swarmkv_db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) {
+ shaper_stat_forward_inc(ctx->stat, rule->id, profile->id, profile->priority,
+ pkt_wrapper->direction, pkt_wrapper->length, profile_type, ctx->thread_index);
+ get_token_success = 1;
+ break;
}
+ }
+
+ if (!get_token_success) {
+ return SHAPING_QUEUED;
+ }
+
+ shaper_flow_pop(ctx, sf);
+ sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction);
+ if (sf->anchor == 0) {//no next rule
+ return SHAPING_FORWARD;
+ }
+
+ //push sf for next rule
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+ if (0 == shaper_flow_push(ctx, sf, enqueue_time)) {
+ return SHAPING_QUEUED;
+ } else {
+ rule = &sf->matched_rule_infos[sf->anchor];
+ shaper_stat_drop_inc(ctx->stat, rule->id, rule->primary.id,
+ rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
+ sf->anchor = 0;
+ return SHAPING_DROP;
+ }
+}
+
+static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile)
+{
+ int profile_type = SHAPING_PROFILE_TYPE_PRIMARY;
+ struct shaping_rule_info *rule = NULL;
+ struct shaping_packet_wrapper *pkt_wrapper = NULL;
+ struct timespec curr_time;
+ unsigned long long enqueue_time;
+
+ rule = &sf->matched_rule_infos[sf->anchor];
+ pkt_wrapper = shaper_first_pkt_get(sf);
+ assert(pkt_wrapper != NULL);
+
+ if (pkt_wrapper->tcp_pure_contorl) {
+ shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
+ return SHAPING_FORWARD;
+ }
+
+ if (0 == shaper_token_consume(ctx->swarmkv_db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) {
+ shaper_stat_forward_inc(ctx->stat, rule->id, profile->id, profile->priority,
+ pkt_wrapper->direction, pkt_wrapper->length, profile_type, ctx->thread_index);
sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction);
if (sf->anchor == 0) {//no next rule
@@ -557,29 +624,24 @@ enum shaping_packet_action shaper_pkt_action_decide(struct swarmkv *db, struct s
goto FLOW_PUSH;
}
} else {
- if (sf_in_queue) {
- return SHAPING_QUEUED;
- } else {
- enqueue_time = pkt_wrapper->enqueue_time_us;
- goto FLOW_PUSH;
- }
+ enqueue_time = pkt_wrapper->enqueue_time_us;
+ goto FLOW_PUSH;
}
FLOW_PUSH:
- if (0 == shaper_flow_push(sf, sp, stat_hashtbl, enqueue_time)) {
+ if (0 == shaper_flow_push(ctx, sf, enqueue_time)) {
return SHAPING_QUEUED;
} else {
rule = &sf->matched_rule_infos[sf->anchor];
- shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id,
- rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length);
-
+ shaper_stat_drop_inc(ctx->stat, rule->id, rule->primary.id,
+ rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
sf->anchor = 0;
return SHAPING_DROP;
}
}
static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority,
- struct shaping_stat_data **stat_hashtbl, struct shaping_thread_ctx *ctx)
+ struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
{
struct shaping_packet_wrapper *pkt_wrapper;
struct shaping_rule_info *rule = NULL;
@@ -600,7 +662,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
shaping_ret = shaper_pkt_action_decide(g_swarmkv_db, sf, sp, priority, stat_hashtbl, 1);
}
#endif
- shaping_ret = shaper_pkt_action_decide(ctx->swarmkv_db, sf, sp, priority, stat_hashtbl, 1);
+ shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority);
switch (shaping_ret) {
case SHAPING_QUEUED:
@@ -632,16 +694,16 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
pkt_wrapper = shaper_first_pkt_get(sf);
sf->anchor = 0;
- if (0 == shaper_flow_push(sf, sp, stat_hashtbl, pkt_wrapper->enqueue_time_us)) {
+ if (0 == shaper_flow_push(ctx, sf, pkt_wrapper->enqueue_time_us)) {
/*in pkt process, when queue not empty,
new pkt's queueing stat was added to primary profile of first rule.
while shaper_flow_push() here will add queueing stat to every profile of first rule,
so need adjust queueing stat here*/
rule = &sf->matched_rule_infos[sf->anchor];
- shaper_stat_queueing_pkt_dec(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority,
- pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_queueing_pkt_dec(stat, rule->id, rule->primary.id, rule->primary.priority,
+ pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
} else {
- shaper_queue_clear(sf, stat_hashtbl, ctx);//first packet fail, then every packet will fail
+ shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail
if (sf->flag & STREAM_CLOSE) {
shaping_flow_free(sf);
}
@@ -650,12 +712,10 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
}
}
-void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf)
+void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf)
{
- int priority;
int shaping_ret;
struct shaping_rule_info *s_rule;
- struct shaper *sp = ctx->sp;
struct shaping_stat *stat = ctx->stat;
struct shaping_marsio_info *marsio_info = ctx->marsio_info;
struct timespec curr_time;
@@ -664,19 +724,18 @@ void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly
s_rule = &sf->matched_rule_infos[0];
if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) {
- shaper_stat_queueing_pkt_inc(&stat->stat_hashtbl, s_rule->id,
+ shaper_stat_queueing_pkt_inc(stat, s_rule->id,
s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len,
- SHAPING_PROFILE_TYPE_PRIMARY);
+ SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
} else {
- shaper_stat_drop_inc(&stat->stat_hashtbl, s_rule->id,
- s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len);
+ shaper_stat_drop_inc(stat, s_rule->id, s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len, ctx->thread_index);
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
}
} else {
if (meta->is_tcp_pure_ctrl) {
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
- shaper_stat_forward_all_rule_inc(&stat->stat_hashtbl, sf, meta->dir, meta->raw_len);
+ shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
goto JUDGE_CLOSE;//for tcp pure control pkt, transmit it directly
}
@@ -688,9 +747,7 @@ void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
sf->anchor = 0;
- priority = sf->matched_rule_infos[sf->anchor].primary.priority;
- shaping_ret = shaper_pkt_action_decide(ctx->swarmkv_db, sf, sp, priority,
- &stat->stat_hashtbl, 0);
+ shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, &sf->matched_rule_infos[sf->anchor].primary);
switch (shaping_ret) {
case SHAPING_QUEUED:
break;
@@ -716,8 +773,6 @@ JUDGE_CLOSE:
}
}
- shaper_stat_send(stat, &stat->stat_hashtbl);
-
return;
}
@@ -734,16 +789,13 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
}
for (int j = 0; j < sf_num; j++) {
- ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, &stat->stat_hashtbl, ctx);
+ ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, stat, ctx);
if (ret == 0) {
- goto STAT_DATA_SEND;
+ return;
}
}
}
-STAT_DATA_SEND:
- shaper_stat_send(stat, &stat->stat_hashtbl);
-
return;
}
@@ -811,40 +863,44 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_
void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
{
- marsio_buff_t *rx_buff;
+ marsio_buff_t *rx_buff[SHAPER_MARSIO_RX_BRUST_MAX];
struct shaping_flow *sf = NULL;
struct metadata meta;
int rx_num;
+ int i;
- rx_num = marsio_recv_burst(ctx->marsio_info->mr_dev, ctx->thread_index, &rx_buff, 1);
+ rx_num = marsio_recv_burst(ctx->marsio_info->mr_dev, ctx->thread_index, rx_buff, ctx->marsio_info->rx_brust_max);
if (rx_num <= 0) {
polling_entry(ctx->sp, ctx->stat, ctx);
return;
}
- if (marsio_buff_is_ctrlbuf(rx_buff)) {
- sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff, &meta);
- } else {
- sf = shaper_raw_pkt_session_handle(ctx, rx_buff, &meta);
- }
+ for (i = 0; i < rx_num; i++) {
+ if (marsio_buff_is_ctrlbuf(rx_buff[i])) {
+ sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta);
+ } else {
+ sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta);
+ }
- if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly
- marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
- } else {
- shaping_stream_process(ctx, rx_buff, &meta, sf);
+ if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly
+ marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1);
+ } else {
+ shaping_packet_process(ctx, rx_buff[i], &meta, sf);
+ }
+ polling_entry(ctx->sp, ctx->stat, ctx);
}
- polling_entry(ctx->sp, ctx->stat, ctx);
return;
}
-int shaper_global_conf_init(struct shaping_global_conf *conf)
+int shaper_global_conf_init(struct shaping_system_conf *conf)
{
int ret;
int array_num;
cJSON *json = NULL;
cJSON *tmp_obj = NULL, *tmp_array_obj = NULL;
char polling_node_num_max[128] = {0};
+ unsigned int cpu_mask[SHAPING_WROK_THREAD_NUM_MAX] = {0};
ret = MESA_load_profile_int_nodef(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "WORK_THREAD_NUM", &conf->work_thread_num);
if (ret < 0) {
@@ -859,11 +915,14 @@ int shaper_global_conf_init(struct shaping_global_conf *conf)
return ret;
}
- ret = MESA_load_profile_uint_range(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "CPU_AFFINITY_MASK", SHAPING_WROK_THREAD_NUM_MAX, (unsigned int *)conf->cpu_affinity_mask);
+ ret = MESA_load_profile_uint_range(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "CPU_AFFINITY_MASK", SHAPING_WROK_THREAD_NUM_MAX, cpu_mask);
if (ret < 0 || ret != conf->work_thread_num) {
LOG_ERROR("%s: shaping init global conf get CPU_AFFINITY_MASK failed or incomplete config", LOG_TAG_SHAPING);
return -1;
}
+ for (int i = 0; i < conf->work_thread_num; i++) {
+ conf->cpu_affinity_mask |= 1 << cpu_mask[i];
+ }
#if 0 //temporarily not support array config
array_num = SHAPING_PRIORITY_NUM_MAX;
@@ -921,10 +980,6 @@ int shaper_global_conf_init(struct shaping_global_conf *conf)
}
/*************************************************************************/
-
- MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1");
- MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_PORT", &conf->telegraf_port, 6379);
-
MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "SESSION_QUEUE_LEN_MAX", &conf->session_queue_len_max, 128);
MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PRIORITY_QUEUE_LEN_MAX", &conf->priority_queue_len_max, 1024);
@@ -945,11 +1000,11 @@ void shaping_engine_destroy(struct shaping_ctx *ctx)
shaper_swarmkv_destroy(ctx->swarmkv_db);
shaper_maat_destroy(ctx->maat_info);
shaper_marsio_destroy(ctx->marsio_info);
+ shaper_stat_destroy(ctx->stat);
if (ctx->thread_ctx) {
for (int i = 0; i < ctx->thread_num; i++) {
shaper_free(ctx->thread_ctx[i].sp);
- shaper_stat_send_free(ctx->thread_ctx[i].stat);
session_table_destory(ctx->thread_ctx[i].session_table);
}
free(ctx->thread_ctx);
@@ -963,7 +1018,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx)
struct shaping_ctx *shaping_engine_init()
{
- struct shaping_global_conf conf;
+ struct shaping_system_conf conf;
struct shaping_ctx *ctx = NULL;
int ret;
@@ -994,22 +1049,26 @@ struct shaping_ctx *shaping_engine_init()
}
/*init marsio*/
- ctx->marsio_info = shaper_marsio_init(conf.work_thread_num);
+ ctx->marsio_info = shaper_marsio_init(&conf);
if (ctx->marsio_info == NULL) {
goto ERROR;
}
+
+ ctx->stat = shaper_stat_init(conf.work_thread_num);
+ if (ctx->stat == NULL) {
+ goto ERROR;
+ }
ctx->thread_ctx = (struct shaping_thread_ctx *)calloc(conf.work_thread_num, sizeof(struct shaping_thread_ctx));
ctx->thread_num = conf.work_thread_num;
for (int i = 0; i < conf.work_thread_num; i++) {
ctx->thread_ctx[i].thread_index = i;
ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max);
- ctx->thread_ctx[i].stat = shaper_stat_new(conf.telegraf_ip, conf.telegraf_port);
+ ctx->thread_ctx[i].stat = ctx->stat;
ctx->thread_ctx[i].session_table = session_table_create();
ctx->thread_ctx[i].maat_info = ctx->maat_info;
ctx->thread_ctx[i].marsio_info = ctx->marsio_info;
ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db;
- ctx->thread_ctx[i].cpu_mask = conf.cpu_affinity_enable ? conf.cpu_affinity_mask[i] : -1;
ctx->thread_ctx[i].ref_ctx = ctx;
ctx->thread_ctx[i].session_queue_len_max = conf.session_queue_len_max;
memcpy(ctx->thread_ctx[i].polling_node_num_max, conf.polling_node_num_max, sizeof(conf.polling_node_num_max));