diff options
Diffstat (limited to 'shaping/src')
| -rw-r--r-- | shaping/src/main.cpp | 19 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 297 | ||||
| -rw-r--r-- | shaping/src/shaper_maat.cpp | 6 | ||||
| -rw-r--r-- | shaping/src/shaper_marsio.cpp | 21 | ||||
| -rw-r--r-- | shaping/src/shaper_session.cpp | 2 | ||||
| -rw-r--r-- | shaping/src/shaper_stat.cpp | 314 | ||||
| -rw-r--r-- | shaping/src/shaper_swarmkv.cpp | 1 |
7 files changed, 324 insertions, 336 deletions
diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp index 16f0aa8..476816f 100644 --- a/shaping/src/main.cpp +++ b/shaping/src/main.cpp @@ -8,29 +8,10 @@ #include "shaper_marsio.h" #include "shaper_session.h" -static int thread_set_affinity(int core_id) -{ - int num_cores = sysconf(_SC_NPROCESSORS_ONLN); - if (core_id < 0 || core_id >= num_cores) - { - return EINVAL; - } - - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(core_id, &cpuset); - - return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); -} - static void *shaper_thread_loop(void *data) { struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data; - if (ctx->cpu_mask >= 0) - { - thread_set_affinity(ctx->cpu_mask); - } marsio_thread_init(ctx->marsio_info->instance); //loop to process pkts diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index cdc3657..6090a15 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -1,7 +1,4 @@ -#include "log.h" -#include "session_table.h" #include <MESA/swarmkv.h> -#include <cstring> #include <marsio.h> #include <cjson/cJSON.h> #include <MESA/MESA_prof_load.h> @@ -13,6 +10,8 @@ extern "C" { #include "libavl.h" } +#include "log.h" +#include "session_table.h" #include "addr_tuple4.h" #include "raw_packet.h" #include "shaper.h" @@ -42,6 +41,11 @@ struct shaping_async_cb_arg { unsigned char direction; }; +struct shaping_profile_container { + struct shaping_profile_info *pf_info; + int pf_type; +}; + struct shaper* shaper_new(unsigned int priority_queue_len_max) { struct shaper *sp = NULL; @@ -186,18 +190,19 @@ void shaper_packet_dequeue(struct shaping_flow *sf) return; } -void shaper_queue_clear(struct shaping_flow *sf, struct shaping_stat_data **stat_hashtbl, struct shaping_thread_ctx *ctx) +void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) { struct shaping_packet_wrapper *pkt_wrapper; + struct shaping_stat *stat = ctx->stat; struct shaping_rule_info *rule = &sf->matched_rule_infos[0]; while (!shaper_queue_empty(sf)) { pkt_wrapper = shaper_first_pkt_get(sf); - shaper_stat_queueing_pkt_dec(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, - pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY); - shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, - pkt_wrapper->direction, pkt_wrapper->length); + shaper_stat_queueing_pkt_dec(stat, rule->id, rule->primary.id, rule->primary.priority, + pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); + shaper_stat_drop_inc(stat, rule->id, rule->primary.id, rule->primary.priority, + pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); @@ -207,11 +212,11 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_stat_data **stat } //return success(0) while any avl tree insert success -int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp, - struct shaping_stat_data **stat_hashtbl, unsigned long long enqueue_time) +int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; + struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; int priority; int ret = -1; @@ -224,9 +229,9 @@ int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp, avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; - shaper_stat_queueing_pkt_inc(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, - priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY); - shaper_stat_queueing_session_inc(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY); + shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, + priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); + shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); s_rule_info->primary.enqueue_time_us = enqueue_time; } @@ -238,9 +243,9 @@ int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp, avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; - shaper_stat_queueing_pkt_inc(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, - priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW); - shaper_stat_queueing_session_inc(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW); + shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, + priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); + shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); s_rule_info->borrowing[i].enqueue_time_us = enqueue_time; } } @@ -258,10 +263,11 @@ static unsigned long long shaper_pkt_latency_calculate(struct shaping_profile_in return (curr_time - enqueue_time); } -static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper *sp, struct shaping_stat_data **stat_hashtbl) +void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; + struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; struct timespec curr_time; unsigned long long latency; @@ -276,13 +282,13 @@ static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper priority = s_rule_info->primary.priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - shaper_stat_queueing_pkt_dec(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, - priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY); - shaper_stat_queueing_session_dec(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY); + shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, + priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); + shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time); - shaper_stat_max_latency_update(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, - priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_PRIMARY); + shaper_stat_max_latency_update(ctx->stat, s_rule_info->id, s_rule_info->primary.id, + priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); } if (s_rule_info->borrowing_num == 0) { @@ -293,13 +299,13 @@ static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper priority = s_rule_info->borrowing[i].priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - shaper_stat_queueing_pkt_dec(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, - priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW); - shaper_stat_queueing_session_dec(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW); + shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, + priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); + shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); - latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time); - shaper_stat_max_latency_update(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, - priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_BORROW); + latency = shaper_pkt_latency_calculate(&s_rule_info->borrowing[i], &curr_time); + shaper_stat_max_latency_update(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, + priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); } } @@ -331,13 +337,6 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i return count; } -void shaper_flow_pop(struct shaper *sp, struct shaping_flow *sf, struct shaping_stat_data **stat_hashtbl) -{ - shaping_flow_remove_from_pool(sf, sp, stat_hashtbl); - - return; -} - static void shaper_deposit_token_add(struct shaping_profile_info *pf_info, int req_token, unsigned char direction) { if (direction == SHAPING_DIR_IN) { @@ -486,23 +485,43 @@ enum shaping_packet_action shaper_pkt_action_decide(struct shaping_flow *sf, str } #endif -static struct shaping_profile_info * shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, int *profile_type) +int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, struct shaping_profile_container pf_container[]) { - int i; + int num = 0; - if (s_rule_info->primary.priority == priority) { - *profile_type = SHAPING_PROFILE_TYPE_PRIMARY; - return &s_rule_info->primary; - } + if (priority == SHAPING_PRIORITY_NUM_MAX - 1) {//priority 9 allow multi profiles for one priority + if (s_rule_info->primary.priority == priority) { + pf_container[num].pf_type = SHAPING_PROFILE_TYPE_PRIMARY; + pf_container[num].pf_info = &s_rule_info->primary; + num++; + } - for (i = 0; i < s_rule_info->borrowing_num; i++) { - if (s_rule_info->borrowing[i].priority == priority) { - *profile_type = SHAPING_PROFILE_TYPE_BORROW; - return &s_rule_info->borrowing[i]; + for (int i = 0; i < s_rule_info->borrowing_num; i++) { + if (s_rule_info->borrowing[i].priority == priority) { + pf_container[num].pf_type = SHAPING_PROFILE_TYPE_BORROW; + pf_container[num].pf_info = &s_rule_info->borrowing[i]; + num++; + } + } + + return num; + } else { + if (s_rule_info->primary.priority == priority) { + pf_container[0].pf_type = SHAPING_PROFILE_TYPE_PRIMARY; + pf_container[0].pf_info = &s_rule_info->primary; + return 1; + } + + for (int i = 0; i < s_rule_info->borrowing_num; i++) { + if (s_rule_info->borrowing[i].priority == priority) { + pf_container[0].pf_type = SHAPING_PROFILE_TYPE_BORROW; + pf_container[0].pf_info = &s_rule_info->borrowing[i]; + return 1; + } } } - return NULL; + return num; } static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char direction) @@ -516,37 +535,85 @@ static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char directi return anchor; } -enum shaping_packet_action shaper_pkt_action_decide(struct swarmkv *db, struct shaping_flow *sf, struct shaper *sp, - int priority, struct shaping_stat_data **stat_hashtbl, int sf_in_queue) +static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority) { - int profile_type = 0; - struct shaping_profile_info *profile = NULL; struct shaping_rule_info *rule = NULL; + struct shaping_profile_info *profile = NULL; + int profile_type; struct shaping_packet_wrapper *pkt_wrapper = NULL; + struct shaping_profile_container pf_container[SHAPING_PRIORITY_NUM_MAX]; struct timespec curr_time; unsigned long long enqueue_time; + int get_token_success = 0; + int profile_num; rule = &sf->matched_rule_infos[sf->anchor]; - profile = shaper_profile_get(rule, priority, &profile_type); - assert(profile != NULL); + profile_num = shaper_profile_get(rule, priority, pf_container); + assert(profile_num > 0); pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); if (pkt_wrapper->tcp_pure_contorl) { - if (sf_in_queue) { - shaper_flow_pop(sp, sf, stat_hashtbl); - } - shaper_stat_forward_all_rule_inc(stat_hashtbl, sf, pkt_wrapper->direction, pkt_wrapper->length); + shaper_flow_pop(ctx, sf); + shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); return SHAPING_FORWARD; } - if (0 == shaper_token_consume(db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) { - shaper_stat_forward_inc(stat_hashtbl, rule->id, profile->id, profile->priority, - pkt_wrapper->direction, pkt_wrapper->length, profile_type); - - if (sf_in_queue) { - shaper_flow_pop(sp, sf, stat_hashtbl); + for (int i = 0; i < profile_num; i++) { + profile = pf_container[i].pf_info; + profile_type = pf_container[i].pf_type; + if (0 == shaper_token_consume(ctx->swarmkv_db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) { + shaper_stat_forward_inc(ctx->stat, rule->id, profile->id, profile->priority, + pkt_wrapper->direction, pkt_wrapper->length, profile_type, ctx->thread_index); + get_token_success = 1; + break; } + } + + if (!get_token_success) { + return SHAPING_QUEUED; + } + + shaper_flow_pop(ctx, sf); + sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction); + if (sf->anchor == 0) {//no next rule + return SHAPING_FORWARD; + } + + //push sf for next rule + clock_gettime(CLOCK_MONOTONIC, &curr_time); + enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + if (0 == shaper_flow_push(ctx, sf, enqueue_time)) { + return SHAPING_QUEUED; + } else { + rule = &sf->matched_rule_infos[sf->anchor]; + shaper_stat_drop_inc(ctx->stat, rule->id, rule->primary.id, + rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); + sf->anchor = 0; + return SHAPING_DROP; + } +} + +static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile) +{ + int profile_type = SHAPING_PROFILE_TYPE_PRIMARY; + struct shaping_rule_info *rule = NULL; + struct shaping_packet_wrapper *pkt_wrapper = NULL; + struct timespec curr_time; + unsigned long long enqueue_time; + + rule = &sf->matched_rule_infos[sf->anchor]; + pkt_wrapper = shaper_first_pkt_get(sf); + assert(pkt_wrapper != NULL); + + if (pkt_wrapper->tcp_pure_contorl) { + shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); + return SHAPING_FORWARD; + } + + if (0 == shaper_token_consume(ctx->swarmkv_db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) { + shaper_stat_forward_inc(ctx->stat, rule->id, profile->id, profile->priority, + pkt_wrapper->direction, pkt_wrapper->length, profile_type, ctx->thread_index); sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction); if (sf->anchor == 0) {//no next rule @@ -557,29 +624,24 @@ enum shaping_packet_action shaper_pkt_action_decide(struct swarmkv *db, struct s goto FLOW_PUSH; } } else { - if (sf_in_queue) { - return SHAPING_QUEUED; - } else { - enqueue_time = pkt_wrapper->enqueue_time_us; - goto FLOW_PUSH; - } + enqueue_time = pkt_wrapper->enqueue_time_us; + goto FLOW_PUSH; } FLOW_PUSH: - if (0 == shaper_flow_push(sf, sp, stat_hashtbl, enqueue_time)) { + if (0 == shaper_flow_push(ctx, sf, enqueue_time)) { return SHAPING_QUEUED; } else { rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id, - rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length); - + shaper_stat_drop_inc(ctx->stat, rule->id, rule->primary.id, + rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); sf->anchor = 0; return SHAPING_DROP; } } static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority, - struct shaping_stat_data **stat_hashtbl, struct shaping_thread_ctx *ctx) + struct shaping_stat *stat, struct shaping_thread_ctx *ctx) { struct shaping_packet_wrapper *pkt_wrapper; struct shaping_rule_info *rule = NULL; @@ -600,7 +662,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ shaping_ret = shaper_pkt_action_decide(g_swarmkv_db, sf, sp, priority, stat_hashtbl, 1); } #endif - shaping_ret = shaper_pkt_action_decide(ctx->swarmkv_db, sf, sp, priority, stat_hashtbl, 1); + shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority); switch (shaping_ret) { case SHAPING_QUEUED: @@ -632,16 +694,16 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ pkt_wrapper = shaper_first_pkt_get(sf); sf->anchor = 0; - if (0 == shaper_flow_push(sf, sp, stat_hashtbl, pkt_wrapper->enqueue_time_us)) { + if (0 == shaper_flow_push(ctx, sf, pkt_wrapper->enqueue_time_us)) { /*in pkt process, when queue not empty, new pkt's queueing stat was added to primary profile of first rule. while shaper_flow_push() here will add queueing stat to every profile of first rule, so need adjust queueing stat here*/ rule = &sf->matched_rule_infos[sf->anchor]; - shaper_stat_queueing_pkt_dec(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, - pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY); + shaper_stat_queueing_pkt_dec(stat, rule->id, rule->primary.id, rule->primary.priority, + pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); } else { - shaper_queue_clear(sf, stat_hashtbl, ctx);//first packet fail, then every packet will fail + shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail if (sf->flag & STREAM_CLOSE) { shaping_flow_free(sf); } @@ -650,12 +712,10 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ } } -void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf) +void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf) { - int priority; int shaping_ret; struct shaping_rule_info *s_rule; - struct shaper *sp = ctx->sp; struct shaping_stat *stat = ctx->stat; struct shaping_marsio_info *marsio_info = ctx->marsio_info; struct timespec curr_time; @@ -664,19 +724,18 @@ void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly s_rule = &sf->matched_rule_infos[0]; if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) { - shaper_stat_queueing_pkt_inc(&stat->stat_hashtbl, s_rule->id, + shaper_stat_queueing_pkt_inc(stat, s_rule->id, s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len, - SHAPING_PROFILE_TYPE_PRIMARY); + SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); } else { - shaper_stat_drop_inc(&stat->stat_hashtbl, s_rule->id, - s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len); + shaper_stat_drop_inc(stat, s_rule->id, s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len, ctx->thread_index); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); } } else { if (meta->is_tcp_pure_ctrl) { marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); - shaper_stat_forward_all_rule_inc(&stat->stat_hashtbl, sf, meta->dir, meta->raw_len); + shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index); goto JUDGE_CLOSE;//for tcp pure control pkt, transmit it directly } @@ -688,9 +747,7 @@ void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu sf->anchor = 0; - priority = sf->matched_rule_infos[sf->anchor].primary.priority; - shaping_ret = shaper_pkt_action_decide(ctx->swarmkv_db, sf, sp, priority, - &stat->stat_hashtbl, 0); + shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, &sf->matched_rule_infos[sf->anchor].primary); switch (shaping_ret) { case SHAPING_QUEUED: break; @@ -716,8 +773,6 @@ JUDGE_CLOSE: } } - shaper_stat_send(stat, &stat->stat_hashtbl); - return; } @@ -734,16 +789,13 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_ } for (int j = 0; j < sf_num; j++) { - ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, &stat->stat_hashtbl, ctx); + ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, stat, ctx); if (ret == 0) { - goto STAT_DATA_SEND; + return; } } } -STAT_DATA_SEND: - shaper_stat_send(stat, &stat->stat_hashtbl); - return; } @@ -811,40 +863,44 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) { - marsio_buff_t *rx_buff; + marsio_buff_t *rx_buff[SHAPER_MARSIO_RX_BRUST_MAX]; struct shaping_flow *sf = NULL; struct metadata meta; int rx_num; + int i; - rx_num = marsio_recv_burst(ctx->marsio_info->mr_dev, ctx->thread_index, &rx_buff, 1); + rx_num = marsio_recv_burst(ctx->marsio_info->mr_dev, ctx->thread_index, rx_buff, ctx->marsio_info->rx_brust_max); if (rx_num <= 0) { polling_entry(ctx->sp, ctx->stat, ctx); return; } - if (marsio_buff_is_ctrlbuf(rx_buff)) { - sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff, &meta); - } else { - sf = shaper_raw_pkt_session_handle(ctx, rx_buff, &meta); - } + for (i = 0; i < rx_num; i++) { + if (marsio_buff_is_ctrlbuf(rx_buff[i])) { + sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta); + } else { + sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta); + } - if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly - marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); - } else { - shaping_stream_process(ctx, rx_buff, &meta, sf); + if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly + marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1); + } else { + shaping_packet_process(ctx, rx_buff[i], &meta, sf); + } + polling_entry(ctx->sp, ctx->stat, ctx); } - polling_entry(ctx->sp, ctx->stat, ctx); return; } -int shaper_global_conf_init(struct shaping_global_conf *conf) +int shaper_global_conf_init(struct shaping_system_conf *conf) { int ret; int array_num; cJSON *json = NULL; cJSON *tmp_obj = NULL, *tmp_array_obj = NULL; char polling_node_num_max[128] = {0}; + unsigned int cpu_mask[SHAPING_WROK_THREAD_NUM_MAX] = {0}; ret = MESA_load_profile_int_nodef(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "WORK_THREAD_NUM", &conf->work_thread_num); if (ret < 0) { @@ -859,11 +915,14 @@ int shaper_global_conf_init(struct shaping_global_conf *conf) return ret; } - ret = MESA_load_profile_uint_range(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "CPU_AFFINITY_MASK", SHAPING_WROK_THREAD_NUM_MAX, (unsigned int *)conf->cpu_affinity_mask); + ret = MESA_load_profile_uint_range(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "CPU_AFFINITY_MASK", SHAPING_WROK_THREAD_NUM_MAX, cpu_mask); if (ret < 0 || ret != conf->work_thread_num) { LOG_ERROR("%s: shaping init global conf get CPU_AFFINITY_MASK failed or incomplete config", LOG_TAG_SHAPING); return -1; } + for (int i = 0; i < conf->work_thread_num; i++) { + conf->cpu_affinity_mask |= 1 << cpu_mask[i]; + } #if 0 //temporarily not support array config array_num = SHAPING_PRIORITY_NUM_MAX; @@ -921,10 +980,6 @@ int shaper_global_conf_init(struct shaping_global_conf *conf) } /*************************************************************************/ - - MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1"); - MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_PORT", &conf->telegraf_port, 6379); - MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "SESSION_QUEUE_LEN_MAX", &conf->session_queue_len_max, 128); MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PRIORITY_QUEUE_LEN_MAX", &conf->priority_queue_len_max, 1024); @@ -945,11 +1000,11 @@ void shaping_engine_destroy(struct shaping_ctx *ctx) shaper_swarmkv_destroy(ctx->swarmkv_db); shaper_maat_destroy(ctx->maat_info); shaper_marsio_destroy(ctx->marsio_info); + shaper_stat_destroy(ctx->stat); if (ctx->thread_ctx) { for (int i = 0; i < ctx->thread_num; i++) { shaper_free(ctx->thread_ctx[i].sp); - shaper_stat_send_free(ctx->thread_ctx[i].stat); session_table_destory(ctx->thread_ctx[i].session_table); } free(ctx->thread_ctx); @@ -963,7 +1018,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx) struct shaping_ctx *shaping_engine_init() { - struct shaping_global_conf conf; + struct shaping_system_conf conf; struct shaping_ctx *ctx = NULL; int ret; @@ -994,22 +1049,26 @@ struct shaping_ctx *shaping_engine_init() } /*init marsio*/ - ctx->marsio_info = shaper_marsio_init(conf.work_thread_num); + ctx->marsio_info = shaper_marsio_init(&conf); if (ctx->marsio_info == NULL) { goto ERROR; } + + ctx->stat = shaper_stat_init(conf.work_thread_num); + if (ctx->stat == NULL) { + goto ERROR; + } ctx->thread_ctx = (struct shaping_thread_ctx *)calloc(conf.work_thread_num, sizeof(struct shaping_thread_ctx)); ctx->thread_num = conf.work_thread_num; for (int i = 0; i < conf.work_thread_num; i++) { ctx->thread_ctx[i].thread_index = i; ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max); - ctx->thread_ctx[i].stat = shaper_stat_new(conf.telegraf_ip, conf.telegraf_port); + ctx->thread_ctx[i].stat = ctx->stat; ctx->thread_ctx[i].session_table = session_table_create(); ctx->thread_ctx[i].maat_info = ctx->maat_info; ctx->thread_ctx[i].marsio_info = ctx->marsio_info; ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db; - ctx->thread_ctx[i].cpu_mask = conf.cpu_affinity_enable ? conf.cpu_affinity_mask[i] : -1; ctx->thread_ctx[i].ref_ctx = ctx; ctx->thread_ctx[i].session_queue_len_max = conf.session_queue_len_max; memcpy(ctx->thread_ctx[i].polling_node_num_max, conf.polling_node_num_max, sizeof(conf.polling_node_num_max)); diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp index 7d62314..1849987 100644 --- a/shaping/src/shaper_maat.cpp +++ b/shaping/src/shaper_maat.cpp @@ -283,12 +283,12 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_ru goto END; } - if (s_rule->priority + i + 1 < SHAPING_PRIORITY_NUM_MAX) {//TODO: 优先级大于9的都按9处理 + if (s_rule->priority + i + 1 < SHAPING_PRIORITY_NUM_MAX) { shaper_profile_update(&s_rule_info->borrowing[i], s_pf, s_rule->priority + i + 1); - s_rule_info->borrowing_num++; } else { - goto END; + shaper_profile_update(&s_rule_info->borrowing[i], s_pf, SHAPING_PRIORITY_NUM_MAX - 1); } + s_rule_info->borrowing_num++; } END: diff --git a/shaping/src/shaper_marsio.cpp b/shaping/src/shaper_marsio.cpp index 3ea774d..cbc5d09 100644 --- a/shaping/src/shaper_marsio.cpp +++ b/shaping/src/shaper_marsio.cpp @@ -1,6 +1,7 @@ #include <MESA_prof_load.h> #include <MESA/MESA_handle_logger.h> #include <cjson/cJSON.h> +#include <marsio.h> #include "log.h" #include "raw_packet.h" @@ -10,6 +11,7 @@ struct shaper_marsio_config { + int rx_brust_max; char app_symbol[256]; char dev_interface[256]; }; @@ -29,6 +31,9 @@ static int shaper_marsio_config_load(struct shaper_marsio_config *conf) LOG_ERROR("%s: shaping load MARSIO conf APP_SYMBOL failed", LOG_TAG_MARSIO); return ret; } + + ret = MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "RX_BRUST_MAX", &conf->rx_brust_max, 1); + conf->rx_brust_max = conf->rx_brust_max <= SHAPER_MARSIO_RX_BRUST_MAX ? conf->rx_brust_max : SHAPER_MARSIO_RX_BRUST_MAX; return 0; } @@ -56,7 +61,7 @@ void shaper_marsio_destroy(struct shaping_marsio_info *marsio_info) return; } -struct shaping_marsio_info* shaper_marsio_init(int thread_num) +struct shaping_marsio_info* shaper_marsio_init(struct shaping_system_conf *sys_conf) { struct shaper_marsio_config conf; struct shaping_marsio_info *marsio_info; @@ -75,18 +80,24 @@ struct shaping_marsio_info* shaper_marsio_init(int thread_num) goto ERROR; } - if (marsio_option_set(marsio_info->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0) - { + if (marsio_option_set(marsio_info->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0) { LOG_ERROR("%s: shaping marsio set MARSIO_OPT_EXIT_WHEN_ERR failed", LOG_TAG_MARSIO); goto ERROR; } + if (sys_conf->cpu_affinity_enable) { + if (marsio_option_set(marsio_info->instance, MARSIO_OPT_THREAD_MASK, &sys_conf->cpu_affinity_mask, sizeof(sys_conf->cpu_affinity_mask)) != 0) { + LOG_ERROR("%s: shaping marsio set MARSIO_OPT_THREAD_MASK failed", LOG_TAG_MARSIO); + goto ERROR; + } + } + if (marsio_init(marsio_info->instance, conf.app_symbol) != 0) { LOG_ERROR("%s: shaping marsio init failed", LOG_TAG_MARSIO); goto ERROR; } - marsio_info->mr_dev = marsio_open_device(marsio_info->instance, conf.dev_interface, thread_num, thread_num); + marsio_info->mr_dev = marsio_open_device(marsio_info->instance, conf.dev_interface, sys_conf->work_thread_num, sys_conf->work_thread_num); if (!marsio_info->mr_dev) { LOG_ERROR("%s: shaping marsio open device %s failed", LOG_TAG_MARSIO, conf.dev_interface); goto ERROR; @@ -98,6 +109,8 @@ struct shaping_marsio_info* shaper_marsio_init(int thread_num) goto ERROR; } + marsio_info->rx_brust_max = conf.rx_brust_max; + return marsio_info; ERROR: diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp index badcea9..6194a90 100644 --- a/shaping/src/shaper_session.cpp +++ b/shaping/src/shaper_session.cpp @@ -80,7 +80,7 @@ void shaper_session_data_free_cb(void *session_data, void *data) struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data; if (sf) { - shaper_queue_clear(sf, &ctx->stat->stat_hashtbl, ctx); + shaper_queue_clear(sf, ctx); shaping_flow_free(sf); } diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index 9ed1669..6c9644b 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -2,297 +2,231 @@ #include <time.h> #include <sys/socket.h> #include <arpa/inet.h> +#include <MESA/MESA_prof_load.h> +#include <fieldstat.h> -#include <MESA/stream.h> - +#include "log.h" +#include "utils.h" #include "shaper.h" #include "shaper_stat.h" +struct shaper_stat_conf { + int enable_backgroud_thread; + int output_interval_ms; + char telegraf_ip[16]; + short telegraf_port; +}; -#define SHAPING_STAT_SEND_INTERVAL_SEC 1 //unit: second -#define SHAPING_STAT_SEND_INTERVAL_NS 500000000 //unit: nano second - -#define SHAPING_STAT_FORMAT "SHAPING-STAT,rule_id=%d,profile_id=%d,priority=%d,profile_type=%s "\ - "queueing_sessions=%d,in_rx_pkts=%llu,in_rx_bytes=%llu,"\ - "in_tx_pkts=%llu,in_tx_bytes=%llu,in_drop_pkts=%llu,in_max_latency_us=%llu,in_queue_len=%lld,"\ - "out_rx_pkts=%llu,out_rx_bytes=%llu,out_tx_pkts=%llu,out_tx_bytes=%llu,"\ - "out_drop_pkts=%llu,out_max_latency_us=%llu,out_queue_len=%lld" - -static void shaper_stat_counter_clear(struct shaping_stat_data *s) -{ - long long in_queue_len, out_queue_len; - struct shaping_stat_data_dir *in = &s->incoming; - struct shaping_stat_data_dir *out = &s->outgoing; - - in_queue_len = in->queue_len;//queue_len is gauge metric, do not clear - out_queue_len = out->queue_len; - - memset(in, 0, sizeof(struct shaping_stat_data_dir)); - memset(out, 0, sizeof(struct shaping_stat_data_dir)); - - in->queue_len = in_queue_len; - out->queue_len = out_queue_len; - - return; -} - -static void shaper_stat_data_send(struct shaping_stat *stat, struct shaping_stat_data *s) -{ - char buf[1024]; - struct shaping_stat_data_dir *in = &s->incoming; - struct shaping_stat_data_dir *out = &s->outgoing; - - snprintf(buf, sizeof(buf), SHAPING_STAT_FORMAT, s->key.rule_id, s->key.profile_id, s->key.priority, - s->key.profile_type == SHAPING_PROFILE_TYPE_PRIMARY ? "primary" : "borrow", - s->queueing_session_num, in->rx_pkts, in->rx_bytes, in->tx_pkts, in->tx_bytes, - in->drop_pkts, in->max_latency, in->queue_len, out->rx_pkts, out->rx_bytes, out->tx_pkts, out->tx_bytes, - out->drop_pkts, out->max_latency, out->queue_len); - - sendto(stat->sock_fd, buf, strlen(buf), 0, (struct sockaddr*)&stat->sock_addr, sizeof(stat->sock_addr)); - - shaper_stat_counter_clear(s); - return; -} +thread_local struct fieldstat_tag tags[TAG_IDX_MAX]; -static void shaper_stat_data_send_free(struct shaping_stat *stat, struct shaping_stat_data **stat_hashtbl) +void shaper_stat_destroy(struct shaping_stat *stat) { - struct shaping_stat_data *s, *tmp = NULL; - - if (!stat || !*stat_hashtbl) { + if (!stat) { return; } - HASH_ITER(hh, *stat_hashtbl, s, tmp) { - shaper_stat_data_send(stat, s); - HASH_DEL(*stat_hashtbl, s); - free(s); + if (stat->instance) { + fieldstat_dynamic_instance_free(stat->instance); } + free(stat); return; } -void shaper_stat_send_free(struct shaping_stat *stat) +static int shaper_stat_conf_load(struct shaper_stat_conf *conf) { - if (!stat) { - return; - } + memset(conf, 0, sizeof(struct shaper_stat_conf)); - if (stat->stat_hashtbl) { - shaper_stat_data_send_free(stat, &stat->stat_hashtbl); - } - free(stat); + MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1"); + MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_PORT", &conf->telegraf_port, 6379); + MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500); + MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1); - return; + return 0; } -void shaper_stat_send(struct shaping_stat *stat, struct shaping_stat_data **stat_hashtbl) +struct shaping_stat* shaper_stat_init(int thread_num) { - struct shaping_stat_data *s, *tmp = NULL; - struct timespec curr_time; - - if (!stat || !*stat_hashtbl) { - return; + struct shaping_stat *stat = NULL; + int column_num; + struct shaper_stat_conf conf; + const char *column_name[] = {"queueing_sessions", "in_max_latency_us", "in_queue_len", "out_max_latency_us", "out_queue_len", //first line is gauge, second line is counter + "in_pkts", "in_bytes", "in_drop_pkts", "out_pkts", "out_bytes", "out_drop_pkts"}; + enum field_type column_type[] = {FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, + FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER}; + + column_num = sizeof(column_name)/sizeof(column_name[0]); + if (column_num != STAT_COLUNM_IDX_MAX) { + LOG_ERROR("%s: shaping init fieldstat failed, column_num %d != index num %d", LOG_TAG_STAT, column_num, STAT_COLUNM_IDX_MAX); + goto ERROR; } - clock_gettime(CLOCK_MONOTONIC, &curr_time); - if (curr_time.tv_sec - stat->update_time.tv_sec >= SHAPING_STAT_SEND_INTERVAL_SEC|| - curr_time.tv_nsec - stat->update_time.tv_nsec >= SHAPING_STAT_SEND_INTERVAL_NS) { - stat->update_time = curr_time; - HASH_ITER(hh, *stat_hashtbl, s, tmp) { - shaper_stat_data_send(stat, s); - } + if (shaper_stat_conf_load(&conf) != 0) { + LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT); + goto ERROR; } - return; -} + stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat)); -struct shaping_stat* shaper_stat_new(char *telegraf_ip, short telegraf_port) -{ - struct shaping_stat *stat = NULL; - struct timespec curr_time; + stat->instance = fieldstat_dynamic_instance_new("shaping_engine", thread_num); + if (stat->instance == NULL) { + LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT); + goto ERROR; + } - clock_gettime(CLOCK_MONOTONIC, &curr_time); + fieldstat_dynamic_set_output_interval(stat->instance, conf.output_interval_ms); + fieldstat_dynamic_set_line_protocol_server(stat->instance, conf.telegraf_ip, conf.telegraf_port); + if (conf.enable_backgroud_thread == 0) { + fieldstat_dynamic_disable_background_thread(stat->instance); + } - stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat)); + stat->table_id = fieldstat_register_dynamic_table(stat->instance, "shaping_metric", column_name, column_type, column_num, stat->column_ids); + if (stat->table_id < 0) { + LOG_ERROR("%s: shaping fieldstat register table failed", LOG_TAG_STAT); + goto ERROR; + } + + tags[TAG_RULE_ID_IDX].key = "rule_id"; + tags[TAG_RULE_ID_IDX].value_type = 0; + tags[TAG_PROFILE_ID_IDX].key = "profile_id"; + tags[TAG_PROFILE_ID_IDX].value_type = 0; + tags[TAG_PRIORITY_IDX].key = "priority"; + tags[TAG_PRIORITY_IDX].value_type = 0; + tags[TAG_PROFILE_TYPE_IDX].key = "profile_type"; + tags[TAG_PROFILE_TYPE_IDX].value_type = 2; - stat->sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - stat->sock_addr.sin_family = AF_INET; - stat->sock_addr.sin_port = htons(telegraf_port); - stat->sock_addr.sin_addr.s_addr = inet_addr(telegraf_ip); - stat->update_time = curr_time; + fieldstat_dynamic_instance_start(stat->instance); return stat; + +ERROR: + if (stat) { + if (stat->instance) { + fieldstat_dynamic_instance_free(stat->instance); + } + free(stat); + } + return NULL; } -static struct shaping_stat_data *shaper_stat_ins_get(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type) +static void shaper_stat_tags_build(int rule_id, int profile_id, int priority, int profile_type) { - struct shaping_stat_data *s_stat_data = NULL; - struct shaping_stat_data_key key; - memset(&key, 0, sizeof(key));//important for uthash opration - key.rule_id = rule_id; - key.profile_id = profile_id; - key.priority = priority; - key.profile_type = profile_type; + tags[TAG_RULE_ID_IDX].value_int = rule_id; - HASH_FIND(hh, *stat_hashtbl, &key, sizeof(struct shaping_stat_data_key), s_stat_data); - if (!s_stat_data) { - s_stat_data = (struct shaping_stat_data *)calloc(1, sizeof(struct shaping_stat_data)); + tags[TAG_PROFILE_ID_IDX].value_int = profile_id; - memcpy(&s_stat_data->key, &key, sizeof(key)); + tags[TAG_PRIORITY_IDX].value_int = priority; - HASH_ADD(hh, *stat_hashtbl, key, sizeof(struct shaping_stat_data_key), s_stat_data); + if (profile_type == SHAPING_PROFILE_TYPE_PRIMARY) { + tags[TAG_PROFILE_TYPE_IDX].value_str = "primary"; + } else { + tags[TAG_PROFILE_TYPE_IDX].value_str = "borrow"; } - return s_stat_data; + return; } -void shaper_stat_drop_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, - int priority, unsigned char direction, int pkt_len) +void shaper_stat_drop_inc(struct shaping_stat *stat, int rule_id, int profile_id, + int priority, unsigned char direction, int pkt_len, int thread_id) { - struct shaping_stat_data *s_stat_data = NULL; - - s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, SHAPING_PROFILE_TYPE_PRIMARY); + shaper_stat_tags_build(rule_id, profile_id, priority, SHAPING_PROFILE_TYPE_PRIMARY); if (direction == SHAPING_DIR_IN) { - s_stat_data->incoming.drop_pkts++; - s_stat_data->incoming.rx_pkts++; - s_stat_data->incoming.rx_bytes += pkt_len; + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_DROP_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id); } else { - s_stat_data->outgoing.drop_pkts++; - s_stat_data->outgoing.rx_pkts++; - s_stat_data->outgoing.rx_bytes += pkt_len; + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_DROP_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id); } return; } -void shaper_stat_forward_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, - int priority, unsigned char direction, int pkt_len, int profile_type) +void shaper_stat_forward_inc(struct shaping_stat *stat, int rule_id, int profile_id, + int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id) { - struct shaping_stat_data *s_stat_data = NULL; - - s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type); + shaper_stat_tags_build(rule_id, profile_id, priority, profile_type); if (direction == SHAPING_DIR_IN) { - s_stat_data->incoming.tx_pkts++; - s_stat_data->incoming.tx_bytes += pkt_len; - s_stat_data->incoming.rx_pkts++; - s_stat_data->incoming.rx_bytes += pkt_len; + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id); + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_BYTES_IDX], "shaping_metric_row", pkt_len, tags, TAG_IDX_MAX, thread_id); } else { - s_stat_data->outgoing.tx_pkts++; - s_stat_data->outgoing.tx_bytes += pkt_len; - s_stat_data->outgoing.rx_pkts++; - s_stat_data->outgoing.rx_bytes += pkt_len; + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id); + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_BYTES_IDX], "shaping_metric_row", pkt_len, tags, TAG_IDX_MAX, thread_id); } return; } -void shaper_stat_forward_all_rule_inc(struct shaping_stat_data **stat_hashtbl, struct shaping_flow *sf, unsigned char direction, int pkt_len) +void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_flow *sf, unsigned char direction, int pkt_len, int thread_id) { struct shaping_rule_info *rule; int i; for (i = 0; i < sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; - shaper_stat_forward_inc(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, direction, pkt_len, SHAPING_PROFILE_TYPE_PRIMARY); + shaper_stat_forward_inc(stat, rule->id, rule->primary.id, rule->primary.priority, direction, pkt_len, SHAPING_PROFILE_TYPE_PRIMARY, thread_id); } return; } -void shaper_stat_queueing_session_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type) +void shaper_stat_queueing_session_inc(struct shaping_stat *stat, int rule_id, int profile_id, int priority, int profile_type, int thread_id) { - struct shaping_stat_data *s_stat_data = NULL; - - s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type); - - s_stat_data->queueing_session_num++; + shaper_stat_tags_build(rule_id, profile_id, priority, profile_type); + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[QUEUEING_SESSIONS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id); return; } -void shaper_stat_queueing_session_dec(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type) +void shaper_stat_queueing_session_dec(struct shaping_stat *stat, int rule_id, int profile_id, int priority, int profile_type, int thread_id) { - struct shaping_stat_data *s_stat_data = NULL; - - s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type); - - s_stat_data->queueing_session_num--; + shaper_stat_tags_build(rule_id, profile_id, priority, profile_type); + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[QUEUEING_SESSIONS_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id); return; } -void shaper_stat_queueing_pkt_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, - int priority, unsigned char direction, int pkt_len, int profile_type) +void shaper_stat_queueing_pkt_inc(struct shaping_stat *stat, int rule_id, int profile_id, + int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id) { - struct shaping_stat_data *s_stat_data = NULL; - - s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type); - + shaper_stat_tags_build(rule_id, profile_id, priority, profile_type); if (direction == SHAPING_DIR_IN) { - s_stat_data->incoming.rx_pkts++; - s_stat_data->incoming.rx_bytes += pkt_len; - s_stat_data->incoming.queue_len++; + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id); } else { - s_stat_data->outgoing.rx_pkts++; - s_stat_data->outgoing.rx_bytes += pkt_len; - s_stat_data->outgoing.queue_len++; + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id); } return; } -void shaper_stat_queueing_pkt_dec(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, - int priority, unsigned char direction, int pkt_len, int profile_type) +void shaper_stat_queueing_pkt_dec(struct shaping_stat *stat, int rule_id, int profile_id, + int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id) { - struct shaping_stat_data *s_stat_data = NULL; - - s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type); - + shaper_stat_tags_build(rule_id, profile_id, priority, profile_type); if (direction == SHAPING_DIR_IN) { - s_stat_data->incoming.rx_pkts--; - s_stat_data->incoming.rx_bytes -= pkt_len; - s_stat_data->incoming.queue_len--; + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id); } else { - s_stat_data->outgoing.rx_pkts--; - s_stat_data->outgoing.rx_bytes -= pkt_len; - s_stat_data->outgoing.queue_len--; + fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id); } return; } -void shaper_stat_max_latency_update(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, - int priority, unsigned char direction, unsigned long long latency, int profile_type) +void shaper_stat_max_latency_update(struct shaping_stat *stat, int rule_id, int profile_id, + int priority, unsigned char direction, unsigned long long latency, int profile_type, int thread_id) { - struct shaping_stat_data *s_stat_data = NULL; - - s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type); + unsigned long long old_latency; + shaper_stat_tags_build(rule_id, profile_id, priority, profile_type); if (direction == SHAPING_DIR_IN) { - if (latency > s_stat_data->incoming.max_latency) { - s_stat_data->incoming.max_latency = latency; + old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], "shaping_metric_row", tags, TAG_IDX_MAX, thread_id); + if (latency > old_latency) { + fieldstat_dynamic_table_metric_value_set(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], "shaping_metric_row", latency, tags, TAG_IDX_MAX, thread_id); } } else { - if (latency > s_stat_data->outgoing.max_latency) { - s_stat_data->outgoing.max_latency = latency; + old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], "shaping_metric_row", tags, TAG_IDX_MAX, thread_id); + if (latency > old_latency) { + fieldstat_dynamic_table_metric_value_set(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], "shaping_metric_row", latency, tags, TAG_IDX_MAX, thread_id); } } return; -} - -#if 0 -/*********just for self test stub****************/ -void stub_shaper_stat_send(int thread_seq) -{ - struct shaping_stat_data *s, *tmp; - - HASH_ITER(hh, g_rt_para.stat[thread_seq]->stat_hashtbl, s, tmp) { - shaper_stat_data_send(g_rt_para.stat[thread_seq], s); - } - - return; -} -/************************************************/ -#endif
\ No newline at end of file +}
\ No newline at end of file diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp index 2c871a1..c0cfb6a 100644 --- a/shaping/src/shaper_swarmkv.cpp +++ b/shaping/src/shaper_swarmkv.cpp @@ -106,6 +106,7 @@ struct swarmkv* shaper_swarmkv_init() swarmkv_options_set_health_check_port(swarmkv_opts, conf.swarmkv_health_check_port); swarmkv_options_set_health_check_announce_port(swarmkv_opts, conf.swarmkv_health_check_announce_port); swarmkv_options_set_log_path(swarmkv_opts, "log"); + swarmkv_options_set_log_level(swarmkv_opts, 4); swarmkv_db = swarmkv_open(swarmkv_opts, conf.swarmkv_cluster_name, &err); if (err) { |
