diff options
| author | root <[email protected]> | 2023-11-13 08:07:52 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2023-11-13 08:07:52 +0000 |
| commit | a1609051ec2e8a11cdf13377efb133c2f2b603e0 (patch) | |
| tree | 8fef799b5d8cc04d2ef54a72a2bb1eaae66d81a3 /shaping/src/shaper.cpp | |
| parent | f1c9565d486fe9a278996a3cfd8be6f77cf7a977 (diff) | |
TSG-17653: tcp pure ctrl packet force consume token and forward directly
Diffstat (limited to 'shaping/src/shaper.cpp')
| -rw-r--r-- | shaping/src/shaper.cpp | 108 |
1 file changed, 65 insertions, 43 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 1013d0f..bef1b56 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -78,6 +78,8 @@ enum shaper_token_get_result { struct shaping_profile_hash_node { int id; + int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; + int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; long long last_failed_get_token_ms; long long last_hmget_ms[SHAPING_PRIORITY_NUM_MAX]; unsigned char is_priority_blocked[SHAPING_PRIORITY_NUM_MAX]; @@ -85,7 +87,7 @@ struct shaping_profile_hash_node { UT_hash_handle hh; }; -struct shaping_profile_hash_node *g_shaping_profile_table = NULL; +thread_local struct shaping_profile_hash_node *thread_sp_hashtbl = NULL; struct shaper* shaper_new(unsigned int priority_queue_len_max) { @@ -196,6 +198,17 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) return; } +void shaper_thread_resource_clear() +{ + struct shaping_profile_hash_node *thread_sp_hashtbl_tmp = NULL; + struct shaping_profile_hash_node *node = NULL; + + HASH_ITER(hh, thread_sp_hashtbl, node, thread_sp_hashtbl_tmp) { + HASH_DEL(thread_sp_hashtbl, node); + free(node); + } +} + static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, void *pkt_buff, struct metadata *meta) { struct shaping_packet_wrapper *s_pkt = NULL; @@ -215,7 +228,6 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ s_pkt->pkt_buff = pkt_buff; s_pkt->direction = meta->dir; s_pkt->length = meta->raw_len; - s_pkt->tcp_pure_contorl = meta->is_tcp_pure_ctrl; s_pkt->income_time_ns = curr_time.tv_sec * NANO_SECONDS_PER_SEC + curr_time.tv_nsec; s_pkt->enqueue_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; TAILQ_INSERT_TAIL(&sf->packet_queue, s_pkt, node); @@ -386,12 +398,12 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i return count; } -static void 
shaper_deposit_token_add(struct shaping_profile_info *pf_info, int req_token, unsigned char direction) +static void shaper_deposit_token_add(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority) { if (direction == SHAPING_DIR_IN) { - __atomic_add_fetch(&pf_info->in_deposit_token, req_token, __ATOMIC_SEQ_CST); + pf_hash_node->in_deposit_token_bits[priority] += req_token_bits; } else { - __atomic_add_fetch(&pf_info->out_deposit_token, req_token, __ATOMIC_SEQ_CST); + pf_hash_node->out_deposit_token_bits[priority] += req_token_bits; } } @@ -403,7 +415,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg shaper_global_stat_async_callback_inc(arg->ctx->global_stat); - LOG_INFO("Swarmkv reply type =%d, integer =%llu",reply->type, reply->integer); + LOG_INFO("Swarmkv reply type =%d, direction =%d, integer =%llu",reply->type, arg->direction, reply->integer); if (reply->type != SWARMKV_REPLY_INTEGER) { shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat); @@ -411,14 +423,14 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg } if (reply->integer < 0) {//profile not exist - s_pf_info->is_invalid = 1; + s_pf_info->hash_node->is_invalid = 1; goto END; } else { - s_pf_info->is_invalid = 0; + s_pf_info->hash_node->is_invalid = 0; } if (reply->integer > 0) { - shaper_deposit_token_add(s_pf_info, reply->integer, arg->direction);//deposit tokens to profile + shaper_deposit_token_add(s_pf_info->hash_node, reply->integer, arg->direction, s_pf_info->priority);//deposit tokens to profile } END: @@ -435,25 +447,25 @@ END: return; } -static void shaper_deposit_token_sub(struct shaping_profile_info *pf_info, int req_token, unsigned char direction) +static void shaper_deposit_token_sub(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority) { if (direction == SHAPING_DIR_IN) { - 
__atomic_sub_fetch(&pf_info->in_deposit_token, req_token, __ATOMIC_SEQ_CST); + pf_hash_node->in_deposit_token_bits[priority] -= req_token_bits; } else { - __atomic_sub_fetch(&pf_info->out_deposit_token, req_token, __ATOMIC_SEQ_CST); + pf_hash_node->out_deposit_token_bits[priority] -= req_token_bits; } } -static int shaper_deposit_token_is_enough(struct shaping_profile_info *pf_info, int req_token, unsigned char direction) +static int shaper_deposit_token_is_enough(struct shaping_profile_hash_node *pf_hash_node, int req_token_bits, unsigned char direction, int priority) { if (direction == SHAPING_DIR_IN) { - if (__atomic_load_n(&pf_info->in_deposit_token, __ATOMIC_SEQ_CST) >= req_token) { + if (pf_hash_node->in_deposit_token_bits[priority] >= req_token_bits) { return 1; } else { return 0; } } else { - if (__atomic_load_n(&pf_info->out_deposit_token, __ATOMIC_SEQ_CST) >= req_token) { + if (pf_hash_node->out_deposit_token_bits[priority] >= req_token_bits) { return 1; } else { return 0; @@ -494,7 +506,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct break; } - if (pf_info->is_invalid) { + if (pf_info->hash_node->is_invalid) { if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token return SHAPER_TOKEN_GET_SUCCESS; } else {//for borrowing, means this profile has no token to borrow @@ -502,8 +514,8 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct } } - if (shaper_deposit_token_is_enough(pf_info, req_token_bits, direction)) { - shaper_deposit_token_sub(pf_info, req_token_bits, direction); + if (shaper_deposit_token_is_enough(pf_info->hash_node, req_token_bits, direction, pf_info->priority)) { + shaper_deposit_token_sub(pf_info->hash_node, req_token_bits, direction, pf_info->priority); return SHAPER_TOKEN_GET_SUCCESS; } @@ -583,6 +595,23 @@ END: } } +static void shaper_profile_hash_node_update(struct shaping_profile_info *profile) +{ + if (profile->hash_node 
== NULL) { + struct shaping_profile_hash_node *hash_node = NULL; + HASH_FIND_INT(thread_sp_hashtbl, &profile->id, hash_node); + if (hash_node) { + profile->hash_node = hash_node; + } else { + profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node)); + profile->hash_node->id = profile->id; + HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node); + } + } + + return; +} + static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int req_token_bytes, struct shaping_profile_info *profile, int profile_type, unsigned char direction) { @@ -603,21 +632,11 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet } - if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction)) { - shaper_deposit_token_sub(profile, req_token_bytes * 8, direction); - return SHAPER_TOKEN_GET_SUCCESS; - } + shaper_profile_hash_node_update(profile); - if (profile->hash_node == NULL) { - struct shaping_profile_hash_node *hash_node = NULL; - HASH_FIND_INT(g_shaping_profile_table, &profile->id, hash_node); - if (hash_node) { - profile->hash_node = hash_node; - } else { - profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node)); - profile->hash_node->id = profile->id; - HASH_ADD_INT(g_shaping_profile_table, id, profile->hash_node); - } + if (shaper_deposit_token_is_enough(profile->hash_node, req_token_bytes * 8, direction, profile->priority)) { + shaper_deposit_token_sub(profile->hash_node, req_token_bytes * 8, direction, profile->priority); + return SHAPER_TOKEN_GET_SUCCESS; } struct timespec curr_timespec; @@ -708,12 +727,6 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); - if (pkt_wrapper->tcp_pure_contorl) { - shaper_flow_pop(ctx, 
sf); - shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); - return SHAPING_FORWARD; - } - if (pf_container[0].pf_type == PROFILE_IN_RULE_TYPE_PRIMARY) { clock_gettime(CLOCK_MONOTONIC, &curr_time); if (shaper_pkt_latency_us_calculate(pf_container[0].pf_info, &curr_time) > ctx->conf.pkt_max_delay_time_us) { @@ -770,11 +783,6 @@ static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shapi unsigned long long enqueue_time; int enqueue_success = 0; - if (meta->is_tcp_pure_ctrl) { - shaper_stat_forward_all_rule_inc(ctx->stat, sf, meta->dir, meta->raw_len, ctx->thread_index); - return SHAPING_FORWARD; - } - int ret = shaper_token_consume(ctx, sf, meta->raw_len, profile, profile_type, meta->dir); if (ret >= SHAPER_TOKEN_GET_SUCCESS) { if (ret == SHAPER_TOKEN_GET_SUCCESS) { @@ -886,6 +894,19 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ } } +static void shaper_token_consume_force(struct shaping_flow *sf, struct metadata *meta) +{ + struct shaping_rule_info *rule; + + for (int i = 0; i < sf->rule_num; i++) { + rule = &sf->matched_rule_infos[i]; + shaper_profile_hash_node_update(&rule->primary); + shaper_deposit_token_sub(rule->primary.hash_node, meta->raw_len * 8, meta->dir, rule->primary.priority); + } + + return; +} + void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf) { int shaping_ret; @@ -896,6 +917,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu sf->processed_pkts++; if (meta->is_tcp_pure_ctrl) { + shaper_token_consume_force(sf, meta); marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); |
