diff options
| author | liuchang <[email protected]> | 2023-05-29 10:16:32 +0000 |
|---|---|---|
| committer | liuchang <[email protected]> | 2023-05-29 10:16:32 +0000 |
| commit | 66ea2254660e40f055668cfe1f8df3dc24e60475 (patch) | |
| tree | e64a953738cc44836c46166c280c4d08971d9a28 | |
| parent | 57efeb63d5769c9f1b92b1266780968ad1c30d78 (diff) | |
add async statistics for global metric
| -rw-r--r-- | conf/shaping.conf | 1 | ||||
| -rw-r--r-- | shaping/include/shaper_global_stat.h | 97 | ||||
| -rw-r--r-- | shaping/src/main.cpp | 10 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 46 | ||||
| -rw-r--r-- | shaping/src/shaper_global_stat.cpp | 284 |
5 files changed, 327 insertions, 111 deletions
diff --git a/conf/shaping.conf b/conf/shaping.conf index 64d88a0..cc0ab27 100644 --- a/conf/shaping.conf +++ b/conf/shaping.conf @@ -31,6 +31,7 @@ SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111 [METRIC] FIELDSTAT_OUTPUT_INTERVAL_MS=500 +GLOBAL_STAT_OUTPUT_INTERVAL_S=1 LINE_PROTOCOL_SERVER_IP="127.0.0.1" LINE_PROTOCOL_SERVER_PORT=6667 diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h index 60e7e8c..4ab6a2b 100644 --- a/shaping/include/shaper_global_stat.h +++ b/shaping/include/shaper_global_stat.h @@ -1,50 +1,105 @@ #include <fieldstat.h> enum shaping_global_stat_dir { - SHAPING_GLOBAL_STAT_RX, + SHAPING_GLOBAL_STAT_RX = 0, SHAPING_GLOBAL_STAT_TX }; enum shaping_global_stat_column_index { - RX_PKTS_IDX = 0, + CURR_SESSION_NUM_IDX = 0, + QUEUEING_PKTS_IDX, + QUEUEING_BYTES_IDX, + + CTRL_ERR_IDX, + CTRL_OPENING_IDX, + CTRL_ACTIVE_IDX, + CTRL_CLOSE_IDX, + CTRL_RESETALL_IDX, + SESSION_LOG_SEND_IDX, + + ASYNC_INVOKE_IDX, + ASYNC_CALLBACK_IDX, + ASYNC_TCONSUME_FAILED, + ASYNC_HINCRBY_FAILED, + ASYNC_HMGET_FAILED, + + RX_PKTS_IDX, RX_BYTES_IDX, TX_PKTS_IDX, TX_BYTES_IDX, - QUEUEING_PKTS_IDX, - QUEUEING_BYTES_IDX, DROP_PKTS_IDX, DROP_BYTES_IDX, - HIT_POLICY_PKTS, - HIT_POLICY_BYTES, - CTRL_PKTS_IDX, - CTRL_OPENING_PKTS_IDX, - CTRL_ACTIVE_PKTS_IDX, - CTRL_CLOSE_PKTS_IDX, - CTRL_RESETALL_PKTS_IDX, - CURR_SESSION_NUM_IDX, - SESSION_LOG_SEND_NUM, + + HIT_POLICY_RX_PKTS_IDX, + HIT_POLICY_RX_BYTES_IDX, + HIT_POLICY_TX_PKTS_IDX, + HIT_POLICY_TX_BYTES_IDX, + HIT_POLICY_DROP_PKTS_IDX, + HIT_POLICY_DROP_BYTES_IDX, + GLOBAL_STAT_COLUNM_IDX_MAX }; +struct shaping_global_stat_traffic_data { + long long rx_pkts; + long long rx_bytes; + long long tx_pkts; + long long tx_bytes; + long long drop_pkts; + long long drop_bytes; +}; + +struct shaping_global_stat_data { + long long curr_session_num; + long long queueing_pkts; + long long queueing_bytes; + long long ctrl_error; + long long ctrl_opening; + long long ctrl_active; + long long ctrl_close; + long long ctrl_resetall; + long long session_log_send; + long long async_invoke; + long long async_callback; + long long async_tconsume_failed; + long long async_hincrby_failed; + long long async_hmget_failed; + struct shaping_global_stat_traffic_data all_traffic; + struct shaping_global_stat_traffic_data hit_policy_traffic; +}; + struct shaping_global_stat { struct fieldstat_instance *instance; - int table_id; int column_ids[GLOBAL_STAT_COLUNM_IDX_MAX]; + struct shaping_global_stat_data local_stat_data; + int output_interval_s; }; struct shaping_global_stat* shaper_global_stat_init(); void shaper_global_stat_destroy(struct shaping_global_stat *stat); -void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len); -void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len); +void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat); +void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat); void shaper_global_stat_queueing_inc(struct shaping_global_stat *stat, int pkt_len); void shaper_global_stat_queueing_dec(struct shaping_global_stat *stat, int pkt_len); -void shaper_global_stat_hit_policy_inc(struct shaping_global_stat *stat, int pkt_len); -void shaper_global_stat_ctrlpkt_inc(struct shaping_global_stat *stat); + +void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat *stat); void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat *stat); void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat *stat); void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat *stat); void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat *stat); -void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat); -void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat); -void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat);
\ No newline at end of file +void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat); + +void shaper_global_stat_async_invoke_inc(struct shaping_global_stat *stat); +void shaper_global_stat_async_callback_inc(struct shaping_global_stat *stat); +void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat *stat); +void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat *stat); +void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat *stat); + +void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len); +void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len); + +void shaper_global_stat_hit_policy_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len); +void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat *stat, int pkt_len); + +void shaper_global_stat_refresh(struct shaping_global_stat *stat);
\ No newline at end of file diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp index 2291db5..f07787f 100644 --- a/shaping/src/main.cpp +++ b/shaping/src/main.cpp @@ -11,6 +11,7 @@ #include "shaper_stat.h" #include "shaper_marsio.h" #include "shaper_session.h" +#include "shaper_global_stat.h" static void *shaper_thread_loop(void *data) { @@ -46,6 +47,7 @@ static void sig_handler(int signo) int main(int argc, char **argv) { struct shaping_ctx *ctx = NULL; + time_t last_update_time = time(NULL); if (LOG_INIT("./conf/zlog.conf") == -1) { @@ -68,9 +70,13 @@ int main(int argc, char **argv) pthread_create(&ctx->thread_ctx[i].tid, NULL, shaper_thread_loop, &ctx->thread_ctx[i]); } - //TODO:主线程保留? while(1) { - sleep(1); + time_t curr_time = time(NULL); + if (curr_time - last_update_time >= ctx->global_stat->output_interval_s) { + shaper_global_stat_refresh(ctx->global_stat); + last_update_time = curr_time; + } + sleep(ctx->global_stat->output_interval_s); } return 0; diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 8f5f43d..5ca5834 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -226,6 +226,7 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index); shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); @@ -234,8 +235,16 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) return; } -static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * arg) +static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg) { + struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg; + + shaper_global_stat_async_callback_inc(global_stat); + + if (reply->type != SWARMKV_REPLY_INTEGER) { + shaper_global_stat_async_hincrby_failed_inc(global_stat); + } + return; } @@ -264,7 +273,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority); + shaper_global_stat_async_invoke_inc(ctx->global_stat); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority); } shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); s_rule_info->primary.enqueue_time_us = enqueue_time; @@ -280,7 +290,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority); + shaper_global_stat_async_invoke_inc(ctx->global_stat); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority); } s_rule_info->borrowing[i].enqueue_time_us = enqueue_time; } @@ -318,7 +329,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority); + shaper_global_stat_async_invoke_inc(ctx->global_stat); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority); } shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index); @@ -336,7 +348,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority); + shaper_global_stat_async_invoke_inc(ctx->global_stat); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority); } latency = shaper_pkt_latency_calculate(&s_rule_info->borrowing[i], &curr_time); @@ -387,7 +400,12 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg struct shaping_profile_info *s_pf_info = arg->s_pf_info; struct shaping_flow *sf = arg->sf; - assert(reply->type == SWARMKV_REPLY_INTEGER); + shaper_global_stat_async_callback_inc(arg->ctx->global_stat); + + if (reply->type != SWARMKV_REPLY_INTEGER) { + shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat); + goto END; + } if (reply->integer < 0) {//profile not exist s_pf_info->is_invalid = 1; @@ -447,6 +465,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct arg->sf = sf; arg->direction = direction; + shaper_global_stat_async_invoke_inc(ctx->global_stat); swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits, shaper_token_get_cb, arg); if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed shaper_deposit_token_sub(pf_info, req_token_bits, direction); @@ -475,9 +494,12 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb struct shaping_profile_info *s_pf_info = arg->s_pf_info; struct shaping_flow *sf = arg->sf; + shaper_global_stat_async_callback_inc(arg->ctx->global_stat); + s_pf_info->is_priority_blocked = 0; if (!reply || reply->type != SWARMKV_REPLY_ARRAY) { + shaper_global_stat_async_hmget_failed_inc(arg->ctx->global_stat); goto END; } @@ -516,6 +538,7 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st __atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST); __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST); + shaper_global_stat_async_invoke_inc(ctx->global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) { @@ -726,6 +749,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ case SHAPING_DROP: shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); @@ -733,6 +757,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ case SHAPING_FORWARD: shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length); + shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length); marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1); shaper_packet_dequeue(sf); @@ -783,6 +808,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu if (meta->is_tcp_pure_ctrl) { marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); + shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index); goto END;//for tcp pure control pkt, transmit it directly } @@ -796,6 +822,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu } else { shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); } @@ -803,6 +830,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu if (0 != shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) { marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4)); goto END; } @@ -818,11 +846,13 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); break; case SHAPING_FORWARD: marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_packet_dequeue(sf); shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); + shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); break; default: assert(0); @@ -905,6 +935,7 @@ static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread sf = shaper_session_reset_all(ctx, meta); break; default: + shaper_global_stat_ctrlpkt_err_inc(ctx->global_stat); assert(0); } @@ -926,7 +957,7 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_ session_node = session_table_search_by_id(ctx->session_table, meta->session_id); if (session_node) { sf = (struct shaping_flow *)session_node->val_data; - shaper_global_stat_hit_policy_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta->raw_len); } return sf; @@ -948,7 +979,6 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) for (i = 0; i < rx_num; i++) { if (marsio_buff_is_ctrlbuf(rx_buff[i])) { - shaper_global_stat_ctrlpkt_inc(ctx->global_stat); sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta); } else { sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta); diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp index ef915d1..37138b1 100644 --- a/shaping/src/shaper_global_stat.cpp +++ b/shaping/src/shaper_global_stat.cpp @@ -8,74 +8,73 @@ #include "shaper.h" #include "shaper_global_stat.h" -struct shaper_global_stat_conf { - int enable_backgroud_thread; - int output_interval_ms; -}; - -static int shaper_global_stat_conf_load(struct shaper_global_stat_conf *conf) +static int shaper_global_stat_conf_load(struct shaping_global_stat *stat) { - memset(conf, 0, sizeof(struct shaper_global_stat_conf)); - - MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500); - MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1); + MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "GLOBAL_STAT_OUTPUT_INTERVAL_S", &stat->output_interval_s, 1); return 0; } + +static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat) +{ + stat->column_ids[CURR_SESSION_NUM_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_session_num", NULL, 0); + stat->column_ids[QUEUEING_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_queueing_pkts", NULL, 0); + stat->column_ids[QUEUEING_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_queueing_bytes", NULL, 0); + + stat->column_ids[CTRL_ERR_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_error", NULL, 0); + stat->column_ids[CTRL_OPENING_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_opening", NULL, 0); + stat->column_ids[CTRL_ACTIVE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_active", NULL, 0); + stat->column_ids[CTRL_CLOSE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_close", NULL, 0); + stat->column_ids[CTRL_RESETALL_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_resetall", NULL, 0); + stat->column_ids[SESSION_LOG_SEND_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "session_log_send", NULL, 0); + + stat->column_ids[ASYNC_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "swarmkv_async_invoke", NULL, 0); + stat->column_ids[ASYNC_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "swarmkv_async_callback", NULL, 0); + stat->column_ids[ASYNC_TCONSUME_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "swarmkv_tconsume_failed", NULL, 0); + stat->column_ids[ASYNC_HINCRBY_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "swarmkv_hincrby_failed", NULL, 0); + stat->column_ids[ASYNC_HMGET_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "swarmkv_hmget_failed", NULL, 0); + + stat->column_ids[RX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "rx_pkts", NULL, 0); + stat->column_ids[RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "rx_bytes", NULL, 0); + stat->column_ids[TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tx_pkts", NULL, 0); + stat->column_ids[TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tx_bytes", NULL, 0); + stat->column_ids[DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "drop_pkts", NULL, 0); + stat->column_ids[DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "drop_bytes", NULL, 0); + + stat->column_ids[HIT_POLICY_RX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_rx_pkts", NULL, 0); + stat->column_ids[HIT_POLICY_RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_rx_bytes", NULL, 0); + stat->column_ids[HIT_POLICY_TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_tx_pkts", NULL, 0); + stat->column_ids[HIT_POLICY_TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_tx_bytes", NULL, 0); + stat->column_ids[HIT_POLICY_DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_drop_pkts", NULL, 0); + stat->column_ids[HIT_POLICY_DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_drop_bytes", NULL, 0); + + return; +} + struct shaping_global_stat* shaper_global_stat_init() { struct shaping_global_stat *stat = NULL; int column_num; - struct shaper_global_stat_conf conf; - const char *column_name[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "queueing_pkts", - "queueing_bytes", "drop_pkts", "drop_bytes", "hit_policy_pkts", "hit_policy_bytes", - "ctrl_pkts", "ctrl_opening_pkts", "ctrl_active_pkts", "ctrl_close_pkts", "ctrl_resetall_pkts", - "curr_session_num", "session_log_send_num"}; - enum field_type column_type[] = {FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE, - FIELD_TYPE_GAUGE, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, - FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, - FIELD_TYPE_GAUGE, FIELD_TYPE_COUNTER}; - - column_num = sizeof(column_name)/sizeof(column_name[0]); - if (column_num != GLOBAL_STAT_COLUNM_IDX_MAX) { - LOG_ERROR("%s: shaping globat init fieldstat failed, column_num %d != index num %d", LOG_TAG_STAT, column_num, GLOBAL_STAT_COLUNM_IDX_MAX); - goto ERROR; - } - if (shaper_global_stat_conf_load(&conf) != 0) { + stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat)); + + if (shaper_global_stat_conf_load(stat) != 0) { LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT); goto ERROR; } - - stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat)); + stat->instance = fieldstat_instance_new("shaping_global"); if (stat->instance == NULL) { LOG_ERROR("%s: shaping global init fieldstat instance failed", LOG_TAG_STAT); goto ERROR; } - stat->table_id = fieldstat_register_table(stat->instance, "shaping_global_metrics", column_name, column_type, column_num); - if (stat->table_id < 0) { - LOG_ERROR("%s: shaping global fieldstat register table failed", LOG_TAG_STAT); - goto ERROR; - } - - if (fieldstat_register_table_row(stat->instance, stat->table_id, "shaping_global_metric_row", NULL, 0, stat->column_ids) != 0) { - LOG_ERROR("%s: shaping global fieldstat register table row failed", LOG_TAG_STAT); - goto ERROR; - } + shaper_global_stat_fieldstat_reg(stat); - if (conf.enable_backgroud_thread == 0) { - fieldstat_disable_background_thread(stat->instance); - } + fieldstat_disable_background_thread(stat->instance); fieldstat_set_local_output(stat->instance, "shaping_global_metric", "json"); - if (fieldstat_set_output_interval(stat->instance, conf.output_interval_ms) != 0) { - LOG_ERROR("%s: shaping global set fieldstat output interval failed", LOG_TAG_STAT); - goto ERROR; - } - if (fieldstat_global_enable_prometheus_endpoint(9007, NULL) != 0) { LOG_ERROR("%s: shaping global fieldstat enable prometheus endpoint failed", LOG_TAG_STAT); goto ERROR; @@ -114,103 +113,228 @@ void shaper_global_stat_destroy(struct shaping_global_stat *stat) return; } -void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len) +void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat) { - if (dir == SHAPING_GLOBAL_STAT_RX) { - fieldstat_value_incrby(stat->instance, stat->column_ids[RX_PKTS_IDX], 1); - fieldstat_value_incrby(stat->instance, stat->column_ids[RX_BYTES_IDX], pkt_len); - } else { - fieldstat_value_incrby(stat->instance, stat->column_ids[TX_PKTS_IDX], 1); - fieldstat_value_incrby(stat->instance, stat->column_ids[TX_BYTES_IDX], pkt_len); - } + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->curr_session_num, 1, __ATOMIC_RELAXED); return; } -void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len) +void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat) { - fieldstat_value_incrby(stat->instance, stat->column_ids[DROP_PKTS_IDX], 1); - fieldstat_value_incrby(stat->instance, stat->column_ids[DROP_BYTES_IDX], pkt_len); + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_sub_fetch(&local_stat_data->curr_session_num, 1, __ATOMIC_RELAXED); return; } void shaper_global_stat_queueing_inc(struct shaping_global_stat *stat, int pkt_len) { - fieldstat_value_incrby(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], 1); - fieldstat_value_incrby(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], pkt_len); + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->queueing_pkts, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&local_stat_data->queueing_bytes, pkt_len, __ATOMIC_RELAXED); return; } void shaper_global_stat_queueing_dec(struct shaping_global_stat *stat, int pkt_len) { - fieldstat_value_decrby(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], 1); - fieldstat_value_decrby(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], pkt_len); - - return; -} + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; -void shaper_global_stat_hit_policy_inc(struct shaping_global_stat *stat, int pkt_len) -{ - fieldstat_value_incrby(stat->instance, stat->column_ids[HIT_POLICY_PKTS], 1); - fieldstat_value_incrby(stat->instance, stat->column_ids[HIT_POLICY_BYTES], pkt_len); + __atomic_sub_fetch(&local_stat_data->queueing_pkts, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&local_stat_data->queueing_bytes, pkt_len, __ATOMIC_RELAXED); return; } -void shaper_global_stat_ctrlpkt_inc(struct shaping_global_stat *stat) +void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat *stat) { - fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_PKTS_IDX], 1); + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->ctrl_error, 1, __ATOMIC_RELAXED); return; } void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat *stat) { - fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_OPENING_PKTS_IDX], 1); + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->ctrl_opening, 1, __ATOMIC_RELAXED); return; } void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat *stat) { - fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_ACTIVE_PKTS_IDX], 1); + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->ctrl_active, 1, __ATOMIC_RELAXED); return; } void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat *stat) { - fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_CLOSE_PKTS_IDX], 1); + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->ctrl_close, 1, __ATOMIC_RELAXED); return; } void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat *stat) { - fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_RESETALL_PKTS_IDX], 1); + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->ctrl_resetall, 1, __ATOMIC_RELAXED); return; } -void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat) +void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat) { - fieldstat_value_incrby(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], 1); + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->session_log_send, 1, __ATOMIC_RELAXED); return; } -void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat) +void shaper_global_stat_async_invoke_inc(struct shaping_global_stat *stat) { - fieldstat_value_decrby(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], 1); + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->async_invoke, 1, __ATOMIC_RELAXED); return; } -void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat) +void shaper_global_stat_async_callback_inc(struct shaping_global_stat *stat) +{ + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->async_callback, 1, __ATOMIC_RELAXED); + + return; +} + +void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat *stat) +{ + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->async_tconsume_failed, 1, __ATOMIC_RELAXED); + + return; +} + +void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat *stat) +{ + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->async_hincrby_failed, 1, __ATOMIC_RELAXED); + + return; +} + +void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat *stat) { - fieldstat_value_incrby(stat->instance, stat->column_ids[SESSION_LOG_SEND_NUM], 1); + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + + __atomic_add_fetch(&local_stat_data->async_hmget_failed, 1, __ATOMIC_RELAXED); + + return; +} + +void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len) +{ + struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.all_traffic; + if (dir == SHAPING_GLOBAL_STAT_RX) { + __atomic_add_fetch(&data->rx_pkts, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&data->rx_bytes, pkt_len, __ATOMIC_RELAXED); + } else { + __atomic_add_fetch(&data->tx_pkts, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&data->tx_bytes, pkt_len, __ATOMIC_RELAXED); + } return; +} + +void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len) +{ + struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.all_traffic; + + __atomic_add_fetch(&data->drop_pkts, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&data->drop_bytes, pkt_len, __ATOMIC_RELAXED); + + return; +} + +void shaper_global_stat_hit_policy_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len) +{ + struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.hit_policy_traffic; + + if (dir == SHAPING_GLOBAL_STAT_RX) { + __atomic_add_fetch(&data->rx_pkts, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&data->rx_bytes, pkt_len, __ATOMIC_RELAXED); + } else { + __atomic_add_fetch(&data->tx_pkts, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&data->tx_bytes, pkt_len, __ATOMIC_RELAXED); + } + + return; +} + +void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat *stat, int pkt_len) +{ + struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.hit_policy_traffic; + + __atomic_add_fetch(&data->drop_pkts, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&data->drop_bytes, pkt_len, __ATOMIC_RELAXED); + + return; +} + +void shaper_global_stat_refresh(struct shaping_global_stat *stat) +{ + struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; + struct shaping_global_stat_traffic_data *all_traffic_data = &stat->local_stat_data.all_traffic; + struct shaping_global_stat_traffic_data *hit_policy_traffic_data = &stat->local_stat_data.hit_policy_traffic; + + fieldstat_value_set(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], local_stat_data->curr_session_num); + fieldstat_value_set(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], local_stat_data->queueing_pkts); + fieldstat_value_set(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], local_stat_data->queueing_bytes); + + fieldstat_value_set(stat->instance, stat->column_ids[CTRL_ERR_IDX], local_stat_data->ctrl_error); + fieldstat_value_set(stat->instance, stat->column_ids[CTRL_OPENING_IDX], local_stat_data->ctrl_opening); + fieldstat_value_set(stat->instance, stat->column_ids[CTRL_ACTIVE_IDX], local_stat_data->ctrl_active); + fieldstat_value_set(stat->instance, stat->column_ids[CTRL_CLOSE_IDX], local_stat_data->ctrl_close); + fieldstat_value_set(stat->instance, stat->column_ids[CTRL_RESETALL_IDX], local_stat_data->ctrl_resetall); + fieldstat_value_set(stat->instance, stat->column_ids[SESSION_LOG_SEND_IDX], local_stat_data->session_log_send); + + fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_INVOKE_IDX], local_stat_data->async_invoke); + fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_CALLBACK_IDX], local_stat_data->async_callback); + fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_FAILED], local_stat_data->async_tconsume_failed); + fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_FAILED], local_stat_data->async_hincrby_failed); + fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_FAILED], local_stat_data->async_hmget_failed); + + fieldstat_value_set(stat->instance, stat->column_ids[RX_PKTS_IDX], all_traffic_data->rx_pkts); + fieldstat_value_set(stat->instance, stat->column_ids[RX_BYTES_IDX], all_traffic_data->rx_bytes); + fieldstat_value_set(stat->instance, stat->column_ids[TX_PKTS_IDX], all_traffic_data->tx_pkts); + fieldstat_value_set(stat->instance, stat->column_ids[TX_BYTES_IDX], all_traffic_data->tx_bytes); + fieldstat_value_set(stat->instance, stat->column_ids[DROP_PKTS_IDX], all_traffic_data->drop_pkts); + fieldstat_value_set(stat->instance, stat->column_ids[DROP_BYTES_IDX], all_traffic_data->drop_pkts); + + fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_RX_PKTS_IDX], hit_policy_traffic_data->rx_pkts); + fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes); + fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts); + fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes); + fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts); + fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes); + + fieldstat_passive_output(stat->instance); }
\ No newline at end of file |
