Diffstat (limited to 'shaping')
-rw-r--r--  shaping/CMakeLists.txt         |   2
-rw-r--r--  shaping/include/shaper.h       |  42
-rw-r--r--  shaping/include/shaper_aqm.h   |  10
-rw-r--r--  shaping/include/shaper_stat.h  |   2
-rw-r--r--  shaping/src/shaper.cpp         | 108
-rw-r--r--  shaping/src/shaper_aqm.cpp     | 177
-rw-r--r--  shaping/src/shaper_maat.cpp    |   2
-rw-r--r--  shaping/src/shaper_stat.cpp    |  26
-rw-r--r--  shaping/test/gtest_shaper.cpp  |   4
9 files changed, 288 insertions, 85 deletions
diff --git a/shaping/CMakeLists.txt b/shaping/CMakeLists.txt index 846a1bb..9c6daf2 100644 --- a/shaping/CMakeLists.txt +++ b/shaping/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp) +add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp src/shaper_aqm.cpp) target_link_libraries(shaper PUBLIC common) target_link_libraries(shaper PUBLIC avl_tree) target_link_libraries(shaper PUBLIC cjson) diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 2be4d65..f629124 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -16,7 +16,6 @@ #define SHAPER_FLOW_POP_NUM_MAX 10 #define SESSION_CLOSE 0x1 -#define SESSION_UPDATE_PF_PRIO_LEN 0x2 #define CONFIRM_PRIORITY_PKTS 20 @@ -24,6 +23,10 @@ #define SHAPING_GLOBAL_CONF_FILE "./conf/shaping.conf" +#define NANO_SECONDS_PER_MICRO_SEC 1000 +#define MICRO_SECONDS_PER_SEC 1000000 +#define NANO_SECONDS_PER_SEC 1000000000 + struct shaping_system_conf { unsigned int session_queue_len_max; unsigned int priority_queue_len_max; @@ -78,9 +81,26 @@ enum shaping_profile_type { PROFILE_TYPE_SPLIT_BY_LOCAL_HOST }; +enum shaper_aqm_type { + AQM_TYPE_NONE = 0, + AQM_TYPE_BLUE, + AQM_TYPE_CODEL, + AQM_TYPE_MAX +}; + +struct shaper_aqm_blue_para { + time_t update_time; + int queue_len_max; + int probability; + int queue_len; + //int d1;//increase delta + //int d2;//decrease delta +}; + struct shaping_profile_info { int id;//profile_id enum shaping_profile_type type; + enum shaper_aqm_type aqm_type; int priority; int in_deposit_token; int out_deposit_token; @@ -90,6 +110,10 @@ struct shaping_profile_info { unsigned char is_priority_blocked; unsigned char is_invalid; struct shaping_stat_for_profile stat; + union { + struct shaper_aqm_blue_para blue_para; + //struct shaper_aqm_codel_para codel_para; + }aqm_para; }; struct shaping_rule_info { @@ -153,22 +177,22 @@ struct shaper;//instance of shaping, thread unsafe struct shaping_flow* shaping_flow_new(); void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf); -struct shaper* shaper_new(unsigned int priority_queue_len_max); -void shaper_free(struct shaper *sp); +//struct shaper* shaper_new(unsigned int priority_queue_len_max); +//void shaper_free(struct shaper *sp); bool shaper_queue_empty(struct shaping_flow *sf); -void shaper_packet_dequeue(struct shaping_flow *sf); -struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf); +//void shaper_packet_dequeue(struct shaping_flow *sf); +//struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf); void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx); /*return value: 0 for success, -1 for failed*/ -int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time); +//int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time); /*return num of sf_ins*/ -void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf); -int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num); +//void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf); +//int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance 
sf_ins[], int priority, int max_sf_num); //enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue); -int shaper_global_conf_init(struct shaping_system_conf *conf); +//int shaper_global_conf_init(struct shaping_system_conf *conf); void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx); void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf); diff --git a/shaping/include/shaper_aqm.h b/shaping/include/shaper_aqm.h new file mode 100644 index 0000000..f7e0123 --- /dev/null +++ b/shaping/include/shaper_aqm.h @@ -0,0 +1,10 @@ +#pragma once + +enum shaper_aqm_action { + AQM_ACTION_PASS, + AQM_ACTION_DROP, +}; + +int shaper_aqm_enqueue(struct shaping_profile_info *profile); +int shaper_aqm_dequeue(); +int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper);
\ No newline at end of file diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h index 20da941..a2527cd 100644 --- a/shaping/include/shaper_stat.h +++ b/shaping/include/shaper_stat.h @@ -55,4 +55,4 @@ void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_ void shaper_stat_drop_inc(struct shaping_stat_for_profile *profile_stat, unsigned char direction, int thread_id); void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_stat, unsigned char direction, unsigned long long latency, int thread_id); -void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int thread_id, int force);
\ No newline at end of file +void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force);
\ No newline at end of file diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 2d88b01..aef5a65 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -22,10 +22,7 @@ extern "C" { #include "shaper_swarmkv.h" #include "shaper_maat.h" #include "shaper_global_stat.h" - -#define NANO_SECONDS_PER_MICRO_SEC 1000 -#define MICRO_SECONDS_PER_SEC 1000000 -#define NANO_SECONDS_PER_SEC 1000000000 +#include "shaper_aqm.h" #define SHAPING_LATENCY_THRESHOLD 2000000 //2s @@ -65,7 +62,23 @@ struct shaping_profile_container { int pf_type; }; -struct shaper* shaper_new(unsigned int priority_queue_len_max) +static void shaper_free(struct shaper *sp) +{ + int i; + + if (sp) { + for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { + if (sp->priority_trees[i]) { + avl_tree_destroy(sp->priority_trees[i]); + } + } + free(sp); + } + + return; +} + +static struct shaper* shaper_new(unsigned int priority_queue_len_max) { struct shaper *sp = NULL; int i; @@ -90,22 +103,6 @@ ERROR: return NULL; } -void shaper_free(struct shaper *sp) -{ - int i; - - if (sp) { - for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { - if (sp->priority_trees[i]) { - avl_tree_destroy(sp->priority_trees[i]); - } - } - free(sp); - } - - return; -} - static void shaping_node_free(struct shaping_node *s_node) { int i; @@ -164,7 +161,7 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) struct shaping_node *s_node = (struct shaping_node*)sf; if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) { - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); shaping_node_free(s_node); } @@ -180,6 +177,10 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ return -1; } + if (shaper_aqm_enqueue(&sf->matched_rule_infos[sf->anchor].primary) == AQM_ACTION_DROP) { + return -1; + } + s_pkt = (struct shaping_packet_wrapper*)calloc(1, sizeof(struct shaping_packet_wrapper)); if (!s_pkt) { return -1; @@ -204,12 +205,12 @@ bool shaper_queue_empty(struct shaping_flow *sf) return TAILQ_EMPTY(&sf->packet_queue); } -struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf) +static struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf) { return TAILQ_FIRST(&sf->packet_queue); } -void shaper_packet_dequeue(struct shaping_flow *sf) +static void shaper_packet_dequeue(struct shaping_flow *sf) { struct shaping_packet_wrapper *s_pkt; @@ -244,21 +245,8 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) return; } -static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg) -{ - struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg; - - shaper_global_stat_async_callback_inc(global_stat); - - if (reply->type != SWARMKV_REPLY_INTEGER) { - shaper_global_stat_async_hincrby_failed_inc(global_stat); - } - - return; -} - //return success(0) while any avl tree insert success -int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) +static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; @@ -271,20 +259,10 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); - if 
((sf->flag & SESSION_UPDATE_PF_PRIO_LEN) == 0) { - if (sf->processed_pkts > CONFIRM_PRIORITY_PKTS) { - sf->flag |= SESSION_UPDATE_PF_PRIO_LEN; - } - } - priority = s_rule_info->primary.priority; avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority); - } } if (s_rule_info->borrowing_num == 0) {// no borrow profile @@ -296,10 +274,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority); - } + //TODO: calculate queue_len for borrow profile and add judge when refresh stat???? + //shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); } } @@ -320,7 +296,7 @@ static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile return (curr_time - enqueue_time); } -void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) +static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; @@ -339,10 +315,6 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) priority = s_rule_info->primary.priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority); - } } if (s_rule_info->borrowing_num == 0) { @@ -353,10 +325,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) priority = s_rule_info->borrowing[i].priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority); - } + //TODO: calculate queue_len for borrow profile and add judge when refresh stat } } @@ -368,7 +337,7 @@ END: return; } -int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num) +static int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num) { struct avl_node *avl_node = NULL; int count = 0; @@ -691,8 +660,13 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct 
shapi shaper_flow_pop(ctx, sf); goto DROP; } - } - /*todo: AQM, just for primary profile*/ + + if (shaper_aqm_need_drop(pf_container[0].pf_info, pkt_wrapper)) { + shaper_flow_pop(ctx, sf); + goto DROP; + } + } + /*todo: AQM*/ for (int i = 0; i < profile_num; i++) { profile = pf_container[i].pf_info; @@ -819,7 +793,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ break; } - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); if (shaper_queue_empty(sf)) { if (sf->flag & SESSION_CLOSE) { @@ -899,7 +873,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu } END: - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); if(sf->flag & SESSION_CLOSE) { if (shaper_queue_empty(sf)) { char *addr_str = addr_tuple4_to_str(&sf->tuple4); @@ -1046,7 +1020,7 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) return; } -int shaper_global_conf_init(struct shaping_system_conf *conf) +static int shaper_global_conf_init(struct shaping_system_conf *conf) { int ret; int array_num; diff --git a/shaping/src/shaper_aqm.cpp b/shaping/src/shaper_aqm.cpp new file mode 100644 index 0000000..e7b00e5 --- /dev/null +++ b/shaping/src/shaper_aqm.cpp @@ -0,0 +1,177 @@ +#include <cstdlib> +#include <time.h> + +#include "shaper.h" +#include "shaper_aqm.h" + + +/**************************blue******************************/ +#define PROBABILITY_MAX 100 +#define INCREMENT 10 +#define DECREMENT 1 +#define FREEZE_TIME 3 //unit:s + +static int shaper_aqm_blue_need_drop(int profile_id, struct shaper_aqm_blue_para *para) +{ + time_t curr_time; + if (time(&curr_time) - para->update_time >= FREEZE_TIME) { + para->update_time = curr_time; + if (para->queue_len >= para->queue_len_max) { + para->probability = (para->probability + INCREMENT) > PROBABILITY_MAX ? PROBABILITY_MAX : (para->probability + INCREMENT); + } else if (para->queue_len == 0) { + para->probability = (para->probability - DECREMENT) >= 0 ? (para->probability - DECREMENT) : 0; + } + } + + if (rand() / PROBABILITY_MAX < para->probability) { + return 1; + } + + return 0; +} +/**************************blue*****************************/ + +#if 0 +/**************************stochastic fair blue*****************************/ +/* + * SFB uses two B[l][n] : L x N arrays of bins (L levels, N bins per level) + * This implementation uses L = 8 and N = 16 + * This permits us to split one 32bit hash (provided per packet by rxhash or + * external classifier) into 8 subhashes of 4 bits. 
+ */ +#define SFB_BUCKET_SHIFT 4 +#define SFB_NUMBUCKETS (1 << SFB_BUCKET_SHIFT) /* N bins per Level */ +#define SFB_BUCKET_MASK (SFB_NUMBUCKETS - 1) +#define SFB_LEVELS (32 / SFB_BUCKET_SHIFT) /* L */ + +struct sfb_bucket { + int queue_len; + int probability; +}; +struct shaper_aqm_sfb_para { + struct sfb_bucket bins[SFB_LEVELS][SFB_NUMBUCKETS]; +}; +/**************************stochastic fair blue*****************************/ +#endif + + +#if 0 +/**************************codel*****************************/ +#define CODEL_MAX_LATENCY 1500000 //unit:us + +#define REC_INV_SQRT_BITS (8 * sizeof(unsigned short)) /* or sizeof_in_bits(rec_inv_sqrt) */ +/* needed shift to get a Q0.32 number from rec_inv_sqrt */ +#define REC_INV_SQRT_SHIFT (32 - REC_INV_SQRT_BITS) + +static void shaper_aqm_codel_enqueue() +{ + return; +} + +static void shaper_aqm_codel_Newton_step(struct codel_vars *vars) +{ + unsigned int invsqrt = ((unsigned int)vars->rec_inv_sqrt) << REC_INV_SQRT_SHIFT; + unsigned int invsqrt2 = ((unsigned long long)invsqrt * invsqrt) >> 32; + unsigned long long val = (3LL << 32) - ((unsigned long long)vars->count * invsqrt2); + + val >>= 2; /* avoid overflow in following multiply */ + val = (val * invsqrt) >> (32 - 2 + 1); + + vars->rec_inv_sqrt = val >> REC_INV_SQRT_SHIFT; +} + +static inline unsigned int reciprocal_scale(unsigned int val, unsigned int ep_ro) +{ + return (unsigned int)(((unsigned long long) val * ep_ro) >> 32); +} + +static unsigned long long shaper_aqm_codel_control_law(unsigned long long t, + unsigned long long interval, + unsigned int rec_inv_sqrt) +{ + return t + reciprocal_scale(interval, rec_inv_sqrt << REC_INV_SQRT_SHIFT); +} + +static int shaper_aqm_codel_need_drop(struct shaper_aqm_codel_para *para, struct shaping_profile_info *profile) +{ + struct timespec time; + unsigned long long curr_time; + + clock_gettime(CLOCK_MONOTONIC, &time); + curr_time = time.tv_sec * MICRO_SECONDS_PER_SEC + time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + + if (curr_time - profile->enqueue_time_us < CODEL_MAX_LATENCY) { + //TODO:swarmkv set first above time to 0 + return 0; + } + + if (first_above_time_us == 0) { + first_above_time_us = curr_time + para->interval;//set in swarmkv + return 0; + } else if (curr_time > first_above_time_us) { + return 1; + } +} + +static int shaper_aqm_codel_dequeue_action() +{ + int need_drop = shaper_aqm_codel_need_drop(); + + if (dropping_state) { + if (!need_drop) { + dropping_state = 0; + } else if (curr_time > drop_next) { + drop_count++; + shaper_aqm_codel_Newton_step(); + drop_next = shaper_aqm_codel_control_law(); + } + } else if (need_drop) { + dropping_state = 1; + delta = drop_count - last_drop_count; + if (delta > 1 && (curr_time - drop_next) < 16 * interval) { + drop_count = delta; + shaper_aqm_codel_Newton_step(); + } else { + drop_count = 1; + rec_inv_sqrt = ~0U >> REC_INV_SQRT_SHIFT; + } + + last_drop_count = drop_count; + drop_next = shaper_aqm_codel_control_law(); + } +} +/**************************codel*****************************/ +#endif + + +int shaper_aqm_enqueue(struct shaping_profile_info *profile) +{ + switch (profile->aqm_type) { + case AQM_TYPE_BLUE: + if (shaper_aqm_blue_need_drop(profile->id, &profile->aqm_para.blue_para)) { + return AQM_ACTION_DROP; + } else { + return AQM_ACTION_PASS; + } + case AQM_TYPE_CODEL: + default: + return AQM_ACTION_PASS; + } +} + +int shaper_aqm_dequeue() +{ + return AQM_ACTION_PASS; +} + +int shaper_aqm_need_drop(struct shaping_profile_info *profile, struct shaping_packet_wrapper *pkt_wrapper) +{ + 
switch (profile->aqm_type) { + case AQM_TYPE_BLUE: + + case AQM_TYPE_CODEL: + + default: + return 0; + } +}
\ No newline at end of file diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp index 4b4f21f..64db3e6 100644 --- a/shaping/src/shaper_maat.cpp +++ b/shaping/src/shaper_maat.cpp @@ -381,7 +381,7 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf } if (sf->rule_num > 0 && priority_changed) { - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); } sf->rule_num += rule_num; diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index e7d03a1..71965fa 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -4,12 +4,14 @@ #include <sys/socket.h> #include <arpa/inet.h> #include <MESA/MESA_prof_load.h> +#include <MESA/swarmkv.h> #include <fieldstat.h> #include "log.h" #include "utils.h" #include "shaper.h" #include "shaper_stat.h" +#include "shaper_global_stat.h" #define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits" @@ -131,9 +133,23 @@ static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int return; } -static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage) +static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, void * cb_arg) +{ + struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg; + + shaper_global_stat_async_callback_inc(global_stat); + + if (reply->type != SWARMKV_REPLY_INTEGER) { + shaper_global_stat_async_hincrby_failed_inc(global_stat); + } + + return; +} + +static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage) { struct shaping_stat_for_profile *profile_stat = &profile->stat; + struct shaping_stat *stat = ctx->stat; unsigned long long old_latency; shaper_stat_tags_build(vsys_id, rule_id, profile->id, profile->priority, profile_type); @@ -158,6 +174,8 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs if (need_update_guage) { fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id); fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id); + shaper_global_stat_async_invoke_inc(ctx->global_stat); + swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d %lld", profile->id, profile->priority, profile_stat->in.queue_len + profile_stat->out.queue_len); memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile)); } else { profile_stat->in.pkts = 0; @@ -174,7 +192,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs return; } -void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int thread_id, int force) +void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force) { struct shaping_rule_info *rule; struct timespec curr_time; @@ -199,10 +217,10 @@ void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int for (int i = 0; i < sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; - 
shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage); + shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage); for (int j = 0; j < rule->borrowing_num; j++) { - shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage); + shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage); } } diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index a1a9b77..c292f91 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -341,7 +341,7 @@ TEST(single_session, tcp_tx_in_order) /***********send stat data here********************/ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric - shaper_stat_refresh(ctx->thread_ctx[0].stat, sf, ctx->thread_ctx[0].thread_index, 1); + shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1); fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy sleep(2);//wait telegraf generate metric @@ -1578,7 +1578,7 @@ TEST(statistics, udp_queueing_pkt) /***********send stat data here********************/ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric - shaper_stat_refresh(ctx->thread_ctx[0].stat, sf, ctx->thread_ctx[0].thread_index, 1); + shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1); fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaper_global_stat_refresh(ctx->global_stat); sleep(2);//wait telegraf generate metric |
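
For context, here is a minimal, self-contained sketch of the BLUE-style drop decision that shaper_aqm.cpp introduces. The struct name blue_state, the helper blue_should_drop(), and the percent-scale drop roll via rand() % 100 are illustrative assumptions for this sketch, not the committed interface (the commit keeps the state in struct shaper_aqm_blue_para and decides inside shaper_aqm_blue_need_drop()).

#include <stdlib.h>
#include <time.h>

/* Illustrative BLUE state; mirrors shaper_aqm_blue_para but is not the committed struct. */
struct blue_state {
    time_t update_time;   /* last time the drop probability was adjusted */
    int queue_len_max;    /* occupancy threshold that signals congestion */
    int queue_len;        /* current queue occupancy */
    int probability;      /* drop probability in percent, 0..100 */
};

/* Returns 1 to drop, 0 to pass. The probability is only adjusted once per
 * freeze interval (3 s): raised by 10 when the queue is full, lowered by 1
 * when the queue has drained to empty. */
static int blue_should_drop(struct blue_state *st)
{
    const int increment = 10, decrement = 1, freeze_time = 3, prob_max = 100;
    time_t now = time(NULL);

    if (now - st->update_time >= freeze_time) {
        st->update_time = now;
        if (st->queue_len >= st->queue_len_max)
            st->probability = (st->probability + increment > prob_max) ? prob_max : st->probability + increment;
        else if (st->queue_len == 0)
            st->probability = (st->probability - decrement < 0) ? 0 : st->probability - decrement;
    }

    /* Percent-scale roll: drop with probability/100 likelihood. */
    return (rand() % prob_max) < st->probability;
}

The freeze interval is what makes the probability track sustained congestion rather than a single burst, while the drop roll itself stays cheap enough to run on every enqueue.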

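Likewise, a hedged sketch of the CoDel control law that the disabled #if 0 block in shaper_aqm.cpp is building toward: once in the dropping state, the next drop is scheduled interval/sqrt(count) after the previous one, with 1/sqrt(count) kept as a Q0.32 fixed-point value and refined by one Newton step per drop. The struct codel_vars layout and the microsecond time unit are assumptions for this sketch.

#include <stdint.h>

#define REC_INV_SQRT_BITS  (8 * sizeof(uint16_t))    /* 16 */
#define REC_INV_SQRT_SHIFT (32 - REC_INV_SQRT_BITS)  /* widen Q0.16 to Q0.32 */

struct codel_vars {
    uint32_t count;        /* drops since the dropping state was entered */
    uint16_t rec_inv_sqrt; /* ~ 1/sqrt(count), stored in Q0.16 */
};

/* One Newton-Raphson refinement of 1/sqrt(count): x' = x * (3 - count * x^2) / 2. */
static void codel_newton_step(struct codel_vars *vars)
{
    uint32_t invsqrt  = ((uint32_t)vars->rec_inv_sqrt) << REC_INV_SQRT_SHIFT;
    uint32_t invsqrt2 = ((uint64_t)invsqrt * invsqrt) >> 32;
    uint64_t val      = (3ULL << 32) - ((uint64_t)vars->count * invsqrt2);

    val >>= 2; /* avoid overflow in the following multiply */
    val = (val * invsqrt) >> (32 - 2 + 1);
    vars->rec_inv_sqrt = val >> REC_INV_SQRT_SHIFT;
}

/* Schedule the next drop at t + interval/sqrt(count); all times in microseconds. */
static uint64_t codel_control_law(uint64_t t_us, uint64_t interval_us, const struct codel_vars *vars)
{
    uint32_t inv_q32 = ((uint32_t)vars->rec_inv_sqrt) << REC_INV_SQRT_SHIFT;
    return t_us + (((uint64_t)interval_us * inv_q32) >> 32);
}

Dividing the interval by sqrt(count) gives CoDel its gradually tightening drop schedule while a queue stays above the target latency; the fixed-point reciprocal square root avoids a division or sqrt() call on the fast path.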