#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <time.h>
#include <assert.h>
#include <sched.h>
#include <sys/queue.h>
// NOTE: the bracketed system-header names were lost; the seven headers above are
// reconstructed from what this file actually uses (calloc/free, memcpy/snprintf,
// time/clock_gettime, assert, CPU_SET, TAILQ_*). Adjust to the project's real list if it differs.
extern "C" {
#include "libavl.h"
}
#include "log.h"
#include "session_table.h"
#include "addr_tuple4.h"
#include "raw_packet.h"
#include "shaper.h"
#include "shaper_stat.h"
#include "utils.h"
#include "shaper_marsio.h"
#include "shaper_session.h"
#include "shaper_swarmkv.h"
#include "shaper_maat.h"
#include "shaper_global_stat.h"
#include "shaper_aqm.h"

#define TOKEN_MULTIPLE_UPDATE_INTERVAL_S 1
#define TOKEN_MULTIPLE_DEFAULT 10
#define TOKEN_GET_FAILED_INTERVAL_MS 1
#define HMGET_REQUEST_INTERVAL_MS 10
#define PRIORITY_BLOCK_MIN_TIME_MS 50
#define PROFILE_HASH_NODE_REFRESH_MS 500
#define SWARMKV_CALLER_LOOP_DIVISOR_MIN 1
#define SWARMKV_CALLER_LOOP_DIVISOR_MAX 10
#define SWARMKV_QUEUE_LEN_GET_CMD "HMGET tsg-shaping-%d priority-0 priority-1 priority-2 priority-3 priority-4 priority-5 priority-6 priority-7 priority-8 priority-9"

struct shaper {//per-thread shaper: one AVL tree per shaping priority
    struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];
};

struct shaping_node {//each session owns one AVL node handle per priority tree
    struct shaping_flow shaping_flow;
    struct avl_node *avl_node[SHAPING_PRIORITY_NUM_MAX];
};

struct shaping_profile_container {
    struct shaping_profile_info *pf_info;
    int pf_type;
};

enum shaper_token_get_result {
    SHAPER_TOKEN_GET_FAILED = -1,
    SHAPER_TOKEN_GET_SUCCESS = 0,
    SHAPER_TOKEN_GET_PASS = 1,//no token needed; treated as success
};

thread_local struct shaping_profile_hash_node *thread_sp_hashtbl = NULL;
thread_local static int thread_swarmkv_cb_cnt = 0;

struct shaper* shaper_new(unsigned int priority_queue_len_max)
{
    struct shaper *sp = NULL;
    int i;

    sp = (struct shaper*)calloc(1, sizeof(struct shaper));
    if (!sp) {
        goto ERROR;
    }
    for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
        sp->priority_trees[i] = avl_tree_init(priority_queue_len_max);
        if (!sp->priority_trees[i]) {
            goto ERROR;
        }
    }
    return sp;

ERROR:
    shaper_free(sp);
    return NULL;
}

void shaper_free(struct shaper *sp)
{
    int i;

    if (sp) {
        for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
            if (sp->priority_trees[i]) {
                avl_tree_destroy(sp->priority_trees[i]);
            }
        }
        free(sp);
    }
    return;
}

static void shaping_node_free(struct shaping_node *s_node)
{
    int i;

    if (s_node) {
        for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
            if (s_node->avl_node[i]) {
                avl_tree_node_free(s_node->avl_node[i]);
            }
        }
        if (s_node->shaping_flow.src_ip_str) {
            free(s_node->shaping_flow.src_ip_str);
        }
        if (s_node->shaping_flow.ctrl_meta.raw_data) {
            free(s_node->shaping_flow.ctrl_meta.raw_data);
        }
        free(s_node);
    }
    return;
}

struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
{
    struct shaping_node *s_node = NULL;
    int i;

    s_node = (struct shaping_node*)calloc(1, sizeof(struct shaping_node));
    if (!s_node) {
        goto ERROR;
    }
    for (i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
        s_node->avl_node[i] = avl_tree_node_new(0, &s_node->shaping_flow, NULL);
        if (!s_node->avl_node[i]) {
            goto ERROR;
        }
    }
    TAILQ_INIT(&s_node->shaping_flow.packet_queue);
    s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
    s_node->shaping_flow.ref_cnt = 1;
    return &s_node->shaping_flow;

ERROR:
    shaping_node_free(s_node);
    return NULL;
}

void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
{
    sf->ref_cnt--;
    if (sf->ref_cnt > 0) {
        return;
    }
    struct shaping_node *s_node = (struct shaping_node*)sf;
    shaper_stat_refresh(ctx, sf, 1);
    shaping_node_free(s_node);
    return;
}

void shaper_thread_resource_clear()
{
    struct shaping_profile_hash_node *thread_sp_hashtbl_tmp = NULL;
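    /* Tear down the per-thread profile hash table: HASH_ITER walks the uthash
     * table safely while entries are removed, so each node can be unlinked
     * with HASH_DEL and freed in place. */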
struct shaping_profile_hash_node *node = NULL; HASH_ITER(hh, thread_sp_hashtbl, node, thread_sp_hashtbl_tmp) { HASH_DEL(thread_sp_hashtbl, node); free(node); } } static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta, struct timespec *curr_time) { struct shaping_packet_wrapper *s_pkt = NULL; if (sf->queue_len == ctx->conf.session_queue_len_max) { return -1; } s_pkt = (struct shaping_packet_wrapper*)calloc(1, sizeof(struct shaping_packet_wrapper)); if (!s_pkt) { return -1; } s_pkt->pkt_buff = pkt_buff; s_pkt->direction = meta->dir; s_pkt->length = meta->raw_len; s_pkt->rule_anchor = sf->anchor; s_pkt->income_time_ns = curr_time->tv_sec * NANO_SECONDS_PER_SEC + curr_time->tv_nsec; s_pkt->enqueue_time_us = curr_time->tv_sec * MICRO_SECONDS_PER_SEC + curr_time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node); sf->queue_len++; return 0; } bool shaper_queue_empty(struct shaping_flow *sf) { return TAILQ_EMPTY(&sf->packet_queue); } struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf) { return TAILQ_FIRST(&sf->packet_queue); } void shaper_packet_dequeue(struct shaping_flow *sf) { struct shaping_packet_wrapper *s_pkt; s_pkt = TAILQ_FIRST(&sf->packet_queue); if (s_pkt) { TAILQ_REMOVE(&sf->packet_queue, s_pkt, node); sf->queue_len--; free(s_pkt); } return; } void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) { struct shaping_packet_wrapper *pkt_wrapper; struct shaping_rule_info *rule = &sf->matched_rule_infos[0]; while (!shaper_queue_empty(sf)) { pkt_wrapper = shaper_first_pkt_get(sf); shaper_stat_queueing_pkt_dec(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index); shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); } return; } //return success(0) while any avl tree insert success static int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time_us) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; int priority; int i; pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); priority = s_rule_info->primary.priority; avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 != avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {//primary profile failed means flow push failed, ignore borrow profile return -1; } if (s_rule_info->borrowing_num == 0) {// no borrow profile goto END; } for (i = 0; i < s_rule_info->borrowing_num; i++) { priority = s_rule_info->borrowing[i].priority; avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); } } END: s_rule_info->primary.enqueue_time_us = enqueue_time_us; 
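    /* The primary profile always accounts for the queued packet, even when
     * every borrow-tree insert above failed. */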
shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); return 0; } static unsigned long long shaper_pkt_latency_us_calculate(struct shaping_profile_info *profile, struct timespec *time) { unsigned long long enqueue_time = profile->enqueue_time_us; unsigned long long curr_time = time->tv_sec * MICRO_SECONDS_PER_SEC + time->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; assert(curr_time >= enqueue_time); return (curr_time - enqueue_time); } static void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct timespec *curr_time) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; unsigned long long latency; int priority; int i; pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); priority = s_rule_info->primary.priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); } if (s_rule_info->borrowing_num == 0) { goto END; } for (i = 0; i < s_rule_info->borrowing_num; i++) { priority = s_rule_info->borrowing[i].priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); } } END: latency = shaper_pkt_latency_us_calculate(&s_rule_info->primary, curr_time); shaper_stat_max_latency_update(&s_rule_info->primary.stat, pkt_wrapper->direction, latency, ctx->thread_index); shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); return; } static void shaper_flow_specific_borrow_priority_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority) { struct shaping_node *s_node = (struct shaping_node*)sf; struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor]; struct shaper *sp = ctx->sp; struct shaping_packet_wrapper *pkt_wrapper = NULL; pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); for (int i = 0; i < s_rule_info->borrowing_num; i++) { if (priority == s_rule_info->borrowing[i].priority) { if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); shaper_stat_queueing_pkt_dec(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); } } } return; } int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num) { struct avl_node *avl_node = NULL; int count = 0; if (max_sf_num <= 0) { return 0; } avl_node = avl_tree_minimum_node_get(sp->priority_trees[priority]); while(avl_node) { sf_ins[count].sf = (struct shaping_flow*)avl_tree_node_data_get(avl_node); sf_ins[count].priority = priority; count++; if (count == max_sf_num) { return count; } avl_node = avl_tree_next_in_order_node_get(avl_node); } return count; } static void shaper_profile_async_pass_set(struct shaping_profile_info *profile, unsigned char direction, int priority, int async_pass_enabled) { struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; unsigned char *async_pass = NULL; if (profile->type == PROFILE_TYPE_GENERIC) { async_pass = &pf_hash_node->async_pass[priority][direction]; } else { async_pass = &profile->async_pass[direction]; } if (*async_pass != async_pass_enabled) { *async_pass 
= async_pass_enabled; } return; } static int shaper_profile_async_pass_get(struct shaping_profile_info *profile, unsigned char direction, int priority) { struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; if (profile->type == PROFILE_TYPE_GENERIC) { return pf_hash_node->async_pass[priority][direction]; } else { return profile->async_pass[direction]; } } static void shaper_token_multiple_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile) { if (profile->type != PROFILE_TYPE_GENERIC) { return; } struct shaper_token_multiple *token_multiple = &profile->hash_node->token_multiple; int curr_multiple = token_multiple->token_get_multiple; time_t curr_time_s = time(NULL); int token_multiple_min = ctx->conf.token_multiple_min; int token_multiple_max = ctx->conf.token_multiple_max; if (curr_time_s - token_multiple->token_multiple_update_time_s < TOKEN_MULTIPLE_UPDATE_INTERVAL_S) { return; } token_multiple->token_multiple_update_time_s = curr_time_s; if (token_multiple->has_failed_get_token) { token_multiple->token_get_multiple = (curr_multiple - 1) < token_multiple_min ? token_multiple_min : (curr_multiple - 1); goto END; } if (token_multiple->token_not_enough) { token_multiple->token_get_multiple = (curr_multiple + 1) > token_multiple_max ? token_multiple_max : (curr_multiple + 1); goto END; } END: LOG_INFO("%s: profile id %d, token_get_multiple %d, has_failed_get_token %d, token_not_enough %d", LOG_TAG_SHAPING, profile->id, token_multiple->token_get_multiple, token_multiple->has_failed_get_token, token_multiple->token_not_enough); token_multiple->has_failed_get_token = 0; token_multiple->token_not_enough = 0; return; } static void shaper_deposit_token_add(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority) { long long *deposit_token; struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; switch (profile->type) { case PROFILE_TYPE_GENERIC: if (pf_hash_node->limit_direction == PROFILE_LIMIT_DIRECTION_BIDIRECTION) { deposit_token = &pf_hash_node->bidirection_deposit_token_bits[priority]; } else if (direction == SHAPING_DIR_IN) { deposit_token = &pf_hash_node->in_deposit_token_bits[priority]; } else { deposit_token = &pf_hash_node->out_deposit_token_bits[priority]; } break; case PROFILE_TYPE_HOST_FARINESS: case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: if (pf_hash_node->limit_direction == PROFILE_LIMIT_DIRECTION_BIDIRECTION) { deposit_token = &profile->bidirection_deposit_token_bits; } else if (direction == SHAPING_DIR_IN) { deposit_token = &profile->in_deposit_token_bits; } else { deposit_token = &profile->out_deposit_token_bits; } break; default: LOG_ERROR("%s: invalid profile type %d, profile id %d", LOG_TAG_SHAPING, profile->type, profile->id); return; } *deposit_token += req_token_bits; if (*deposit_token > 0) { shaper_profile_async_pass_set(profile, direction, priority, 1); } else { pf_hash_node->token_multiple.token_not_enough = 1; shaper_token_multiple_update(ctx, profile); } } static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_tconsume_cb_arg *arg = (struct shaping_tconsume_cb_arg*)cb_arg; struct shaping_thread_ctx *ctx = arg->ctx; struct shaping_profile_info *profile = arg->profile; struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; struct shaping_flow *sf = arg->sf; struct timespec curr_time; long long curr_time_us; thread_swarmkv_cb_cnt++; 
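    /* tconsume reply path: measure the swarmkv round trip, then either bank
     * the granted tokens on the profile (reply > 0) or clear the async-pass
     * flag and back off (reply == 0 or error). */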
clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us); shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat); LOG_DEBUG("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer); if (reply->type != SWARMKV_REPLY_INTEGER) { shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat); goto END; } if (reply->integer < 0) {//profile not exist pf_hash_node->is_invalid = 1; goto END; } else { pf_hash_node->is_invalid = 0; } if (reply->integer > 0) { shaper_deposit_token_add(ctx, profile, reply->integer, arg->direction, profile->priority);//deposit tokens to profile } if (reply->integer == 0) { shaper_profile_async_pass_set(profile, arg->direction, profile->priority, 0); if (profile->type == PROFILE_TYPE_GENERIC) { pf_hash_node->token_multiple.has_failed_get_token = 1; shaper_token_multiple_update(ctx, profile); } } END: pf_hash_node->tconsume_ref_cnt--; if (reply->type != SWARMKV_REPLY_INTEGER || reply->integer == 0) { switch (profile->type) { case PROFILE_TYPE_GENERIC: pf_hash_node->last_failed_get_token_ms[arg->direction] = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; break; case PROFILE_TYPE_HOST_FARINESS: case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: profile->last_failed_get_token_ms[arg->direction] = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; break; } } shaping_flow_free(ctx, sf); free(cb_arg); cb_arg = NULL; return; } static int shaper_deposit_token_get(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority, int force, int *need_get_token) { long long *deposit_token; struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; int ret = -1; int token_multiple; switch (profile->type) { case PROFILE_TYPE_GENERIC: if (pf_hash_node->limit_direction == PROFILE_LIMIT_DIRECTION_BIDIRECTION) { deposit_token = &pf_hash_node->bidirection_deposit_token_bits[priority]; } else if (direction == SHAPING_DIR_IN) { deposit_token = &pf_hash_node->in_deposit_token_bits[priority]; } else { deposit_token = &pf_hash_node->out_deposit_token_bits[priority]; } token_multiple = pf_hash_node->token_multiple.token_get_multiple; break; case PROFILE_TYPE_HOST_FARINESS: case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: if (pf_hash_node->limit_direction == PROFILE_LIMIT_DIRECTION_BIDIRECTION) { deposit_token = &profile->bidirection_deposit_token_bits; } else if (direction == SHAPING_DIR_IN) { deposit_token = &profile->in_deposit_token_bits; } else { deposit_token = &profile->out_deposit_token_bits; } token_multiple = 1; break; case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: if (pf_hash_node->limit_direction == PROFILE_LIMIT_DIRECTION_BIDIRECTION) { deposit_token = &profile->bidirection_deposit_token_bits; } else if (direction == SHAPING_DIR_IN) { deposit_token = &profile->in_deposit_token_bits; } else { deposit_token = &profile->out_deposit_token_bits; } token_multiple = TOKEN_MULTIPLE_DEFAULT; break; default: LOG_ERROR("%s: invalid profile type %d, profile id %d", LOG_TAG_SHAPING, profile->type, profile->id); return 0; } if (*deposit_token < req_token_bits) { *need_get_token = 1; } if (force || *deposit_token >= 
req_token_bits) { *deposit_token -= req_token_bits; ret = 0; } if (*deposit_token + (req_token_bits * token_multiple * 2) < 0) { shaper_profile_async_pass_set(profile, direction, priority, 0); } return ret; } static void shaper_profile_hash_node_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *pf_hash_node, struct timespec *curr_timespec) { unsigned long long curr_time_ms = curr_timespec->tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MILLI_SEC; if (curr_time_ms - pf_hash_node->last_refresh_time_ms < PROFILE_HASH_NODE_REFRESH_MS) { return; } struct shaping_profile *profile = shaper_maat_profile_get(ctx, pf_hash_node->id); if (profile) { pf_hash_node->limit_direction = profile->limit_direction; pf_hash_node->aqm_type = profile->aqm_type; } pf_hash_node->last_refresh_time_ms = curr_time_ms; return; } static void shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int req_token_bits, unsigned char direction, struct timespec *curr_timespec) { struct shaping_tconsume_cb_arg *arg = NULL; struct shaping_profile_hash_node *pf_hash_node = pf_info->hash_node; char key[32] = {0}; if (pf_hash_node->tconsume_ref_cnt > 0) { return; } shaper_profile_hash_node_refresh(ctx, pf_hash_node, curr_timespec); if (pf_hash_node->limit_direction == PROFILE_LIMIT_DIRECTION_BIDIRECTION) { snprintf(key, sizeof(key), "tsg-shaping-%d-bidirectional", pf_info->id); } else { snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming"); } arg = (struct shaping_tconsume_cb_arg *)calloc(1, sizeof(struct shaping_tconsume_cb_arg)); arg->ctx = ctx; arg->profile = pf_info; arg->sf = sf; arg->direction = direction; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat); sf->ref_cnt++; pf_hash_node->tconsume_ref_cnt++; switch (pf_info->type) { case PROFILE_TYPE_GENERIC: swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * pf_hash_node->token_multiple.token_get_multiple, shaper_token_get_cb, arg); break; case PROFILE_TYPE_HOST_FARINESS: case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, shaper_token_get_cb, arg); //TODO: ftconsume with flexiable //swarmkv_async_command(ctx->swarmkv_db, shaper_token_get_cb, arg, "FTCONSUME %s %s %d %d %s", key, sf->src_ip_str, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, "FLEXIBLE"); break; case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_MULTIPLE_DEFAULT, shaper_token_get_cb, arg); break; default: if (arg) { free(arg); } break; } return; } static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_hmget_cb_arg *arg = (struct shaping_hmget_cb_arg *)cb_arg; struct shaping_thread_ctx *ctx = arg->ctx; struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node; struct timespec curr_time; long long curr_time_us; long long curr_time_ms; thread_swarmkv_cb_cnt++; clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; 
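    /* HMGET reply path: refresh the cached per-priority queue lengths that
     * shaper_profile_is_priority_blocked() consults before sending traffic. */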
curr_time_ms = curr_time_us / 1000; shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us); shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); shaper_global_stat_hmget_callback_inc(&ctx->thread_global_stat); if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) { shaper_global_stat_async_hmget_failed_inc(&ctx->thread_global_stat); goto END; } if (reply->type != SWARMKV_REPLY_ARRAY) { goto END; } for (unsigned int i = 0; i < reply->n_element; i++) { if (reply->elements[i]->type == SWARMKV_REPLY_STRING) { char tmp_str[32] = {0}; memcpy(tmp_str, reply->elements[i]->str, reply->elements[i]->len); pf_hash_node->queue_len[i] = strtoll(tmp_str, NULL, 10); } else { pf_hash_node->queue_len[i] = 0; } } END: pf_hash_node->last_hmget_ms = curr_time_ms; pf_hash_node->hmget_ref_cnt--; free(cb_arg); cb_arg = NULL; } static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, struct timespec *curr_timespec, long long curr_time_ms) { struct shaping_hmget_cb_arg *arg; int priority = profile->priority; if (priority == 0) {//highest priority, can't be blocked return 0; } if (profile->type == PROFILE_TYPE_SPLIT_BY_LOCAL_HOST) {//split-by profile don't need to check priority return 0; } if (profile->hash_node->hmget_ref_cnt > 0) {//if hmget command is pending, don't send hmget command again goto END; } if (curr_time_ms - profile->hash_node->last_hmget_ms < HMGET_REQUEST_INTERVAL_MS) {//don't send hmget command in 10 ms goto END; } arg = (struct shaping_hmget_cb_arg *)calloc(1, sizeof(struct shaping_hmget_cb_arg)); arg->ctx = ctx; arg->pf_hash_node = profile->hash_node; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; profile->hash_node->hmget_ref_cnt++; shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, SWARMKV_QUEUE_LEN_GET_CMD, profile->id); for (int i = 0; i < priority; i++) { if (profile->hash_node->queue_len[i] > 0) { profile->hash_node->priority_blocked_time_ms[priority] = curr_time_ms; goto END; } } END: if (curr_time_ms - profile->hash_node->priority_blocked_time_ms[priority] < PRIORITY_BLOCK_MIN_TIME_MS) { shaper_profile_async_pass_set(profile, SHAPING_DIR_OUT, priority, 0); shaper_profile_async_pass_set(profile, SHAPING_DIR_IN, priority, 0); return 1; } else { return 0; } } void shaper_profile_hash_node_set(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile) { if (profile->hash_node == NULL) { struct shaping_profile_hash_node *hash_node = NULL; HASH_FIND_INT(thread_sp_hashtbl, &profile->id, hash_node); if (hash_node) { profile->hash_node = hash_node; } else { profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node)); profile->hash_node->id = profile->id; profile->hash_node->token_multiple.token_get_multiple = TOKEN_MULTIPLE_DEFAULT; HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node); timeout_init(&profile->hash_node->timeout_handle, TIMEOUT_ABS); timeouts_add(ctx->expires, &profile->hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); } } return; } static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct shaping_profile_info *profile, unsigned char direction) { long long last_failed_ms = 0; switch 
(profile->type) { case PROFILE_TYPE_GENERIC: last_failed_ms = profile->hash_node->last_failed_get_token_ms[direction]; break; case PROFILE_TYPE_HOST_FARINESS: case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: last_failed_ms = profile->last_failed_get_token_ms[direction]; break; } if (curr_time_ms - last_failed_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, it't too short interval; for swarmkv can't reproduce token in 1ms return 1; } else { return 0; } } static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, int profile_type, int req_token_bytes, unsigned char direction, struct timespec *curr_timespec) { struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor]; int need_get_token = 0; int ret = SHAPER_TOKEN_GET_FAILED; time_t curr_time = time(NULL); if (curr_time - sf->check_rule_time >= ctx->conf.check_rule_enable_interval_sec) { sf->check_rule_time = curr_time; if (shaper_rule_is_enabled(ctx, rule->id) != 1) { rule->is_enabled = 0; return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet } else { rule->is_enabled = 1; } } if (rule->is_enabled != 1) { return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet } if (shaper_profile_async_pass_get(profile, direction, profile->priority) == 1) { shaper_deposit_token_get(profile, req_token_bytes * 8, direction, profile->priority, 1, &need_get_token); ret = SHAPER_TOKEN_GET_SUCCESS; } else if (shaper_deposit_token_get(profile, req_token_bytes * 8, direction, profile->priority, 0, &need_get_token) == 0) { ret = SHAPER_TOKEN_GET_SUCCESS; } if (!need_get_token) { return ret; } long long curr_time_ms = curr_timespec->tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MILLI_SEC; if (shaping_swarmkv_is_too_short_interval(curr_time_ms, profile, direction)) { return ret; } if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_timespec, curr_time_ms)) { return ret; } int req_token_bits = req_token_bytes * 8; shaper_token_get_from_profile(ctx, sf, profile, req_token_bits, direction, curr_timespec); if (profile->hash_node->is_invalid && profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token return SHAPER_TOKEN_GET_SUCCESS; } else { return ret; } } int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, struct shaping_profile_container pf_container[]) { int num = 0; if (priority == SHAPING_PRIORITY_NUM_MAX - 1) {//priority 9 allow multi profiles for one priority if (s_rule_info->primary.priority == priority) { pf_container[num].pf_type = PROFILE_IN_RULE_TYPE_PRIMARY; pf_container[num].pf_info = &s_rule_info->primary; num++; } for (int i = 0; i < s_rule_info->borrowing_num; i++) { if (s_rule_info->borrowing[i].priority == priority) { pf_container[num].pf_type = PROFILE_IN_RULE_TYPE_BORROW; pf_container[num].pf_info = &s_rule_info->borrowing[i]; num++; } } return num; } else { if (s_rule_info->primary.priority == priority) { pf_container[0].pf_type = PROFILE_IN_RULE_TYPE_PRIMARY; pf_container[0].pf_info = &s_rule_info->primary; return 1; } for (int i = 0; i < s_rule_info->borrowing_num; i++) { if (s_rule_info->borrowing[i].priority == priority) { pf_container[0].pf_type = PROFILE_IN_RULE_TYPE_BORROW; pf_container[0].pf_info = &s_rule_info->borrowing[i]; return 1; } } } return num; } static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char 
direction) { int anchor = sf->anchor + 1; if (anchor > sf->rule_num - 1) { return 0; } return anchor; } static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority) { struct shaping_rule_info *rule = NULL; struct shaping_profile_info *profile = NULL; int profile_type; struct shaping_packet_wrapper *pkt_wrapper = NULL; struct shaping_profile_container pf_container[SHAPING_PRIORITY_NUM_MAX]; struct timespec curr_time; unsigned long long enqueue_time_us; unsigned long long latency_us; int get_token_success = 0; int profile_num; rule = &sf->matched_rule_infos[sf->anchor]; profile_num = shaper_profile_get(rule, priority, pf_container); assert(profile_num > 0); pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); clock_gettime(CLOCK_MONOTONIC, &curr_time); latency_us = shaper_pkt_latency_us_calculate(&rule->primary, &curr_time); if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) { if (latency_us > ctx->conf.pkt_max_delay_time_us) { shaper_flow_pop(ctx, sf, &curr_time); goto DROP; } } for (int i = 0; i < profile_num; i++) { profile = pf_container[i].pf_info; profile_type = pf_container[i].pf_type; /*AQM process, if aqm not pass, for primary profile drop packet, for borrow profile just don't give token to this packet*/ if (shaper_aqm_need_drop(profile, pkt_wrapper, &curr_time, latency_us)) { if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) { shaper_flow_pop(ctx, sf, &curr_time); goto DROP; } else { shaper_flow_specific_borrow_priority_pop(ctx, sf, priority); continue; } } int ret = shaper_token_consume(ctx, sf, profile, profile_type, pkt_wrapper->length, pkt_wrapper->direction, &curr_time); if (ret >= SHAPER_TOKEN_GET_SUCCESS) { if (ret == SHAPER_TOKEN_GET_SUCCESS) { shaper_stat_forward_inc(&profile->stat, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); } get_token_success = 1; break; } } if (!get_token_success) { return SHAPING_QUEUED; } shaper_flow_pop(ctx, sf, &curr_time); sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction); if (sf->anchor == 0) {//no next rule return SHAPING_FORWARD; } //push sf for next rule enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; if (0 == shaper_flow_push(ctx, sf, enqueue_time_us)) { return SHAPING_QUEUED; } else { goto DROP; } DROP: rule = &sf->matched_rule_infos[sf->anchor]; shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); sf->anchor = 0; return SHAPING_DROP; } static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct metadata *meta, struct shaping_profile_info *profile, marsio_buff_t *rx_buff) { int profile_type = PROFILE_IN_RULE_TYPE_PRIMARY; struct timespec curr_time; unsigned long long enqueue_time_us; int enqueue_success = 0; clock_gettime(CLOCK_MONOTONIC, &curr_time); int ret = shaper_token_consume(ctx, sf, profile, profile_type, meta->raw_len, meta->dir, &curr_time); if (ret >= SHAPER_TOKEN_GET_SUCCESS) { if (ret == SHAPER_TOKEN_GET_SUCCESS) { shaper_stat_forward_inc(&profile->stat, meta->dir, meta->raw_len, ctx->thread_index); } sf->anchor = shaper_next_anchor_get(sf, meta->dir); if (sf->anchor == 0) {//no next rule return SHAPING_FORWARD; } } //get token failed, or have multiple rules, enqueue packet and push sf if (shaper_packet_enqueue(ctx, sf, rx_buff, meta, &curr_time) == 0) { enqueue_success = 1; } else { char *addr_str = 
addr_tuple4_to_str(&sf->tuple4); LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_str); if (addr_str) { free(addr_str); } goto DROP; } enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; if (0 == shaper_flow_push(ctx, sf, enqueue_time_us)) { return SHAPING_QUEUED; } else { goto DROP; } DROP: if (enqueue_success) { shaper_packet_dequeue(sf); } struct shaping_profile_info *pf_info = &sf->matched_rule_infos[sf->anchor].primary; shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index); sf->anchor = 0; return SHAPING_DROP; } static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority, struct shaping_stat *stat, struct shaping_thread_ctx *ctx) { struct shaping_packet_wrapper *pkt_wrapper; int old_anchor = sf->anchor; int shaping_ret; pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority); switch (shaping_ret) { case SHAPING_QUEUED: if (old_anchor == sf->anchor) {//didn't get token return -1; } else {//got token for one rule and waiting get token for next rule return 0; } break; case SHAPING_DROP: shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); break; case SHAPING_FORWARD: shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length); shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1); shaper_packet_dequeue(sf); break; default: assert(0);//impossible path break; } shaper_stat_refresh(ctx, sf, 0); if (shaper_queue_empty(sf)) { if (sf->flag & SESSION_CLOSE) { sf->flag &= (~SESSION_CLOSE); shaping_flow_free(ctx, sf); } return 0; } else { pkt_wrapper = shaper_first_pkt_get(sf); shaper_stat_queueing_pkt_dec_for_rule(&sf->matched_rule_infos[pkt_wrapper->rule_anchor], pkt_wrapper->direction, ctx->thread_index); sf->anchor = 0; if (shaper_flow_push(ctx, sf, pkt_wrapper->enqueue_time_us) != 0) { shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail if (sf->flag & SESSION_CLOSE) { sf->flag &= (~SESSION_CLOSE); shaping_flow_free(ctx, sf); } } return 0; } } static void shaper_token_consume_force(struct shaping_flow *sf, struct metadata *meta) { struct shaping_rule_info *rule; int need_get_token = 0; for (int i = 0; i < sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; if (!rule->is_enabled) { continue; } shaper_deposit_token_get(&rule->primary, meta->raw_len * 8, meta->dir, rule->primary.priority, 1, &need_get_token); } return; } void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf) { int shaping_ret; struct shaping_rule_info *s_rule; struct shaping_stat *stat = ctx->stat; struct shaping_marsio_info *marsio_info = ctx->marsio_info; sf->processed_pkts++; if (sf->dscp_enable) { struct ethhdr *eth_hdr = (struct ethhdr*)marsio_buff_mtod(rx_buff); raw_packet_set_dscp(eth_hdr, sf->dscp_value); } if (meta->is_tcp_pure_ctrl) { 
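        /* Pure TCP control packets are never queued: force-consume tokens on
         * every enabled rule so accounting stays consistent, then transmit
         * the packet immediately. */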
shaper_token_consume_force(sf, meta); marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(&ctx->thread_global_stat); shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index); goto END;//for tcp pure control pkt, transmit it directly } if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly struct timespec curr_time; clock_gettime(CLOCK_MONOTONIC, &curr_time); s_rule = &sf->matched_rule_infos[sf->anchor]; if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta, &curr_time)) { shaper_stat_queueing_pkt_inc_for_rule(s_rule, meta->dir, ctx->thread_index); shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); } else { struct shaping_profile_info *pf_info = &s_rule->primary; shaper_stat_drop_inc(&pf_info->stat, meta->dir, ctx->thread_index); shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len); shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); } } else {//no queueing pkt, decide action sf->anchor = 0; shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff); switch (shaping_ret) { case SHAPING_QUEUED: shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); break; case SHAPING_DROP: marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len); shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len); break; case SHAPING_FORWARD: marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); break; default: assert(0); break; } } END: shaper_stat_refresh(ctx, sf, 0); if(sf->flag & SESSION_CLOSE) { if (shaper_queue_empty(sf)) { char *addr_str = addr_tuple4_to_str(&sf->tuple4); LOG_DEBUG("%s: shaping free a shaping_flow for session: %s", LOG_TAG_SHAPING, addr_str); sf->flag &= (~SESSION_CLOSE); shaping_flow_free(ctx, sf); if (addr_str) { free(addr_str); } } } return; } void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx) { static thread_local int swarmkv_caller_loop_divisor = SWARMKV_CALLER_LOOP_DIVISOR_MIN; static thread_local unsigned int polling_cnt = 0; polling_cnt++; if (polling_cnt % swarmkv_caller_loop_divisor == 0) { swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL); if (thread_swarmkv_cb_cnt > 0) { swarmkv_caller_loop_divisor = MAX(swarmkv_caller_loop_divisor - 1, SWARMKV_CALLER_LOOP_DIVISOR_MIN); } else { swarmkv_caller_loop_divisor = MIN(swarmkv_caller_loop_divisor + 1, SWARMKV_CALLER_LOOP_DIVISOR_MAX); } thread_swarmkv_cb_cnt = 0; } struct shaping_profile_hash_node *hash_node = NULL; time_t curr_time = time(NULL); int cnt = 0; if (curr_time > ctx->last_update_timeout_sec) { timeouts_update(ctx->expires, curr_time); ctx->last_update_timeout_sec = curr_time; } while (cnt < SHAPING_STAT_REFRESH_MAX_PER_POLLING) { struct timeout *t = NULL; t = timeouts_get(ctx->expires); if (!t) { break; } hash_node = container_of(t, struct 
shaping_profile_hash_node, timeout_handle); shaper_stat_priority_queue_len_refresh_all(ctx, hash_node); timeouts_add(ctx->expires, &hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_get will delete item from queue, add it back cnt++; } if (shaper_global_stat_queueing_pkts_get(&ctx->thread_global_stat) == 0) { return; } struct shaper_flow_instance sf_ins[SHAPER_FLOW_POP_NUM_MAX]; int sf_num; int ret; for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { sf_num = shaper_flow_in_order_get(sp, sf_ins, i, ctx->conf.polling_node_num_max[i]); if (sf_num == 0) { continue; } for (int j = 0; j < sf_num; j++) { ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, stat, ctx); if (ret == 0) { return; } } } return; } static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta) { struct ctrl_pkt_data ctrl_data; struct shaping_flow *sf = NULL; struct raw_pkt_parser raw_parser; if (shaper_marsio_pkt_metadata_get(rx_buff, meta, 1, &raw_parser) < 0) { LOG_ERROR("%s: shaping marsio get metadata from ctrl pkt failed", LOG_TAG_CTRLPKT); //TODO: dump meta data for debug return NULL; } if (shaper_marsio_ctrl_pkt_data_parse(&ctrl_data, meta->raw_data + meta->l7_offset, meta->raw_len - meta->l7_offset) < 0) { LOG_ERROR("%s: shaping marsio parse json data from ctrl pkt failed", LOG_TAG_CTRLPKT); return NULL; } if (ctrl_data.session_id != meta->session_id) { LOG_ERROR("%s: shaping marsio ctrl data session_id %ld != meta session_id %ld", LOG_TAG_CTRLPKT, ctrl_data.session_id, meta->session_id); return NULL; } switch (ctrl_data.state) { case SESSION_STATE_OPENING: shaper_global_stat_ctrlpkt_opening_inc(&ctx->thread_global_stat); //sf = shaper_session_opening(ctx, meta, &ctrl_data, &raw_parser); break; case SESSION_STATE_ACTIVE: shaper_global_stat_ctrlpkt_active_inc(&ctx->thread_global_stat); sf = shaper_session_active(ctx, meta, &ctrl_data, &raw_parser); break; case SESSION_STATE_CLOSING: shaper_global_stat_ctrlpkt_close_inc(&ctx->thread_global_stat); sf = shaper_session_close(ctx, meta); break; case SESSION_STATE_RESETALL: shaper_global_stat_ctrlpkt_resetall_inc(&ctx->thread_global_stat); sf = shaper_session_reset_all(ctx, meta); break; default: shaper_global_stat_ctrlpkt_err_inc(&ctx->thread_global_stat); assert(0); } return sf; } static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta) { struct shaping_flow *sf = NULL; struct session_node *session_node = NULL; struct raw_pkt_parser raw_parser; if (shaper_marsio_pkt_metadata_get(rx_buff, meta, 0, &raw_parser) < 0) { LOG_ERROR("%s: shaping marsio get metadata from raw pkt failed", LOG_TAG_CTRLPKT); //TODO: dump meta data for debug return NULL; } session_node = session_table_search_by_id(ctx->session_table, meta->session_id); if (session_node) { sf = (struct shaping_flow *)session_node->val_data; shaper_global_stat_hit_policy_throughput_rx_inc(&ctx->thread_global_stat, meta->raw_len); } return sf; } static void shaper_datapath_telemetry_info_append(struct shaping_marsio_info *marsio_info, marsio_buff_t *buff, struct shaping_flow *sf) { char datapath_telemetry_str[512] = {0}; struct shaping_rule_info *rule = NULL; int len = 0; if (marsio_dp_trace_measurements_can_emit(marsio_info->instance, buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)) { len += snprintf(datapath_telemetry_str + len, sizeof(datapath_telemetry_str) - len, "shaping:"); for (int i= 0; i < 
sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; len += snprintf(datapath_telemetry_str + len, sizeof(datapath_telemetry_str) - len, "rule_id=%d, primary_pf_id=%d", rule->id, rule->primary.id); if (rule->borrowing_num > 0) { len += snprintf(datapath_telemetry_str + len, sizeof(datapath_telemetry_str) - len, ", borrow_pf_ids:["); } for (int j = 0; j < rule->borrowing_num; j++) { if (j != 0) { len += snprintf(datapath_telemetry_str + len, sizeof(datapath_telemetry_str) - len, ","); } len += snprintf(datapath_telemetry_str + len, sizeof(datapath_telemetry_str) - len, "%d", rule->borrowing[j].id); } if (rule->borrowing_num > 0) { len += snprintf(datapath_telemetry_str + len, sizeof(datapath_telemetry_str) - len, "]"); } len += snprintf(datapath_telemetry_str + len, sizeof(datapath_telemetry_str) - len, ";"); } marsio_dp_trace_measurement_emit_str(marsio_info->instance, buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY, "shaping", datapath_telemetry_str); } return; } void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) { marsio_buff_t *rx_buff[SHAPER_MARSIO_RX_BRUST_MAX]; struct shaping_flow *sf = NULL; struct metadata meta; int rx_num; rx_num = marsio_recv_burst(ctx->marsio_info->mr_dev, ctx->thread_index, rx_buff, ctx->marsio_info->rx_brust_max); if (rx_num <= 0) { polling_entry(ctx->sp, ctx->stat, ctx); return; } for (int i = 0; i < rx_num; i++) { if (marsio_buff_is_ctrlbuf(rx_buff[i])) { sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta); } else { sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta); } shaper_global_stat_throughput_rx_inc(&ctx->thread_global_stat, meta.raw_len); if (meta.is_ctrl_pkt || !sf || sf->rule_num == 0) {//ctrl pkt need send directly marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1); shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta.raw_len); } else { shaper_datapath_telemetry_info_append(ctx->marsio_info, rx_buff[i], sf); shaping_packet_process(ctx, rx_buff[i], &meta, sf); } polling_entry(ctx->sp, ctx->stat, ctx); } return; } int shaper_global_conf_init(struct shaping_system_conf *conf) { int ret; int array_num; cJSON *json = NULL; cJSON *tmp_obj = NULL, *tmp_array_obj = NULL; char polling_node_num_max[128] = {0}; unsigned int cpu_mask[SHAPING_WROK_THREAD_NUM_MAX] = {0}; ret = MESA_load_profile_int_nodef(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "WORK_THREAD_NUM", &conf->work_thread_num); if (ret < 0) { LOG_ERROR("%s: shaping init global conf WORK_THREAD_NUM failed", LOG_TAG_SHAPING); return ret; } conf->work_thread_num = MIN(conf->work_thread_num, SHAPING_WROK_THREAD_NUM_MAX); ret = MESA_load_profile_int_nodef(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "ENABLE_CPU_AFFINITY", &conf->cpu_affinity_enable); if (ret < 0) { LOG_ERROR("%s: shaping init global conf ENABLE_CPU_AFFINITY failed", LOG_TAG_SHAPING); return ret; } ret = MESA_load_profile_uint_range(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "CPU_AFFINITY_MASK", SHAPING_WROK_THREAD_NUM_MAX, cpu_mask); if (ret < 0 || ret != conf->work_thread_num) { LOG_ERROR("%s: shaping init global conf get CPU_AFFINITY_MASK failed or incomplete config", LOG_TAG_SHAPING); return -1; } CPU_ZERO(&conf->cpu_affinity_mask); for (int i = 0; i < conf->work_thread_num; i++) { int cpu_id = cpu_mask[i]; CPU_SET(cpu_id, &conf->cpu_affinity_mask); } ret = MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "firewall_sids", &conf->firewall_sid, 1001); if (ret < 0) { LOG_ERROR("%s: shaping init global conf firewall_sids failed", LOG_TAG_SHAPING); return ret; } #if 0 
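/* Disabled legacy loader that read POLLING_NODE_NUM_MAX as a flat int array;
 * the cJSON-based parser further below is the active implementation. */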
//temporarily not support array config array_num = SHAPING_PRIORITY_NUM_MAX; ret = MESA_load_profile_int_array(SHAPING_GLOBAL_CONF_FILE, "SHAPING", "POLLING_NODE_NUM_MAX", &array_num, g_sp_conf.polling_node_num_max); if (ret < 0) { MESA_handle_runtime_log(g_rt_para.log_handle, RLOG_LV_FATAL, SHAPING_LOG_MODULE, "shaping init global conf POLLING_NODE_NUM_MAX failed, ret is %d", ret); return ret; } for (int i = 0; i < array_num; i++) { if (g_sp_conf.polling_node_num_max[i] > SHAPER_FLOW_POP_NUM_MAX) { MESA_handle_runtime_log(g_rt_para.log_handle, RLOG_LV_FATAL, SHAPING_LOG_MODULE, "shaping init global conf POLLING_NODE_NUM_MAX failed, index %d value %d exceed limit %d", i, g_sp_conf.polling_node_num_max[i], SHAPER_FLOW_POP_NUM_MAX); return -1; } } #endif /*************parse max process num of each priority avl tree*************/ ret = MESA_load_profile_string_nodef(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "POLLING_NODE_NUM_MAX", polling_node_num_max, sizeof(polling_node_num_max)); if (ret < 0) { LOG_ERROR("%s: shaping init global conf POLLING_NODE_NUM_MAX failed", LOG_TAG_SHAPING); return ret; } json = cJSON_Parse(polling_node_num_max); if (!json) { LOG_ERROR("%s: shaping parse global conf json POLLING_NODE_NUM_MAX failed", LOG_TAG_SHAPING); goto ERROR; } tmp_obj = cJSON_GetObjectItem(json, "polling_node_num_max"); if (!tmp_obj) { LOG_ERROR("%s: shaping init global conf parse json polling_node_num_max failed", LOG_TAG_SHAPING); goto ERROR; } array_num = cJSON_GetArraySize(tmp_obj); if (array_num != SHAPING_PRIORITY_NUM_MAX) { LOG_ERROR("%s: shaping init global conf POLLING_NODE_NUM_MAX failed, array_num is %d", LOG_TAG_SHAPING, array_num); goto ERROR; } for (int i = 0; i < array_num; i++) { tmp_array_obj = cJSON_GetArrayItem(tmp_obj, i); if (tmp_array_obj->valueint > SHAPER_FLOW_POP_NUM_MAX) { LOG_ERROR("%s: shaping init global conf POLLING_NODE_NUM_MAX failed, value at index %d exceed limit %d", LOG_TAG_SHAPING, i, SHAPER_FLOW_POP_NUM_MAX); goto ERROR; } conf->polling_node_num_max[i] = tmp_array_obj->valueint; } if (json) { cJSON_Delete(json); } /*************************************************************************/ MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "SESSION_QUEUE_LEN_MAX", &conf->session_queue_len_max, 128); MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PRIORITY_QUEUE_LEN_MAX", &conf->priority_queue_len_max, 1024); MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "CHECK_RULE_ENABLE_INTERVAL_SEC", &conf->check_rule_enable_interval_sec, 120); MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PKT_MAX_DELAY_TIME_US", &conf->pkt_max_delay_time_us, 2000000); MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "TOKEN_MULTIPLE_MIN", &conf->token_multiple_min, 10); MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "TOKEN_MULTIPLE_MAX", &conf->token_multiple_max, 50); return 0; ERROR: if (json) { cJSON_Delete(json); } return -1; } void shaping_engine_destroy(struct shaping_ctx *ctx) { LOG_DEBUG("%s: destroy shaping engine", LOG_TAG_SHAPING); if (ctx) { shaper_swarmkv_destroy(ctx->swarmkv_db); shaper_maat_destroy(ctx->maat_info); shaper_marsio_destroy(ctx->marsio_info); shaper_stat_destroy(ctx->stat); shaper_global_stat_destroy(ctx->global_stat); if (ctx->thread_ctx) { for (int i = 0; i < ctx->thread_num; i++) { shaper_free(ctx->thread_ctx[i].sp); session_table_destory(ctx->thread_ctx[i].session_table); timeouts_close(ctx->thread_ctx[i].expires); } free(ctx->thread_ctx); } free(ctx); } LOG_CLOSE(); return; 
}

struct shaping_ctx *shaping_engine_init()
{
    struct shaping_system_conf conf;
    struct shaping_ctx *ctx = NULL;
    int ret, error;

    memset(&conf, 0, sizeof(conf));
    ctx = (struct shaping_ctx *)calloc(1, sizeof(struct shaping_ctx));
    if (!ctx) {
        return NULL;
    }
    ret = shaper_global_conf_init(&conf);
    if (ret < 0) {
        LOG_ERROR("%s: shaping init global conf failed", LOG_TAG_SHAPING);
        goto ERROR;
    }
    /*init swarmkv*/
    ctx->swarmkv_db = shaper_swarmkv_init(conf.work_thread_num);
    if (ctx->swarmkv_db == NULL) {
        goto ERROR;
    }
    /*init maat*/
    ctx->maat_info = shaper_maat_init("SHAPING");
    if (ctx->maat_info == NULL) {
        goto ERROR;
    }
    /*init marsio*/
    ctx->marsio_info = shaper_marsio_init(&conf);
    if (ctx->marsio_info == NULL) {
        goto ERROR;
    }
    ctx->stat = shaper_stat_init(conf.work_thread_num);
    if (ctx->stat == NULL) {
        goto ERROR;
    }
    ctx->global_stat = shaper_global_stat_init(conf.work_thread_num);
    if (ctx->global_stat == NULL) {
        goto ERROR;
    }
    ctx->thread_ctx = (struct shaping_thread_ctx *)calloc(conf.work_thread_num, sizeof(struct shaping_thread_ctx));
    if (ctx->thread_ctx == NULL) {//destroy path below handles a partially initialized ctx
        goto ERROR;
    }
    ctx->thread_num = conf.work_thread_num;
    for (int i = 0; i < conf.work_thread_num; i++) {
        ctx->thread_ctx[i].thread_index = i;
        ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max);
        ctx->thread_ctx[i].stat = ctx->stat;
        ctx->thread_ctx[i].session_table = session_table_create();
        ctx->thread_ctx[i].maat_info = ctx->maat_info;
        ctx->thread_ctx[i].marsio_info = ctx->marsio_info;
        ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db;
        ctx->thread_ctx[i].expires = timeouts_open(0, &error);
        ctx->thread_ctx[i].ref_ctx = ctx;
        memcpy(&ctx->thread_ctx[i].conf, &conf, sizeof(conf));
    }
    return ctx;

ERROR:
    shaping_engine_destroy(ctx);
    return NULL;
}
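
/*
 * Minimal usage sketch (illustrative only, guarded by a hypothetical
 * SHAPER_USAGE_EXAMPLE macro and not part of the build): shows how a caller
 * is expected to wire the entry points defined in this file together. Thread
 * creation, CPU affinity and shutdown signalling are assumed to live in the
 * caller and are omitted here.
 */
#ifdef SHAPER_USAGE_EXAMPLE
static void *shaper_worker_example(void *opaque)
{
    struct shaping_thread_ctx *thread_ctx = (struct shaping_thread_ctx *)opaque;

    /* Each iteration receives a burst of packets, or falls back to polling
     * queued flows when the receive ring is empty. */
    for (;;) {
        shaper_packet_recv_and_process(thread_ctx);
    }
    return NULL;
}

static int shaper_run_example(void)
{
    struct shaping_ctx *ctx = shaping_engine_init();
    if (!ctx) {
        return -1;
    }
    /* A real caller would spawn ctx->thread_num workers, one per thread_ctx
     * slot; a single slot is driven here for brevity. */
    shaper_worker_example(&ctx->thread_ctx[0]);
    shaping_engine_destroy(ctx);
    return 0;
}
#endif /* SHAPER_USAGE_EXAMPLE */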