diff options
| author | 刘畅 <[email protected]> | 2023-04-04 12:27:33 +0000 |
|---|---|---|
| committer | 刘畅 <[email protected]> | 2023-04-04 12:27:33 +0000 |
| commit | 00d035db8063aef61076138b116fc06dde2ea4f0 (patch) | |
| tree | 48e50859012bf900ba07bdf3ad9f85d2701d327a /shaping/src/shaper.cpp | |
| parent | 9a0ff4d68c0d165ca4c65c850dfed9c2c7dd4c80 (diff) | |
| parent | d92e71f1082c9f38ca22e762d1dd7ba8fd7c0aa9 (diff) | |
Merge branch 'priority_by_swarmkv' into 'rel'
Priority by swarmkv
See merge request tango/shaping-engine!7
Diffstat (limited to 'shaping/src/shaper.cpp')
| -rw-r--r-- | shaping/src/shaper.cpp | 117 |
1 files changed, 78 insertions, 39 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 51474e3..64775e0 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -27,6 +27,20 @@ extern "C" { #define MICRO_SECONDS_PER_SEC 1000000 #define NANO_SECONDS_PER_SEC 1000000000 +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 " priority-3" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 " priority-4" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 " priority-5" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 " priority-6" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 " priority-7" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 " priority-8" + +const char *swarmkv_queue_len_get_cmd[] = {"", SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3, + SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6, + SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9}; + struct shaper {//trees in one thread struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority }; @@ -39,6 +53,7 @@ struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree struct shaping_async_cb_arg { struct shaping_flow *sf; struct shaping_profile_info *s_pf_info; + int priority; unsigned char direction; }; @@ -214,6 +229,11 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) return; } +static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * arg) +{ + return; +} + //return success(0) while any avl tree insert success int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) { @@ -232,6 +252,7 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority); shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); @@ -246,6 +267,7 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority); shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); @@ -285,6 +307,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) priority = s_rule_info->primary.priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority); + shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); @@ -302,6 +326,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) priority = s_rule_info->borrowing[i].priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority); + shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); @@ -368,7 +394,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg END: free(cb_arg); - __atomic_sub_fetch(&s_pf_info->async_ref_count, 1, __ATOMIC_SEQ_CST); + __atomic_sub_fetch(&s_pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST); shaping_flow_free(sf);//sub ref count and decide if need to free return; @@ -405,7 +431,7 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow struct shaping_async_cb_arg *arg; char key[32] = {0}; - __atomic_add_fetch(&pf_info->async_ref_count, 1, __ATOMIC_SEQ_CST); + __atomic_add_fetch(&pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST); __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST); snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming"); @@ -415,7 +441,7 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow arg->direction = direction; swarmkv_tconsume(db, key, strlen(key), req_token, shaper_token_get_cb, arg); - if (__atomic_load_n(&pf_info->async_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed + if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed shaper_deposit_token_sub(pf_info, req_token, direction); return 0; } @@ -436,57 +462,70 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow return -1; } -#if 0 -int shaper_token_consume(struct shaping_flow *sf, unsigned int req_token, struct shaping_rule_info *s_rule_info) +static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { - int i; + struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg; + struct shaping_profile_info *s_pf_info = arg->s_pf_info; + struct shaping_flow *sf = arg->sf; + + s_pf_info->is_priority_blocked = 0; - if (SHAPING_SUCCESS == shaping_token_get_from_profile(&s_rule_info->primary, 1, req_token)) { - return SHAPING_SUCCESS; + if (!reply || reply->type != SWARMKV_REPLY_ARRAY) { + goto END; } - if (s_rule_info->borrowing_num > 0) { - for (i = 0; i < s_rule_info->borrowing_num; i++) { - if (SHAPING_SUCCESS == shaping_token_get_from_profile(&s_rule_info->borrowing[i], 0, req_token)) { - return SHAPING_SUCCESS; - } + for (unsigned int i = 0; i < reply->n_element; i++) { + if (reply->elements[i] && reply->elements[i]->integer > 0) { + s_pf_info->is_priority_blocked = 1; + break; } } - return SHAPING_FAILED; +END: + free(cb_arg); + __atomic_sub_fetch(&s_pf_info->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST); + shaping_flow_free(sf);//sub ref count and decide if need to free } -#endif -static int shaper_token_consume(struct swarmkv *db, struct shaping_flow *sf, int req_token, - struct shaping_profile_info *profile, int profile_type, unsigned char direction) +static int shaper_profile_is_priority_blocked(struct swarmkv *db, struct shaping_flow *sf, struct shaping_profile_info *profile) { - return shaper_token_get_from_profile(db, sf, profile, profile_type, req_token, direction); - -} + struct shaping_async_cb_arg *arg; + int priority = profile->priority; -#if 0 -enum shaping_packet_action shaper_pkt_action_decide(struct shaping_flow *sf, struct shaper *sp, void *raw_pkt, - unsigned int pkt_len, unsigned char direction, unsigned long long income_time) -{ - int i; - struct shaping_rule_info *s_rule_info; - - for (i = sf->anchor; i < sf->rule_num; i++) { - s_rule_info = &sf->matched_rule_infos[i]; - if (-1 == shaper_token_consume(sf, pkt_len, s_rule_info, s_rule_info->primary.priority)) { - sf->anchor = i; - if (0 == shaper_flow_push(sf, sp)) { - shaper_packet_enqueue(sf, raw_pkt, direction, income_time); - return SHAPING_HOLD; - } else { - return SHAPING_DROP; - } + if (priority == 0) {//highest priority, can't be blocked + return 0; + } + + arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg)); + arg->s_pf_info = profile; + arg->sf = sf; + arg->priority = priority; + + __atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST); + __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST); + + swarmkv_async_command(db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); + + if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) { + return 0; + } else { + if (profile->is_priority_blocked) { + return 1; + } else { + return 0; } } +} - return SHAPING_FORWARD; +static int shaper_token_consume(struct swarmkv *db, struct shaping_flow *sf, int req_token, + struct shaping_profile_info *profile, int profile_type, unsigned char direction) +{ + if (shaper_profile_is_priority_blocked(db, sf, profile)) { + return -1; + } else { + return shaper_token_get_from_profile(db, sf, profile, profile_type, req_token, direction); + } } -#endif int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, struct shaping_profile_container pf_container[]) { |
