summaryrefslogtreecommitdiff
path: root/shaping/src
diff options
context:
space:
mode:
authorliuchang <[email protected]>2023-04-03 11:09:45 +0000
committerliuchang <[email protected]>2023-04-04 02:33:50 +0000
commit72ed9151b6dfa000b413e667e52d59498c7aaad7 (patch)
treee0a50bc4b28eaa0de5cb9aef6e6388181cd76098 /shaping/src
parent5e9d5418d1c0950352e3ab704285f8a371a7bae5 (diff)
add global metric
Diffstat (limited to 'shaping/src')
-rw-r--r--shaping/src/shaper.cpp30
-rw-r--r--shaping/src/shaper_global_stat.cpp181
-rw-r--r--shaping/src/shaper_session.cpp7
3 files changed, 218 insertions, 0 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 6090a15..ba376db 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -21,6 +21,7 @@ extern "C" {
#include "shaper_session.h"
#include "shaper_swarmkv.h"
#include "shaper_maat.h"
+#include "shaper_global_stat.h"
#define NANO_SECONDS_PER_MICRO_SEC 1000
#define MICRO_SECONDS_PER_SEC 1000000
@@ -203,6 +204,8 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
shaper_stat_drop_inc(stat, rule->id, rule->primary.id, rule->primary.priority,
pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
+ shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
@@ -673,10 +676,16 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
}
break;
case SHAPING_DROP:
+ shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length);
+
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
break;
case SHAPING_FORWARD:
+ shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length);
+
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1);
shaper_packet_dequeue(sf);
break;
@@ -727,20 +736,24 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
shaper_stat_queueing_pkt_inc(stat, s_rule->id,
s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len,
SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
+ shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len);
} else {
shaper_stat_drop_inc(stat, s_rule->id, s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len, ctx->thread_index);
+ shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
}
} else {
if (meta->is_tcp_pure_ctrl) {
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
+ shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
goto JUDGE_CLOSE;//for tcp pure control pkt, transmit it directly
}
if (0 != shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) {
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
+ shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4));
goto JUDGE_CLOSE;
}
@@ -750,14 +763,17 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, &sf->matched_rule_infos[sf->anchor].primary);
switch (shaping_ret) {
case SHAPING_QUEUED:
+ shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len);
break;
case SHAPING_DROP:
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
+ shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
break;
case SHAPING_FORWARD:
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
shaper_packet_dequeue(sf);
+ shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
break;
default:
assert(0);
@@ -823,15 +839,19 @@ static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread
switch (ctrl_data.state) {
case SESSION_STATE_OPENING:
+ shaper_global_stat_ctrlpkt_opening_inc(ctx->global_stat);
sf = shaper_session_opening(ctx, meta, &ctrl_data, &raw_parser);
break;
case SESSION_STATE_ACTIVE:
+ shaper_global_stat_ctrlpkt_active_inc(ctx->global_stat);
sf = shaper_session_active(ctx, meta, &ctrl_data);
break;
case SESSION_STATE_CLOSING:
+ shaper_global_stat_ctrlpkt_close_inc(ctx->global_stat);
sf = shaper_session_close(ctx, meta);
break;
case SESSION_STATE_RESETALL:
+ shaper_global_stat_ctrlpkt_resetall_inc(ctx->global_stat);
sf = shaper_session_reset_all(ctx, meta);
break;
default:
@@ -877,13 +897,16 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
for (i = 0; i < rx_num; i++) {
if (marsio_buff_is_ctrlbuf(rx_buff[i])) {
+ shaper_global_stat_ctrlpkt_inc(ctx->global_stat);
sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta);
} else {
sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta);
}
+ shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta.raw_len);
if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1);
+ shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta.raw_len);
} else {
shaping_packet_process(ctx, rx_buff[i], &meta, sf);
}
@@ -1001,6 +1024,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx)
shaper_maat_destroy(ctx->maat_info);
shaper_marsio_destroy(ctx->marsio_info);
shaper_stat_destroy(ctx->stat);
+ shaper_global_stat_destroy(ctx->global_stat);
if (ctx->thread_ctx) {
for (int i = 0; i < ctx->thread_num; i++) {
@@ -1058,6 +1082,11 @@ struct shaping_ctx *shaping_engine_init()
if (ctx->stat == NULL) {
goto ERROR;
}
+
+ ctx->global_stat = shaper_global_stat_init();
+ if (ctx->global_stat == NULL) {
+ goto ERROR;
+ }
ctx->thread_ctx = (struct shaping_thread_ctx *)calloc(conf.work_thread_num, sizeof(struct shaping_thread_ctx));
ctx->thread_num = conf.work_thread_num;
@@ -1065,6 +1094,7 @@ struct shaping_ctx *shaping_engine_init()
ctx->thread_ctx[i].thread_index = i;
ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max);
ctx->thread_ctx[i].stat = ctx->stat;
+ ctx->thread_ctx[i].global_stat = ctx->global_stat;
ctx->thread_ctx[i].session_table = session_table_create();
ctx->thread_ctx[i].maat_info = ctx->maat_info;
ctx->thread_ctx[i].marsio_info = ctx->marsio_info;
diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp
new file mode 100644
index 0000000..d8cb6fb
--- /dev/null
+++ b/shaping/src/shaper_global_stat.cpp
@@ -0,0 +1,181 @@
+#include <stdlib.h>
+
+#include <MESA/MESA_prof_load.h>
+#include <fieldstat.h>
+
+#include "log.h"
+#include "utils.h"
+#include "shaper.h"
+#include "shaper_global_stat.h"
+
+struct shaper_global_stat_conf {
+ int enable_backgroud_thread;
+ int output_interval_ms;
+};
+
+static int shaper_global_stat_conf_load(struct shaper_global_stat_conf *conf)
+{
+ memset(conf, 0, sizeof(struct shaper_global_stat_conf));
+
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500);
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1);
+
+ return 0;
+}
+
+struct shaping_global_stat* shaper_global_stat_init()
+{
+ struct shaping_global_stat *stat = NULL;
+ int column_num;
+ struct shaper_global_stat_conf conf;
+ const char *column_name[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "queueing_pkts", "queueing_bytes", "drop_pkts", "drop_bytes",
+ "ctrl_pkts", "ctrl_opening_pkts", "ctrl_active_pkts", "ctrl_close_pkts", "ctrl_resetall_pkts", "curr_session_num"};
+ enum field_type column_type[] = {FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER,
+ FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE};
+
+ column_num = sizeof(column_name)/sizeof(column_name[0]);
+ if (column_num != GLOBAL_STAT_COLUNM_IDX_MAX) {
+ LOG_ERROR("%s: shaping globat init fieldstat failed, column_num %d != index num %d", LOG_TAG_STAT, column_num, GLOBAL_STAT_COLUNM_IDX_MAX);
+ goto ERROR;
+ }
+
+ if (shaper_global_stat_conf_load(&conf) != 0) {
+ LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT);
+ goto ERROR;
+ }
+
+ stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat));
+ stat->instance = fieldstat_instance_new("shaping_global");
+ if (stat->instance == NULL) {
+ LOG_ERROR("%s: shaping global init fieldstat instance failed", LOG_TAG_STAT);
+ goto ERROR;
+ }
+
+ stat->table_id = fieldstat_register_table(stat->instance, "shaping_global_metrics", column_name, column_type, column_num);
+ if (stat->table_id < 0) {
+ LOG_ERROR("%s: shaping global fieldstat register table failed", LOG_TAG_STAT);
+ goto ERROR;
+ }
+
+ if (fieldstat_register_table_row(stat->instance, stat->table_id, "shaping_global_metric_row", NULL, 0, stat->column_ids) != 0) {
+ LOG_ERROR("%s: shaping global fieldstat register table row failed", LOG_TAG_STAT);
+ goto ERROR;
+ }
+
+ if (conf.enable_backgroud_thread == 0) {
+ fieldstat_disable_background_thread(stat->instance);
+ }
+
+ fieldstat_instance_start(stat->instance);
+
+ return stat;
+
+ERROR:
+ if (stat) {
+ if (stat->instance) {
+ fieldstat_instance_free(stat->instance);
+ }
+ free(stat);
+ }
+ return NULL;
+}
+
+void shaper_global_stat_destroy(struct shaping_global_stat *stat)
+{
+ if (!stat) {
+ return;
+ }
+
+ if (stat->instance) {
+ fieldstat_instance_free(stat->instance);
+ }
+ free(stat);
+
+ return;
+}
+
+void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len)
+{
+ if (dir == SHAPING_GLOBAL_STAT_RX) {
+ fieldstat_value_incrby(stat->instance, stat->column_ids[RX_PKTS_IDX], 1);
+ fieldstat_value_incrby(stat->instance, stat->column_ids[RX_BYTES_IDX], pkt_len);
+ } else {
+ fieldstat_value_incrby(stat->instance, stat->column_ids[TX_PKTS_IDX], 1);
+ fieldstat_value_incrby(stat->instance, stat->column_ids[TX_BYTES_IDX], pkt_len);
+ }
+
+ return;
+}
+
+void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len)
+{
+ fieldstat_value_incrby(stat->instance, stat->column_ids[DROP_PKTS_IDX], 1);
+ fieldstat_value_incrby(stat->instance, stat->column_ids[DROP_BYTES_IDX], pkt_len);
+
+ return;
+}
+
+void shaper_global_stat_queueing_inc(struct shaping_global_stat *stat, int pkt_len)
+{
+ fieldstat_value_incrby(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], 1);
+ fieldstat_value_incrby(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], pkt_len);
+
+ return;
+}
+
+void shaper_global_stat_queueing_dec(struct shaping_global_stat *stat, int pkt_len)
+{
+ fieldstat_value_decrby(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], 1);
+ fieldstat_value_decrby(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], pkt_len);
+
+ return;
+}
+
+void shaper_global_stat_ctrlpkt_inc(struct shaping_global_stat *stat)
+{
+ fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_PKTS_IDX], 1);
+
+ return;
+}
+
+void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat *stat)
+{
+ fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_OPENING_PKTS_IDX], 1);
+
+ return;
+}
+
+void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat *stat)
+{
+ fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_ACTIVE_PKTS_IDX], 1);
+
+ return;
+}
+
+void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat *stat)
+{
+ fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_CLOSE_PKTS_IDX], 1);
+
+ return;
+}
+
+void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat *stat)
+{
+ fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_RESETALL_PKTS_IDX], 1);
+
+ return;
+}
+
+void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat)
+{
+ fieldstat_value_incrby(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], 1);
+
+ return;
+}
+
+void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat)
+{
+ fieldstat_value_decrby(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], 1);
+
+ return;
+} \ No newline at end of file
diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp
index 6194a90..f45c8d7 100644
--- a/shaping/src/shaper_session.cpp
+++ b/shaping/src/shaper_session.cpp
@@ -6,6 +6,7 @@
#include "shaper_session.h"
#include "shaper_maat.h"
#include "shaper_stat.h"
+#include "shaper_global_stat.h"
#include "shaper.h"
struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, struct metadata *meta, struct ctrl_pkt_data *ctrl_data, struct raw_pkt_parser *raw_parser)
@@ -26,6 +27,8 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru
session_table_insert(ctx->session_table, meta->session_id, &sf->tuple4, sf, NULL);
+ shaper_global_stat_curr_session_inc(ctx->global_stat);
+
return sf;
}
@@ -43,6 +46,8 @@ struct shaping_flow* shaper_session_close(struct shaping_thread_ctx *ctx, struct
sf->flag |= STREAM_CLOSE;
session_table_delete_by_id(ctx->session_table, meta->session_id);
+ shaper_global_stat_curr_session_dec(ctx->global_stat);
+
return sf;
}
@@ -84,5 +89,7 @@ void shaper_session_data_free_cb(void *session_data, void *data)
shaping_flow_free(sf);
}
+ shaper_global_stat_curr_session_dec(ctx->global_stat);
+
return;
} \ No newline at end of file