author     liuchang <[email protected]>  2023-05-29 10:16:32 +0000
committer  liuchang <[email protected]>  2023-05-29 10:16:32 +0000
commit     66ea2254660e40f055668cfe1f8df3dc24e60475 (patch)
tree       e64a953738cc44836c46166c280c4d08971d9a28 /shaping/src/shaper.cpp
parent     57efeb63d5769c9f1b92b1266780968ad1c30d78 (diff)
add async statistics for global metric
Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r--  shaping/src/shaper.cpp  46
1 file changed, 38 insertions(+), 8 deletions(-)
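The diff below pairs every asynchronous swarmkv call with a shaper_global_stat_async_invoke_inc(), and counts delivered replies plus per-command failures (HINCRBY, tconsume, HMGET) in the reply callbacks. The counter helpers themselves are not defined in this commit; as a rough sketch, and purely an assumption since their real declarations live in the stat headers this diff does not touch, they could wrap relaxed atomic counters so callbacks arriving on another thread can update them safely (the file already uses __atomic builtins for its ref counts):

// Hypothetical sketch only: none of these definitions come from the commit.
#include <stdint.h>

struct shaping_global_stat_async {
    uint64_t invoke;           /* async swarmkv commands issued */
    uint64_t callback;         /* reply callbacks actually delivered */
    uint64_t hincrby_failed;   /* HINCRBY reply was not an integer */
    uint64_t tconsume_failed;  /* tconsume reply was not an integer */
    uint64_t hmget_failed;     /* queue-length reply was not an array */
};

static inline void async_counter_inc(uint64_t *counter)
{
    /* relaxed ordering is enough: these are statistics, not synchronization */
    __atomic_add_fetch(counter, 1, __ATOMIC_RELAXED);
}

With an invoke/callback pair per command, invoke - callback approximates the number of async operations still in flight, and a gap that only grows points at lost replies.
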
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 8f5f43d..5ca5834 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -226,6 +226,7 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index);
shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
@@ -234,8 +235,16 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
return;
}
-static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * arg)
+static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg)
{
+ struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
+
+ shaper_global_stat_async_callback_inc(global_stat);
+
+ if (reply->type != SWARMKV_REPLY_INTEGER) {
+ shaper_global_stat_async_hincrby_failed_inc(global_stat);
+ }
+
return;
}
@@ -264,7 +273,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
}
shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
s_rule_info->primary.enqueue_time_us = enqueue_time;
@@ -280,7 +290,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
}
s_rule_info->borrowing[i].enqueue_time_us = enqueue_time;
}
@@ -318,7 +329,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
}
shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
@@ -336,7 +348,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
}
latency = shaper_pkt_latency_calculate(&s_rule_info->borrowing[i], &curr_time);
@@ -387,7 +400,12 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
struct shaping_profile_info *s_pf_info = arg->s_pf_info;
struct shaping_flow *sf = arg->sf;
- assert(reply->type == SWARMKV_REPLY_INTEGER);
+ shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
+
+ if (reply->type != SWARMKV_REPLY_INTEGER) {
+ shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat);
+ goto END;
+ }
if (reply->integer < 0) {//profile not exist
s_pf_info->is_invalid = 1;
@@ -447,6 +465,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
arg->sf = sf;
arg->direction = direction;
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits, shaper_token_get_cb, arg);
if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed
shaper_deposit_token_sub(pf_info, req_token_bits, direction);
@@ -475,9 +494,12 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
struct shaping_profile_info *s_pf_info = arg->s_pf_info;
struct shaping_flow *sf = arg->sf;
+ shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
+
s_pf_info->is_priority_blocked = 0;
if (!reply || reply->type != SWARMKV_REPLY_ARRAY) {
+ shaper_global_stat_async_hmget_failed_inc(arg->ctx->global_stat);
goto END;
}
@@ -516,6 +538,7 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
__atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
__atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) {
@@ -726,6 +749,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
case SHAPING_DROP:
shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
@@ -733,6 +757,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
case SHAPING_FORWARD:
shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length);
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1);
shaper_packet_dequeue(sf);
@@ -783,6 +808,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
if (meta->is_tcp_pure_ctrl) {
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
goto END;//for tcp pure control pkt, transmit it directly
}
@@ -796,6 +822,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
} else {
shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index);
shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
}
@@ -803,6 +830,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
if (0 != shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) {
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4));
goto END;
}
@@ -818,11 +846,13 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
break;
case SHAPING_FORWARD:
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
shaper_packet_dequeue(sf);
shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
break;
default:
assert(0);
@@ -905,6 +935,7 @@ static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread
sf = shaper_session_reset_all(ctx, meta);
break;
default:
+ shaper_global_stat_ctrlpkt_err_inc(ctx->global_stat);
assert(0);
}
@@ -926,7 +957,7 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_
session_node = session_table_search_by_id(ctx->session_table, meta->session_id);
if (session_node) {
sf = (struct shaping_flow *)session_node->val_data;
- shaper_global_stat_hit_policy_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta->raw_len);
}
return sf;
@@ -948,7 +979,6 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
for (i = 0; i < rx_num; i++) {
if (marsio_buff_is_ctrlbuf(rx_buff[i])) {
- shaper_global_stat_ctrlpkt_inc(ctx->global_stat);
sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta);
} else {
sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta);
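
One consequence of this change worth noting: swarmkv_reply_cb_do_nothing is no longer a pure no-op, since it now dereferences its cb_arg, so every call site must pass ctx->global_stat rather than NULL. A minimal consistency check over the new counters, assuming the sketched struct above (these names are not from the commit itself), might look like:

// Hypothetical health check built on the sketched counters above.
static int shaper_async_stats_look_healthy(const struct shaping_global_stat_async *s)
{
    uint64_t invoked   = __atomic_load_n(&s->invoke, __ATOMIC_RELAXED);
    uint64_t completed = __atomic_load_n(&s->callback, __ATOMIC_RELAXED);

    /* callbacks may lag invocations, but should never exceed them */
    return completed <= invoked;
}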