diff options
| author | liuchang <[email protected]> | 2023-05-29 10:16:32 +0000 |
|---|---|---|
| committer | liuchang <[email protected]> | 2023-05-29 10:16:32 +0000 |
| commit | 66ea2254660e40f055668cfe1f8df3dc24e60475 (patch) | |
| tree | e64a953738cc44836c46166c280c4d08971d9a28 /shaping/src/shaper.cpp | |
| parent | 57efeb63d5769c9f1b92b1266780968ad1c30d78 (diff) | |
add async statistics for global metric
Diffstat (limited to 'shaping/src/shaper.cpp')
| -rw-r--r-- | shaping/src/shaper.cpp | 46 |
1 files changed, 38 insertions, 8 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 8f5f43d..5ca5834 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -226,6 +226,7 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index); shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); @@ -234,8 +235,16 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) return; } -static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * arg) +static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg) { + struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg; + + shaper_global_stat_async_callback_inc(global_stat); + + if (reply->type != SWARMKV_REPLY_INTEGER) { + shaper_global_stat_async_hincrby_failed_inc(global_stat); + } + return; } @@ -264,7 +273,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority); + shaper_global_stat_async_invoke_inc(ctx->global_stat); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority); } shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); s_rule_info->primary.enqueue_time_us = enqueue_time; @@ -280,7 +290,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority); + shaper_global_stat_async_invoke_inc(ctx->global_stat); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority); } s_rule_info->borrowing[i].enqueue_time_us = enqueue_time; } @@ -318,7 +329,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority); + shaper_global_stat_async_invoke_inc(ctx->global_stat); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority); } shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); @@ -336,7 +348,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority); + shaper_global_stat_async_invoke_inc(ctx->global_stat); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority); } latency = shaper_pkt_latency_calculate(&s_rule_info->borrowing[i], &curr_time); @@ -387,7 +400,12 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg struct shaping_profile_info *s_pf_info = arg->s_pf_info; struct shaping_flow *sf = arg->sf; - assert(reply->type == SWARMKV_REPLY_INTEGER); + shaper_global_stat_async_callback_inc(arg->ctx->global_stat); + + if (reply->type != SWARMKV_REPLY_INTEGER) { + shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat); + goto END; + } if (reply->integer < 0) {//profile not exist s_pf_info->is_invalid = 1; @@ -447,6 +465,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct arg->sf = sf; arg->direction = direction; + shaper_global_stat_async_invoke_inc(ctx->global_stat); swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits, shaper_token_get_cb, arg); if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed shaper_deposit_token_sub(pf_info, req_token_bits, direction); @@ -475,9 +494,12 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb struct shaping_profile_info *s_pf_info = arg->s_pf_info; struct shaping_flow *sf = arg->sf; + shaper_global_stat_async_callback_inc(arg->ctx->global_stat); + s_pf_info->is_priority_blocked = 0; if (!reply || reply->type != SWARMKV_REPLY_ARRAY) { + shaper_global_stat_async_hmget_failed_inc(arg->ctx->global_stat); goto END; } @@ -516,6 +538,7 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st __atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST); __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST); + shaper_global_stat_async_invoke_inc(ctx->global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) { @@ -726,6 +749,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ case SHAPING_DROP: shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); @@ -733,6 +757,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ case SHAPING_FORWARD: shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length); + shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length); marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1); shaper_packet_dequeue(sf); @@ -783,6 +808,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu if (meta->is_tcp_pure_ctrl) { marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); + shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index); goto END;//for tcp pure control pkt, transmit it directly } @@ -796,6 +822,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu } else { shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); } @@ -803,6 +830,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu if (0 != shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) { marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4)); goto END; } @@ -818,11 +846,13 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); break; case SHAPING_FORWARD: marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_packet_dequeue(sf); shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); + shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); break; default: assert(0); @@ -905,6 +935,7 @@ static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread sf = shaper_session_reset_all(ctx, meta); break; default: + shaper_global_stat_ctrlpkt_err_inc(ctx->global_stat); assert(0); } @@ -926,7 +957,7 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_ session_node = session_table_search_by_id(ctx->session_table, meta->session_id); if (session_node) { sf = (struct shaping_flow *)session_node->val_data; - shaper_global_stat_hit_policy_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta->raw_len); } return sf; @@ -948,7 +979,6 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) for (i = 0; i < rx_num; i++) { if (marsio_buff_is_ctrlbuf(rx_buff[i])) { - shaper_global_stat_ctrlpkt_inc(ctx->global_stat); sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta); } else { sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta); |
