diff options
Diffstat (limited to 'shaping/src/shaper.cpp')
| -rw-r--r-- | shaping/src/shaper.cpp | 59 |
1 files changed, 41 insertions, 18 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 55f32fb..e0fc695 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -162,6 +162,8 @@ struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx) timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS); timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); + s_node->shaping_flow.ref_cnt = 1; + return &s_node->shaping_flow; ERROR: @@ -171,6 +173,11 @@ ERROR: void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) { + sf->ref_cnt--; + if (sf->ref_cnt > 0) { + return; + } + struct shaping_node *s_node = (struct shaping_node*)sf; timeouts_del(ctx->expires, &sf->timeout_handle); @@ -412,23 +419,25 @@ static void shaper_deposit_token_add(struct shaping_profile_info *profile, int r static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { - struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg*)cb_arg; + struct shaping_tconsume_cb_arg *arg = (struct shaping_tconsume_cb_arg*)cb_arg; + struct shaping_thread_ctx *ctx = arg->ctx; struct shaping_profile_info *profile = arg->profile; struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; + struct shaping_flow *sf = arg->sf; struct timespec curr_time; long long curr_time_us; clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_swarmkv_latency_update(arg->ctx->global_stat, curr_time_us - arg->start_time_us); + shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us); - shaper_global_stat_async_callback_inc(arg->ctx->global_stat); - shaper_global_stat_tconsume_callback_inc(arg->ctx->global_stat); + shaper_global_stat_async_callback_inc(ctx->global_stat); + shaper_global_stat_tconsume_callback_inc(ctx->global_stat); LOG_INFO("Swarmkv reply type =%d, direction =%d, integer =%llu",reply->type, arg->direction, reply->integer); if (reply->type != SWARMKV_REPLY_INTEGER) { - shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat); + shaper_global_stat_async_tconsume_failed_inc(ctx->global_stat); goto END; } @@ -440,7 +449,10 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg } if (reply->integer > 0) { + sf->flag &= (~SESSION_BORROW); shaper_deposit_token_add(profile, reply->integer, arg->direction, profile->priority);//deposit tokens to profile + } else { + sf->flag |= SESSION_BORROW; } END: @@ -459,6 +471,8 @@ END: } } + shaping_flow_free(ctx, sf); + free(cb_arg); cb_arg = NULL; @@ -533,18 +547,22 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_info *profile, static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec) { - struct shaping_async_cb_arg *arg = NULL; + struct shaping_tconsume_cb_arg *arg = NULL; char key[32] = {0}; snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming"); - arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg)); + arg = (struct shaping_tconsume_cb_arg *)calloc(1, sizeof(struct shaping_tconsume_cb_arg)); arg->ctx = ctx; arg->profile = pf_info; + arg->sf = sf; arg->direction = direction; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; shaper_global_stat_async_invoke_inc(ctx->global_stat); sheper_global_stat_tconsume_invoke_inc(ctx->global_stat); + + sf->ref_cnt++; + switch (pf_info->type) { case PROFILE_TYPE_GENERIC: swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); @@ -583,24 +601,24 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { - struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg; - struct shaping_profile_info *profile = arg->profile; - struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; - int priority = profile->priority; + struct shaping_hmget_cb_arg *arg = (struct shaping_hmget_cb_arg *)cb_arg; + struct shaping_thread_ctx *ctx = arg->ctx; + struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node; + int priority = arg->priority; struct timespec curr_time; long long curr_time_us; clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_swarmkv_latency_update(arg->ctx->global_stat, curr_time_us - arg->start_time_us); + shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us); - shaper_global_stat_async_callback_inc(arg->ctx->global_stat); - shaper_global_stat_hmget_callback_inc(arg->ctx->global_stat); + shaper_global_stat_async_callback_inc(ctx->global_stat); + shaper_global_stat_hmget_callback_inc(ctx->global_stat); pf_hash_node->is_priority_blocked[priority] = 0; if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) { - shaper_global_stat_async_hmget_failed_inc(arg->ctx->global_stat); + shaper_global_stat_async_hmget_failed_inc(ctx->global_stat); goto END; } @@ -628,7 +646,7 @@ END: static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, struct timespec *curr_timespec, long long curr_time_ms) { - struct shaping_async_cb_arg *arg; + struct shaping_hmget_cb_arg *arg; int priority = profile->priority; if (priority == 0) {//highest priority, can't be blocked @@ -639,9 +657,10 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st goto END; } - arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg)); + arg = (struct shaping_hmget_cb_arg *)calloc(1, sizeof(struct shaping_hmget_cb_arg)); arg->ctx = ctx; - arg->profile = profile; + arg->pf_hash_node = profile->hash_node; + arg->priority = priority; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; shaper_global_stat_async_invoke_inc(ctx->global_stat); @@ -698,6 +717,10 @@ static int shaping_swarmkv_is_too_short_interval(long long curr_time_ms, struct static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int req_token_bytes, struct shaping_profile_info *profile, int profile_type, unsigned char direction) { + if (profile_type == PROFILE_IN_RULE_TYPE_BORROW && !(sf->flag & SESSION_BORROW)) { + return SHAPER_TOKEN_GET_FAILED; + } + struct shaping_rule_info *rule = &sf->matched_rule_infos[sf->anchor]; time_t curr_time = time(NULL); |
