diff options
| author | liuchang <[email protected]> | 2023-04-03 11:09:45 +0000 |
|---|---|---|
| committer | liuchang <[email protected]> | 2023-04-04 02:33:50 +0000 |
| commit | 72ed9151b6dfa000b413e667e52d59498c7aaad7 (patch) | |
| tree | e0a50bc4b28eaa0de5cb9aef6e6388181cd76098 /shaping/src | |
| parent | 5e9d5418d1c0950352e3ab704285f8a371a7bae5 (diff) | |
add global metric
Diffstat (limited to 'shaping/src')
| -rw-r--r-- | shaping/src/shaper.cpp | 30 | ||||
| -rw-r--r-- | shaping/src/shaper_global_stat.cpp | 181 | ||||
| -rw-r--r-- | shaping/src/shaper_session.cpp | 7 |
3 files changed, 218 insertions, 0 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 6090a15..ba376db 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -21,6 +21,7 @@ extern "C" { #include "shaper_session.h" #include "shaper_swarmkv.h" #include "shaper_maat.h" +#include "shaper_global_stat.h" #define NANO_SECONDS_PER_MICRO_SEC 1000 #define MICRO_SECONDS_PER_SEC 1000000 @@ -203,6 +204,8 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); shaper_stat_drop_inc(stat, rule->id, rule->primary.id, rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index); + shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); @@ -673,10 +676,16 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ } break; case SHAPING_DROP: + shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length); + marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); break; case SHAPING_FORWARD: + shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length); + marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1); shaper_packet_dequeue(sf); break; @@ -727,20 +736,24 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu shaper_stat_queueing_pkt_inc(stat, s_rule->id, s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); + shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len); } else { shaper_stat_drop_inc(stat, s_rule->id, s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len, ctx->thread_index); + shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); } } else { if (meta->is_tcp_pure_ctrl) { marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); + shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index); goto JUDGE_CLOSE;//for tcp pure control pkt, transmit it directly } if (0 != shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) { marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); + shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4)); goto JUDGE_CLOSE; } @@ -750,14 +763,17 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, &sf->matched_rule_infos[sf->anchor].primary); switch (shaping_ret) { case SHAPING_QUEUED: + shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len); break; case SHAPING_DROP: marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); + shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); break; case SHAPING_FORWARD: marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_packet_dequeue(sf); + shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); break; default: assert(0); @@ -823,15 +839,19 @@ static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread switch (ctrl_data.state) { case SESSION_STATE_OPENING: + shaper_global_stat_ctrlpkt_opening_inc(ctx->global_stat); sf = shaper_session_opening(ctx, meta, &ctrl_data, &raw_parser); break; case SESSION_STATE_ACTIVE: + shaper_global_stat_ctrlpkt_active_inc(ctx->global_stat); sf = shaper_session_active(ctx, meta, &ctrl_data); break; case SESSION_STATE_CLOSING: + shaper_global_stat_ctrlpkt_close_inc(ctx->global_stat); sf = shaper_session_close(ctx, meta); break; case SESSION_STATE_RESETALL: + shaper_global_stat_ctrlpkt_resetall_inc(ctx->global_stat); sf = shaper_session_reset_all(ctx, meta); break; default: @@ -877,13 +897,16 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) for (i = 0; i < rx_num; i++) { if (marsio_buff_is_ctrlbuf(rx_buff[i])) { + shaper_global_stat_ctrlpkt_inc(ctx->global_stat); sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta); } else { sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta); } + shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta.raw_len); if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1); + shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta.raw_len); } else { shaping_packet_process(ctx, rx_buff[i], &meta, sf); } @@ -1001,6 +1024,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx) shaper_maat_destroy(ctx->maat_info); shaper_marsio_destroy(ctx->marsio_info); shaper_stat_destroy(ctx->stat); + shaper_global_stat_destroy(ctx->global_stat); if (ctx->thread_ctx) { for (int i = 0; i < ctx->thread_num; i++) { @@ -1058,6 +1082,11 @@ struct shaping_ctx *shaping_engine_init() if (ctx->stat == NULL) { goto ERROR; } + + ctx->global_stat = shaper_global_stat_init(); + if (ctx->global_stat == NULL) { + goto ERROR; + } ctx->thread_ctx = (struct shaping_thread_ctx *)calloc(conf.work_thread_num, sizeof(struct shaping_thread_ctx)); ctx->thread_num = conf.work_thread_num; @@ -1065,6 +1094,7 @@ struct shaping_ctx *shaping_engine_init() ctx->thread_ctx[i].thread_index = i; ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max); ctx->thread_ctx[i].stat = ctx->stat; + ctx->thread_ctx[i].global_stat = ctx->global_stat; ctx->thread_ctx[i].session_table = session_table_create(); ctx->thread_ctx[i].maat_info = ctx->maat_info; ctx->thread_ctx[i].marsio_info = ctx->marsio_info; diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp new file mode 100644 index 0000000..d8cb6fb --- /dev/null +++ b/shaping/src/shaper_global_stat.cpp @@ -0,0 +1,181 @@ +#include <stdlib.h> + +#include <MESA/MESA_prof_load.h> +#include <fieldstat.h> + +#include "log.h" +#include "utils.h" +#include "shaper.h" +#include "shaper_global_stat.h" + +struct shaper_global_stat_conf { + int enable_backgroud_thread; + int output_interval_ms; +}; + +static int shaper_global_stat_conf_load(struct shaper_global_stat_conf *conf) +{ + memset(conf, 0, sizeof(struct shaper_global_stat_conf)); + + MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500); + MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1); + + return 0; +} + +struct shaping_global_stat* shaper_global_stat_init() +{ + struct shaping_global_stat *stat = NULL; + int column_num; + struct shaper_global_stat_conf conf; + const char *column_name[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "queueing_pkts", "queueing_bytes", "drop_pkts", "drop_bytes", + "ctrl_pkts", "ctrl_opening_pkts", "ctrl_active_pkts", "ctrl_close_pkts", "ctrl_resetall_pkts", "curr_session_num"}; + enum field_type column_type[] = {FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, + FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE}; + + column_num = sizeof(column_name)/sizeof(column_name[0]); + if (column_num != GLOBAL_STAT_COLUNM_IDX_MAX) { + LOG_ERROR("%s: shaping globat init fieldstat failed, column_num %d != index num %d", LOG_TAG_STAT, column_num, GLOBAL_STAT_COLUNM_IDX_MAX); + goto ERROR; + } + + if (shaper_global_stat_conf_load(&conf) != 0) { + LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT); + goto ERROR; + } + + stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat)); + stat->instance = fieldstat_instance_new("shaping_global"); + if (stat->instance == NULL) { + LOG_ERROR("%s: shaping global init fieldstat instance failed", LOG_TAG_STAT); + goto ERROR; + } + + stat->table_id = fieldstat_register_table(stat->instance, "shaping_global_metrics", column_name, column_type, column_num); + if (stat->table_id < 0) { + LOG_ERROR("%s: shaping global fieldstat register table failed", LOG_TAG_STAT); + goto ERROR; + } + + if (fieldstat_register_table_row(stat->instance, stat->table_id, "shaping_global_metric_row", NULL, 0, stat->column_ids) != 0) { + LOG_ERROR("%s: shaping global fieldstat register table row failed", LOG_TAG_STAT); + goto ERROR; + } + + if (conf.enable_backgroud_thread == 0) { + fieldstat_disable_background_thread(stat->instance); + } + + fieldstat_instance_start(stat->instance); + + return stat; + +ERROR: + if (stat) { + if (stat->instance) { + fieldstat_instance_free(stat->instance); + } + free(stat); + } + return NULL; +} + +void shaper_global_stat_destroy(struct shaping_global_stat *stat) +{ + if (!stat) { + return; + } + + if (stat->instance) { + fieldstat_instance_free(stat->instance); + } + free(stat); + + return; +} + +void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len) +{ + if (dir == SHAPING_GLOBAL_STAT_RX) { + fieldstat_value_incrby(stat->instance, stat->column_ids[RX_PKTS_IDX], 1); + fieldstat_value_incrby(stat->instance, stat->column_ids[RX_BYTES_IDX], pkt_len); + } else { + fieldstat_value_incrby(stat->instance, stat->column_ids[TX_PKTS_IDX], 1); + fieldstat_value_incrby(stat->instance, stat->column_ids[TX_BYTES_IDX], pkt_len); + } + + return; +} + +void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len) +{ + fieldstat_value_incrby(stat->instance, stat->column_ids[DROP_PKTS_IDX], 1); + fieldstat_value_incrby(stat->instance, stat->column_ids[DROP_BYTES_IDX], pkt_len); + + return; +} + +void shaper_global_stat_queueing_inc(struct shaping_global_stat *stat, int pkt_len) +{ + fieldstat_value_incrby(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], 1); + fieldstat_value_incrby(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], pkt_len); + + return; +} + +void shaper_global_stat_queueing_dec(struct shaping_global_stat *stat, int pkt_len) +{ + fieldstat_value_decrby(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], 1); + fieldstat_value_decrby(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], pkt_len); + + return; +} + +void shaper_global_stat_ctrlpkt_inc(struct shaping_global_stat *stat) +{ + fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_PKTS_IDX], 1); + + return; +} + +void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat *stat) +{ + fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_OPENING_PKTS_IDX], 1); + + return; +} + +void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat *stat) +{ + fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_ACTIVE_PKTS_IDX], 1); + + return; +} + +void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat *stat) +{ + fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_CLOSE_PKTS_IDX], 1); + + return; +} + +void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat *stat) +{ + fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_RESETALL_PKTS_IDX], 1); + + return; +} + +void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat) +{ + fieldstat_value_incrby(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], 1); + + return; +} + +void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat) +{ + fieldstat_value_decrby(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], 1); + + return; +}
\ No newline at end of file diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp index 6194a90..f45c8d7 100644 --- a/shaping/src/shaper_session.cpp +++ b/shaping/src/shaper_session.cpp @@ -6,6 +6,7 @@ #include "shaper_session.h" #include "shaper_maat.h" #include "shaper_stat.h" +#include "shaper_global_stat.h" #include "shaper.h" struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, struct metadata *meta, struct ctrl_pkt_data *ctrl_data, struct raw_pkt_parser *raw_parser) @@ -26,6 +27,8 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru session_table_insert(ctx->session_table, meta->session_id, &sf->tuple4, sf, NULL); + shaper_global_stat_curr_session_inc(ctx->global_stat); + return sf; } @@ -43,6 +46,8 @@ struct shaping_flow* shaper_session_close(struct shaping_thread_ctx *ctx, struct sf->flag |= STREAM_CLOSE; session_table_delete_by_id(ctx->session_table, meta->session_id); + shaper_global_stat_curr_session_dec(ctx->global_stat); + return sf; } @@ -84,5 +89,7 @@ void shaper_session_data_free_cb(void *session_data, void *data) shaping_flow_free(sf); } + shaper_global_stat_curr_session_dec(ctx->global_stat); + return; }
\ No newline at end of file |
