From c8283b4a6285d27eb6910abf3097b7d2270920a3 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 17 Nov 2023 10:00:51 +0000 Subject: resolve confilitcs, and add swarmkv async latency statistics --- shaping/src/shaper.cpp | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) (limited to 'shaping/src/shaper.cpp') diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 8deda2a..3de4e0b 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -23,13 +23,6 @@ extern "C" { #include "shaper_maat.h" #include "shaper_global_stat.h" -#define NANO_SECONDS_PER_MICRO_SEC 1000 -#define MICRO_SECONDS_PER_SEC 1000000 -#define NANO_SECONDS_PER_SEC 1000000000 - -#define NANO_SECONDS_PER_MILLI_SEC 1000000 -#define MILLI_SECONDS_PER_SEC 1000 - #define TOKEN_ENLARGE_TIMES 10 #define TOKEN_GET_FAILED_INTERVAL_MS 1 #define HMGET_REQUEST_INTERVAL_MS 1000 @@ -57,13 +50,6 @@ struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree struct avl_node *avl_node[SHAPING_PRIORITY_NUM_MAX]; }; -struct shaping_async_cb_arg { - struct shaping_thread_ctx *ctx; - struct shaping_profile_hash_node *pf_hash_node; - int priority; - unsigned char direction; -}; - struct shaping_profile_container { struct shaping_profile_info *pf_info; int pf_type; @@ -407,8 +393,15 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg { struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg*)cb_arg; struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node; + struct timespec curr_time; + long long curr_time_us; + + clock_gettime(CLOCK_MONOTONIC, &curr_time); + curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + shaper_global_stat_swarmkv_latency_update(arg->ctx->global_stat, curr_time_us - arg->start_time_us); shaper_global_stat_async_callback_inc(arg->ctx->global_stat); + shaper_global_stat_tconsume_callback_inc(arg->ctx->global_stat); LOG_INFO("Swarmkv reply type =%d, direction =%d, integer =%llu",reply->type, arg->direction, reply->integer); @@ -467,7 +460,7 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_hash_node *pf_h } } -static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction) +static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec) { struct shaping_async_cb_arg *arg = NULL; char key[32] = {0}; @@ -478,8 +471,10 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct arg->pf_hash_node = pf_info->hash_node; arg->priority = pf_info->priority; arg->direction = direction; + arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; shaper_global_stat_async_invoke_inc(ctx->global_stat); + sheper_global_stat_tconsume_invoke_inc(ctx->global_stat); switch (pf_info->type) { case PROFILE_TYPE_GENERIC: swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); @@ -519,8 +514,15 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg; struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node; int priority = arg->priority; + struct timespec curr_time; + long long curr_time_us; + + clock_gettime(CLOCK_MONOTONIC, &curr_time); + curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + shaper_global_stat_swarmkv_latency_update(arg->ctx->global_stat, curr_time_us - arg->start_time_us); shaper_global_stat_async_callback_inc(arg->ctx->global_stat); + shaper_global_stat_hmget_callback_inc(arg->ctx->global_stat); pf_hash_node->is_priority_blocked[priority] = 0; @@ -545,15 +547,13 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb } END: - struct timespec curr_time; - clock_gettime(CLOCK_MONOTONIC, &curr_time); pf_hash_node->last_hmget_ms[priority] = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; free(cb_arg); cb_arg = NULL; } -static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, long long curr_time_ms) +static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, struct timespec *curr_timespec, long long curr_time_ms) { struct shaping_async_cb_arg *arg; int priority = profile->priority; @@ -570,8 +570,10 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st arg->ctx = ctx; arg->pf_hash_node = profile->hash_node; arg->priority = priority; + arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; shaper_global_stat_async_invoke_inc(ctx->global_stat); + shaper_global_stat_hmget_invoke_inc(ctx->global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); END: @@ -638,11 +640,11 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f return SHAPER_TOKEN_GET_FAILED; } - if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_time_ms)) { + if (shaper_profile_is_priority_blocked(ctx, sf, profile, &curr_timespec, curr_time_ms)) { return SHAPER_TOKEN_GET_FAILED; } else { int req_token_bits = req_token_bytes * 8; - return shaper_token_get_from_profile(ctx, sf, profile, profile_type, req_token_bits, direction); + return shaper_token_get_from_profile(ctx, sf, profile, profile_type, req_token_bits, direction, &curr_timespec); } } -- cgit v1.2.3