summaryrefslogtreecommitdiff
path: root/shaping/src
diff options
context:
space:
mode:
author刘畅 <[email protected]>2023-11-17 10:15:39 +0000
committer刘畅 <[email protected]>2023-11-17 10:15:39 +0000
commitca892c8bdfd82979edc7dbb6821cd51ceae516c4 (patch)
tree8b7d26187de43052a63468280943ec9a00198eed /shaping/src
parent63abee15b68414fc2a73aa99dea3a8ace73606be (diff)
parentc8283b4a6285d27eb6910abf3097b7d2270920a3 (diff)
Merge branch 'add_swarmkv_latency_histogram' into 'rel'v1.3.9
add swarmkv async latency statistics See merge request tango/shaping-engine!54
Diffstat (limited to 'shaping/src')
-rw-r--r--shaping/src/shaper.cpp42
-rw-r--r--shaping/src/shaper_global_stat.cpp92
-rw-r--r--shaping/src/shaper_stat.cpp26
3 files changed, 129 insertions, 31 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 8deda2a..3de4e0b 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -23,13 +23,6 @@ extern "C" {
#include "shaper_maat.h"
#include "shaper_global_stat.h"
-#define NANO_SECONDS_PER_MICRO_SEC 1000
-#define MICRO_SECONDS_PER_SEC 1000000
-#define NANO_SECONDS_PER_SEC 1000000000
-
-#define NANO_SECONDS_PER_MILLI_SEC 1000000
-#define MILLI_SECONDS_PER_SEC 1000
-
#define TOKEN_ENLARGE_TIMES 10
#define TOKEN_GET_FAILED_INTERVAL_MS 1
#define HMGET_REQUEST_INTERVAL_MS 1000
@@ -57,13 +50,6 @@ struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree
struct avl_node *avl_node[SHAPING_PRIORITY_NUM_MAX];
};
-struct shaping_async_cb_arg {
- struct shaping_thread_ctx *ctx;
- struct shaping_profile_hash_node *pf_hash_node;
- int priority;
- unsigned char direction;
-};
-
struct shaping_profile_container {
struct shaping_profile_info *pf_info;
int pf_type;
@@ -407,8 +393,15 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
{
struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg*)cb_arg;
struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node;
+ struct timespec curr_time;
+ long long curr_time_us;
+
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+ shaper_global_stat_swarmkv_latency_update(arg->ctx->global_stat, curr_time_us - arg->start_time_us);
shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
+ shaper_global_stat_tconsume_callback_inc(arg->ctx->global_stat);
LOG_INFO("Swarmkv reply type =%d, direction =%d, integer =%llu",reply->type, arg->direction, reply->integer);
@@ -467,7 +460,7 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_hash_node *pf_h
}
}
-static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction)
+static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction, struct timespec *curr_timespec)
{
struct shaping_async_cb_arg *arg = NULL;
char key[32] = {0};
@@ -478,8 +471,10 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
arg->pf_hash_node = pf_info->hash_node;
arg->priority = pf_info->priority;
arg->direction = direction;
+ arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ sheper_global_stat_tconsume_invoke_inc(ctx->global_stat);
switch (pf_info->type) {
case PROFILE_TYPE_GENERIC:
swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
@@ -519,8 +514,15 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg;
struct shaping_profile_hash_node *pf_hash_node = arg->pf_hash_node;
int priority = arg->priority;
+ struct timespec curr_time;
+ long long curr_time_us;
+
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+ shaper_global_stat_swarmkv_latency_update(arg->ctx->global_stat, curr_time_us - arg->start_time_us);
shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
+ shaper_global_stat_hmget_callback_inc(arg->ctx->global_stat);
pf_hash_node->is_priority_blocked[priority] = 0;
@@ -545,15 +547,13 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
}
END:
- struct timespec curr_time;
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
pf_hash_node->last_hmget_ms[priority] = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
free(cb_arg);
cb_arg = NULL;
}
-static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, long long curr_time_ms)
+static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile, struct timespec *curr_timespec, long long curr_time_ms)
{
struct shaping_async_cb_arg *arg;
int priority = profile->priority;
@@ -570,8 +570,10 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
arg->ctx = ctx;
arg->pf_hash_node = profile->hash_node;
arg->priority = priority;
+ arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ shaper_global_stat_hmget_invoke_inc(ctx->global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
END:
@@ -638,11 +640,11 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
return SHAPER_TOKEN_GET_FAILED;
}
- if (shaper_profile_is_priority_blocked(ctx, sf, profile, curr_time_ms)) {
+ if (shaper_profile_is_priority_blocked(ctx, sf, profile, &curr_timespec, curr_time_ms)) {
return SHAPER_TOKEN_GET_FAILED;
} else {
int req_token_bits = req_token_bytes * 8;
- return shaper_token_get_from_profile(ctx, sf, profile, profile_type, req_token_bits, direction);
+ return shaper_token_get_from_profile(ctx, sf, profile, profile_type, req_token_bits, direction, &curr_timespec);
}
}
diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp
index a73f53f..748becb 100644
--- a/shaping/src/shaper_global_stat.cpp
+++ b/shaping/src/shaper_global_stat.cpp
@@ -22,20 +22,31 @@ static int shaper_global_stat_conf_load(struct shaping_global_stat *stat, struct
static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat)
{
+ const char * quantiles = "0.1,0.5,0.8,0.9,0.95,0.99";
+ stat->swarmkv_latency_summary_id = fieldstat_register_summary(stat->instance, "async_delay(us)", NULL, 0, quantiles, 1, 500000, 3, 1);
+
stat->column_ids[CURR_SESSION_NUM_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_session_num", NULL, 0);
stat->column_ids[QUEUEING_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_queueing_pkts", NULL, 0);
stat->column_ids[QUEUEING_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_queueing_bytes", NULL, 0);
stat->column_ids[CTRL_ERR_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_error", NULL, 0);
- stat->column_ids[CTRL_OPENING_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_opening", NULL, 0);
+ stat->column_ids[CTRL_OPENING_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_open", NULL, 0);
stat->column_ids[CTRL_ACTIVE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_active", NULL, 0);
stat->column_ids[CTRL_CLOSE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_close", NULL, 0);
- stat->column_ids[CTRL_ACTIVE_CLOSE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_active_close", NULL, 0);
- stat->column_ids[CTRL_RESETALL_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_resetall", NULL, 0);
- stat->column_ids[SESSION_LOG_SEND_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "session_log_send", NULL, 0);
+ stat->column_ids[CTRL_ACTIVE_CLOSE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_sf_close", NULL, 0);
+ stat->column_ids[CTRL_RESETALL_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_reset", NULL, 0);
+ stat->column_ids[SESSION_LOG_SEND_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "sess_log_send", NULL, 0);
+
+ stat->column_ids[ASYNC_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "async", NULL, 0);
+ stat->column_ids[ASYNC_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "async_cb", NULL, 0);
+
+ stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tconsume", NULL, 0);
+ stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tconsume_cb", NULL, 0);
+ stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hincrby", NULL, 0);
+ stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hincrby_cb", NULL, 0);
+ stat->column_ids[ASYNC_HMGET_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hmget", NULL, 0);
+ stat->column_ids[ASYNC_HMGET_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hmget_cb", NULL, 0);
- stat->column_ids[ASYNC_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "async_invoke", NULL, 0);
- stat->column_ids[ASYNC_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "async_callback", NULL, 0);
stat->column_ids[ASYNC_TCONSUME_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tconsume_failed", NULL, 0);
stat->column_ids[ASYNC_HINCRBY_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hincrby_failed", NULL, 0);
stat->column_ids[ASYNC_HMGET_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hmget_failed", NULL, 0);
@@ -122,6 +133,13 @@ void shaper_global_stat_destroy(struct shaping_global_stat *stat)
return;
}
+void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us)
+{
+ fieldstat_value_set(stat->instance, stat->swarmkv_latency_summary_id, latency_us);
+
+ return;
+}
+
void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat)
{
struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
@@ -246,6 +264,60 @@ void shaper_global_stat_async_callback_inc(struct shaping_global_stat *stat)
return;
}
+void sheper_global_stat_tconsume_invoke_inc(struct shaping_global_stat *stat)
+{
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_tconsume_invoke, 1, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_tconsume_callback_inc(struct shaping_global_stat *stat)
+{
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_tconsume_callback, 1, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_hincrby_invoke_inc(struct shaping_global_stat *stat)
+{
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_hincrby_invoke, 1, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_hincrby_callback_inc(struct shaping_global_stat *stat)
+{
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_hincrby_callback, 1, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_hmget_invoke_inc(struct shaping_global_stat *stat)
+{
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_hmget_invoke, 1, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_hmget_callback_inc(struct shaping_global_stat *stat)
+{
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_hmget_callback, 1, __ATOMIC_RELAXED);
+
+ return;
+}
+
void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat *stat)
{
struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
@@ -342,6 +414,14 @@ void shaper_global_stat_refresh(struct shaping_global_stat *stat)
fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_INVOKE_IDX], local_stat_data->async_invoke);
fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_CALLBACK_IDX], local_stat_data->async_callback);
+
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX], local_stat_data->async_tconsume_invoke);
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX], local_stat_data->async_tconsume_callback);
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX], local_stat_data->async_hincrby_invoke);
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX], local_stat_data->async_hincrby_callback);
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_INVOKE_IDX], local_stat_data->async_hmget_invoke);
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_CALLBACK_IDX], local_stat_data->async_hmget_callback);
+
fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_FAILED], local_stat_data->async_tconsume_failed);
fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_FAILED], local_stat_data->async_hincrby_failed);
fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_FAILED], local_stat_data->async_hmget_failed);
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index c06f19d..40a52a2 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -1,4 +1,3 @@
-#include <cstring>
#include <stdio.h>
#include <time.h>
#include <sys/socket.h>
@@ -135,14 +134,23 @@ static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int
static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
- struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
+ struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg;
+ struct timespec curr_time;
+ long long curr_time_us;
+
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+ shaper_global_stat_swarmkv_latency_update(arg->ctx->global_stat, curr_time_us - arg->start_time_us);
- shaper_global_stat_async_callback_inc(global_stat);
+ shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
+ shaper_global_stat_hincrby_callback_inc(arg->ctx->global_stat);
if (reply->type != SWARMKV_REPLY_INTEGER) {
- shaper_global_stat_async_hincrby_failed_inc(global_stat);
+ shaper_global_stat_async_hincrby_failed_inc(arg->ctx->global_stat);
}
+ free(cb_arg);
+
return;
}
@@ -175,8 +183,16 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, i
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id);
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id);
+ struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg));
+ struct timespec curr_time;
+
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ arg->ctx = ctx;
+ arg->start_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d %lld", profile->id, profile->priority, profile_stat->in.queue_len + profile_stat->out.queue_len);
+ shaper_global_stat_hincrby_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", profile->id, profile->priority, profile_stat->in.queue_len + profile_stat->out.queue_len);
+
memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
} else {
profile_stat->in.pkts = 0;