diff options
| author | root <[email protected]> | 2023-11-16 02:51:38 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2023-11-16 02:51:38 +0000 |
| commit | 8976a632838c5ae9d4f239850be16c85931af502 (patch) | |
| tree | 2b9377b82c4ffd5ba9e138eccec89ebe2af16424 /shaping/src/shaper.cpp | |
| parent | 5b20b8ffa36252a676caefe731886f3c6898905a (diff) | |
Store the deposit token in the thread_local profile_hash_node; remove the shaping_flow ref_count operations
Diffstat (limited to 'shaping/src/shaper.cpp')
| -rw-r--r-- | shaping/src/shaper.cpp | 39 |
1 file changed, 13 insertions, 26 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index bef1b56..8deda2a 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -59,8 +59,7 @@ struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree struct shaping_async_cb_arg { struct shaping_thread_ctx *ctx; - struct shaping_flow *sf; - struct shaping_profile_info *s_pf_info; + struct shaping_profile_hash_node *pf_hash_node; int priority; unsigned char direction; }; @@ -173,7 +172,6 @@ struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx) } TAILQ_INIT(&s_node->shaping_flow.packet_queue); - s_node->shaping_flow.ref_count = 1; s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1; timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS); timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); @@ -189,11 +187,9 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) { struct shaping_node *s_node = (struct shaping_node*)sf; - if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) { - timeouts_del(ctx->expires, &sf->timeout_handle); - shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); - shaping_node_free(s_node); - } + timeouts_del(ctx->expires, &sf->timeout_handle); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); + shaping_node_free(s_node); return; } @@ -410,8 +406,7 @@ static void shaper_deposit_token_add(struct shaping_profile_hash_node *pf_hash_n static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg*)cb_arg; - struct shaping_profile_info *s_pf_info = arg->s_pf_info; - struct shaping_flow *sf = arg->sf; + struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node; shaper_global_stat_async_callback_inc(arg->ctx->global_stat); @@ -423,24 +418,23 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg } if 
(reply->integer < 0) {//profile not exist - s_pf_info->hash_node->is_invalid = 1; + pf_hash_node->is_invalid = 1; goto END; } else { - s_pf_info->hash_node->is_invalid = 0; + pf_hash_node->is_invalid = 0; } if (reply->integer > 0) { - shaper_deposit_token_add(s_pf_info->hash_node, reply->integer, arg->direction, s_pf_info->priority);//deposit tokens to profile + shaper_deposit_token_add(pf_hash_node, reply->integer, arg->direction, arg->priority);//deposit tokens to profile } END: if (reply->type != SWARMKV_REPLY_INTEGER || reply->integer == 0) { struct timespec curr_time; clock_gettime(CLOCK_MONOTONIC, &curr_time); - s_pf_info->hash_node->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + pf_hash_node->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; } - shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free free(cb_arg); cb_arg = NULL; @@ -478,13 +472,11 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct struct shaping_async_cb_arg *arg = NULL; char key[32] = {0}; - __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST); - snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? 
"outgoing" : "incoming"); arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg)); arg->ctx = ctx; - arg->s_pf_info = pf_info; - arg->sf = sf; + arg->pf_hash_node = pf_info->hash_node; + arg->priority = pf_info->priority; arg->direction = direction; shaper_global_stat_async_invoke_inc(ctx->global_stat); @@ -525,8 +517,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg; - struct shaping_profile_hash_node *pf_hash_node = arg->s_pf_info->hash_node; - struct shaping_flow *sf = arg->sf; + struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node; int priority = arg->priority; shaper_global_stat_async_callback_inc(arg->ctx->global_stat); @@ -558,7 +549,6 @@ END: clock_gettime(CLOCK_MONOTONIC, &curr_time); pf_hash_node->last_hmget_ms[priority] = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; - shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free free(cb_arg); cb_arg = NULL; } @@ -578,12 +568,9 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg)); arg->ctx = ctx; - arg->s_pf_info = profile; - arg->sf = sf; + arg->pf_hash_node = profile->hash_node; arg->priority = priority; - __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST); - shaper_global_stat_async_invoke_inc(ctx->global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); |
