summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorliuchang <[email protected]>2023-05-29 10:16:32 +0000
committerliuchang <[email protected]>2023-05-29 10:16:32 +0000
commit66ea2254660e40f055668cfe1f8df3dc24e60475 (patch)
treee64a953738cc44836c46166c280c4d08971d9a28
parent57efeb63d5769c9f1b92b1266780968ad1c30d78 (diff)
add async statistics for global metric
-rw-r--r--conf/shaping.conf1
-rw-r--r--shaping/include/shaper_global_stat.h97
-rw-r--r--shaping/src/main.cpp10
-rw-r--r--shaping/src/shaper.cpp46
-rw-r--r--shaping/src/shaper_global_stat.cpp284
5 files changed, 327 insertions, 111 deletions
diff --git a/conf/shaping.conf b/conf/shaping.conf
index 64d88a0..cc0ab27 100644
--- a/conf/shaping.conf
+++ b/conf/shaping.conf
@@ -31,6 +31,7 @@ SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111
[METRIC]
FIELDSTAT_OUTPUT_INTERVAL_MS=500
+GLOBAL_STAT_OUTPUT_INTERVAL_S=1
LINE_PROTOCOL_SERVER_IP="127.0.0.1"
LINE_PROTOCOL_SERVER_PORT=6667
diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h
index 60e7e8c..4ab6a2b 100644
--- a/shaping/include/shaper_global_stat.h
+++ b/shaping/include/shaper_global_stat.h
@@ -1,50 +1,105 @@
#include <fieldstat.h>
enum shaping_global_stat_dir {
- SHAPING_GLOBAL_STAT_RX,
+ SHAPING_GLOBAL_STAT_RX = 0,
SHAPING_GLOBAL_STAT_TX
};
enum shaping_global_stat_column_index {
- RX_PKTS_IDX = 0,
+ CURR_SESSION_NUM_IDX = 0,
+ QUEUEING_PKTS_IDX,
+ QUEUEING_BYTES_IDX,
+
+ CTRL_ERR_IDX,
+ CTRL_OPENING_IDX,
+ CTRL_ACTIVE_IDX,
+ CTRL_CLOSE_IDX,
+ CTRL_RESETALL_IDX,
+ SESSION_LOG_SEND_IDX,
+
+ ASYNC_INVOKE_IDX,
+ ASYNC_CALLBACK_IDX,
+ ASYNC_TCONSUME_FAILED,
+ ASYNC_HINCRBY_FAILED,
+ ASYNC_HMGET_FAILED,
+
+ RX_PKTS_IDX,
RX_BYTES_IDX,
TX_PKTS_IDX,
TX_BYTES_IDX,
- QUEUEING_PKTS_IDX,
- QUEUEING_BYTES_IDX,
DROP_PKTS_IDX,
DROP_BYTES_IDX,
- HIT_POLICY_PKTS,
- HIT_POLICY_BYTES,
- CTRL_PKTS_IDX,
- CTRL_OPENING_PKTS_IDX,
- CTRL_ACTIVE_PKTS_IDX,
- CTRL_CLOSE_PKTS_IDX,
- CTRL_RESETALL_PKTS_IDX,
- CURR_SESSION_NUM_IDX,
- SESSION_LOG_SEND_NUM,
+
+ HIT_POLICY_RX_PKTS_IDX,
+ HIT_POLICY_RX_BYTES_IDX,
+ HIT_POLICY_TX_PKTS_IDX,
+ HIT_POLICY_TX_BYTES_IDX,
+ HIT_POLICY_DROP_PKTS_IDX,
+ HIT_POLICY_DROP_BYTES_IDX,
+
GLOBAL_STAT_COLUNM_IDX_MAX
};
+struct shaping_global_stat_traffic_data {
+ long long rx_pkts;
+ long long rx_bytes;
+ long long tx_pkts;
+ long long tx_bytes;
+ long long drop_pkts;
+ long long drop_bytes;
+};
+
+struct shaping_global_stat_data {
+ long long curr_session_num;
+ long long queueing_pkts;
+ long long queueing_bytes;
+ long long ctrl_error;
+ long long ctrl_opening;
+ long long ctrl_active;
+ long long ctrl_close;
+ long long ctrl_resetall;
+ long long session_log_send;
+ long long async_invoke;
+ long long async_callback;
+ long long async_tconsume_failed;
+ long long async_hincrby_failed;
+ long long async_hmget_failed;
+ struct shaping_global_stat_traffic_data all_traffic;
+ struct shaping_global_stat_traffic_data hit_policy_traffic;
+};
+
struct shaping_global_stat {
struct fieldstat_instance *instance;
- int table_id;
int column_ids[GLOBAL_STAT_COLUNM_IDX_MAX];
+ struct shaping_global_stat_data local_stat_data;
+ int output_interval_s;
};
struct shaping_global_stat* shaper_global_stat_init();
void shaper_global_stat_destroy(struct shaping_global_stat *stat);
-void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len);
-void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len);
+void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat);
+void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat);
void shaper_global_stat_queueing_inc(struct shaping_global_stat *stat, int pkt_len);
void shaper_global_stat_queueing_dec(struct shaping_global_stat *stat, int pkt_len);
-void shaper_global_stat_hit_policy_inc(struct shaping_global_stat *stat, int pkt_len);
-void shaper_global_stat_ctrlpkt_inc(struct shaping_global_stat *stat);
+
+void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat *stat);
void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat *stat);
void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat *stat);
void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat *stat);
void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat);
-void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat); \ No newline at end of file
+void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat);
+
+void shaper_global_stat_async_invoke_inc(struct shaping_global_stat *stat);
+void shaper_global_stat_async_callback_inc(struct shaping_global_stat *stat);
+void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat *stat);
+void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat *stat);
+void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat *stat);
+
+void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len);
+void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len);
+
+void shaper_global_stat_hit_policy_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len);
+void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat *stat, int pkt_len);
+
+void shaper_global_stat_refresh(struct shaping_global_stat *stat); \ No newline at end of file
diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp
index 2291db5..f07787f 100644
--- a/shaping/src/main.cpp
+++ b/shaping/src/main.cpp
@@ -11,6 +11,7 @@
#include "shaper_stat.h"
#include "shaper_marsio.h"
#include "shaper_session.h"
+#include "shaper_global_stat.h"
static void *shaper_thread_loop(void *data)
{
@@ -46,6 +47,7 @@ static void sig_handler(int signo)
int main(int argc, char **argv)
{
struct shaping_ctx *ctx = NULL;
+ time_t last_update_time = time(NULL);
if (LOG_INIT("./conf/zlog.conf") == -1)
{
@@ -68,9 +70,13 @@ int main(int argc, char **argv)
pthread_create(&ctx->thread_ctx[i].tid, NULL, shaper_thread_loop, &ctx->thread_ctx[i]);
}
- //TODO:主线程保留?
while(1) {
- sleep(1);
+ time_t curr_time = time(NULL);
+ if (curr_time - last_update_time >= ctx->global_stat->output_interval_s) {
+ shaper_global_stat_refresh(ctx->global_stat);
+ last_update_time = curr_time;
+ }
+ sleep(ctx->global_stat->output_interval_s);
}
return 0;
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 8f5f43d..5ca5834 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -226,6 +226,7 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index);
shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
@@ -234,8 +235,16 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
return;
}
-static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * arg)
+static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg)
{
+ struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
+
+ shaper_global_stat_async_callback_inc(global_stat);
+
+ if (reply->type != SWARMKV_REPLY_INTEGER) {
+ shaper_global_stat_async_hincrby_failed_inc(global_stat);
+ }
+
return;
}
@@ -264,7 +273,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
}
shaper_stat_queueing_pkt_inc(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
s_rule_info->primary.enqueue_time_us = enqueue_time;
@@ -280,7 +290,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
}
s_rule_info->borrowing[i].enqueue_time_us = enqueue_time;
}
@@ -318,7 +329,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
}
shaper_stat_queueing_pkt_dec(&s_rule_info->primary.stat, pkt_wrapper->direction, ctx->thread_index);
@@ -336,7 +348,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
}
latency = shaper_pkt_latency_calculate(&s_rule_info->borrowing[i], &curr_time);
@@ -387,7 +400,12 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
struct shaping_profile_info *s_pf_info = arg->s_pf_info;
struct shaping_flow *sf = arg->sf;
- assert(reply->type == SWARMKV_REPLY_INTEGER);
+ shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
+
+ if (reply->type != SWARMKV_REPLY_INTEGER) {
+ shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat);
+ goto END;
+ }
if (reply->integer < 0) {//profile not exist
s_pf_info->is_invalid = 1;
@@ -447,6 +465,7 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
arg->sf = sf;
arg->direction = direction;
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits, shaper_token_get_cb, arg);
if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed
shaper_deposit_token_sub(pf_info, req_token_bits, direction);
@@ -475,9 +494,12 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
struct shaping_profile_info *s_pf_info = arg->s_pf_info;
struct shaping_flow *sf = arg->sf;
+ shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
+
s_pf_info->is_priority_blocked = 0;
if (!reply || reply->type != SWARMKV_REPLY_ARRAY) {
+ shaper_global_stat_async_hmget_failed_inc(arg->ctx->global_stat);
goto END;
}
@@ -516,6 +538,7 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
__atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
__atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) {
@@ -726,6 +749,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
case SHAPING_DROP:
shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
@@ -733,6 +757,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
case SHAPING_FORWARD:
shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length);
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1);
shaper_packet_dequeue(sf);
@@ -783,6 +808,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
if (meta->is_tcp_pure_ctrl) {
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
goto END;//for tcp pure control pkt, transmit it directly
}
@@ -796,6 +822,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
} else {
shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index);
shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
}
@@ -803,6 +830,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
if (0 != shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) {
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4));
goto END;
}
@@ -818,11 +846,13 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
break;
case SHAPING_FORWARD:
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
shaper_packet_dequeue(sf);
shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
break;
default:
assert(0);
@@ -905,6 +935,7 @@ static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread
sf = shaper_session_reset_all(ctx, meta);
break;
default:
+ shaper_global_stat_ctrlpkt_err_inc(ctx->global_stat);
assert(0);
}
@@ -926,7 +957,7 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_
session_node = session_table_search_by_id(ctx->session_table, meta->session_id);
if (session_node) {
sf = (struct shaping_flow *)session_node->val_data;
- shaper_global_stat_hit_policy_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta->raw_len);
}
return sf;
@@ -948,7 +979,6 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
for (i = 0; i < rx_num; i++) {
if (marsio_buff_is_ctrlbuf(rx_buff[i])) {
- shaper_global_stat_ctrlpkt_inc(ctx->global_stat);
sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta);
} else {
sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta);
diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp
index ef915d1..37138b1 100644
--- a/shaping/src/shaper_global_stat.cpp
+++ b/shaping/src/shaper_global_stat.cpp
@@ -8,74 +8,73 @@
#include "shaper.h"
#include "shaper_global_stat.h"
-struct shaper_global_stat_conf {
- int enable_backgroud_thread;
- int output_interval_ms;
-};
-
-static int shaper_global_stat_conf_load(struct shaper_global_stat_conf *conf)
+static int shaper_global_stat_conf_load(struct shaping_global_stat *stat)
{
- memset(conf, 0, sizeof(struct shaper_global_stat_conf));
-
- MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500);
- MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1);
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "GLOBAL_STAT_OUTPUT_INTERVAL_S", &stat->output_interval_s, 1);
return 0;
}
+
+static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat)
+{
+ stat->column_ids[CURR_SESSION_NUM_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_session_num", NULL, 0);
+ stat->column_ids[QUEUEING_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_queueing_pkts", NULL, 0);
+ stat->column_ids[QUEUEING_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_GAUGE, "curr_queueing_bytes", NULL, 0);
+
+ stat->column_ids[CTRL_ERR_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_error", NULL, 0);
+ stat->column_ids[CTRL_OPENING_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_opening", NULL, 0);
+ stat->column_ids[CTRL_ACTIVE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_active", NULL, 0);
+ stat->column_ids[CTRL_CLOSE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_close", NULL, 0);
+ stat->column_ids[CTRL_RESETALL_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "ctrl_resetall", NULL, 0);
+ stat->column_ids[SESSION_LOG_SEND_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "session_log_send", NULL, 0);
+
+ stat->column_ids[ASYNC_INVOKE_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "swarmkv_async_invoke", NULL, 0);
+ stat->column_ids[ASYNC_CALLBACK_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "swarmkv_async_callback", NULL, 0);
+ stat->column_ids[ASYNC_TCONSUME_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "swarmkv_tconsume_failed", NULL, 0);
+ stat->column_ids[ASYNC_HINCRBY_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "swarmkv_hincrby_failed", NULL, 0);
+ stat->column_ids[ASYNC_HMGET_FAILED] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "swarmkv_hmget_failed", NULL, 0);
+
+ stat->column_ids[RX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "rx_pkts", NULL, 0);
+ stat->column_ids[RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "rx_bytes", NULL, 0);
+ stat->column_ids[TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tx_pkts", NULL, 0);
+ stat->column_ids[TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "tx_bytes", NULL, 0);
+ stat->column_ids[DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "drop_pkts", NULL, 0);
+ stat->column_ids[DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "drop_bytes", NULL, 0);
+
+ stat->column_ids[HIT_POLICY_RX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_rx_pkts", NULL, 0);
+ stat->column_ids[HIT_POLICY_RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_rx_bytes", NULL, 0);
+ stat->column_ids[HIT_POLICY_TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_tx_pkts", NULL, 0);
+ stat->column_ids[HIT_POLICY_TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_tx_bytes", NULL, 0);
+ stat->column_ids[HIT_POLICY_DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_drop_pkts", NULL, 0);
+ stat->column_ids[HIT_POLICY_DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "hit_policy_drop_bytes", NULL, 0);
+
+ return;
+}
+
struct shaping_global_stat* shaper_global_stat_init()
{
struct shaping_global_stat *stat = NULL;
int column_num;
- struct shaper_global_stat_conf conf;
- const char *column_name[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "queueing_pkts",
- "queueing_bytes", "drop_pkts", "drop_bytes", "hit_policy_pkts", "hit_policy_bytes",
- "ctrl_pkts", "ctrl_opening_pkts", "ctrl_active_pkts", "ctrl_close_pkts", "ctrl_resetall_pkts",
- "curr_session_num", "session_log_send_num"};
- enum field_type column_type[] = {FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE,
- FIELD_TYPE_GAUGE, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER,
- FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER,
- FIELD_TYPE_GAUGE, FIELD_TYPE_COUNTER};
-
- column_num = sizeof(column_name)/sizeof(column_name[0]);
- if (column_num != GLOBAL_STAT_COLUNM_IDX_MAX) {
- LOG_ERROR("%s: shaping globat init fieldstat failed, column_num %d != index num %d", LOG_TAG_STAT, column_num, GLOBAL_STAT_COLUNM_IDX_MAX);
- goto ERROR;
- }
- if (shaper_global_stat_conf_load(&conf) != 0) {
+ stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat));
+
+ if (shaper_global_stat_conf_load(stat) != 0) {
LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT);
goto ERROR;
}
-
- stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat));
+
stat->instance = fieldstat_instance_new("shaping_global");
if (stat->instance == NULL) {
LOG_ERROR("%s: shaping global init fieldstat instance failed", LOG_TAG_STAT);
goto ERROR;
}
- stat->table_id = fieldstat_register_table(stat->instance, "shaping_global_metrics", column_name, column_type, column_num);
- if (stat->table_id < 0) {
- LOG_ERROR("%s: shaping global fieldstat register table failed", LOG_TAG_STAT);
- goto ERROR;
- }
-
- if (fieldstat_register_table_row(stat->instance, stat->table_id, "shaping_global_metric_row", NULL, 0, stat->column_ids) != 0) {
- LOG_ERROR("%s: shaping global fieldstat register table row failed", LOG_TAG_STAT);
- goto ERROR;
- }
+ shaper_global_stat_fieldstat_reg(stat);
- if (conf.enable_backgroud_thread == 0) {
- fieldstat_disable_background_thread(stat->instance);
- }
+ fieldstat_disable_background_thread(stat->instance);
fieldstat_set_local_output(stat->instance, "shaping_global_metric", "json");
- if (fieldstat_set_output_interval(stat->instance, conf.output_interval_ms) != 0) {
- LOG_ERROR("%s: shaping global set fieldstat output interval failed", LOG_TAG_STAT);
- goto ERROR;
- }
-
if (fieldstat_global_enable_prometheus_endpoint(9007, NULL) != 0) {
LOG_ERROR("%s: shaping global fieldstat enable prometheus endpoint failed", LOG_TAG_STAT);
goto ERROR;
@@ -114,103 +113,228 @@ void shaper_global_stat_destroy(struct shaping_global_stat *stat)
return;
}
-void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len)
+void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat)
{
- if (dir == SHAPING_GLOBAL_STAT_RX) {
- fieldstat_value_incrby(stat->instance, stat->column_ids[RX_PKTS_IDX], 1);
- fieldstat_value_incrby(stat->instance, stat->column_ids[RX_BYTES_IDX], pkt_len);
- } else {
- fieldstat_value_incrby(stat->instance, stat->column_ids[TX_PKTS_IDX], 1);
- fieldstat_value_incrby(stat->instance, stat->column_ids[TX_BYTES_IDX], pkt_len);
- }
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->curr_session_num, 1, __ATOMIC_RELAXED);
return;
}
-void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len)
+void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat)
{
- fieldstat_value_incrby(stat->instance, stat->column_ids[DROP_PKTS_IDX], 1);
- fieldstat_value_incrby(stat->instance, stat->column_ids[DROP_BYTES_IDX], pkt_len);
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_sub_fetch(&local_stat_data->curr_session_num, 1, __ATOMIC_RELAXED);
return;
}
void shaper_global_stat_queueing_inc(struct shaping_global_stat *stat, int pkt_len)
{
- fieldstat_value_incrby(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], 1);
- fieldstat_value_incrby(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], pkt_len);
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->queueing_pkts, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&local_stat_data->queueing_bytes, pkt_len, __ATOMIC_RELAXED);
return;
}
void shaper_global_stat_queueing_dec(struct shaping_global_stat *stat, int pkt_len)
{
- fieldstat_value_decrby(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], 1);
- fieldstat_value_decrby(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], pkt_len);
-
- return;
-}
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-void shaper_global_stat_hit_policy_inc(struct shaping_global_stat *stat, int pkt_len)
-{
- fieldstat_value_incrby(stat->instance, stat->column_ids[HIT_POLICY_PKTS], 1);
- fieldstat_value_incrby(stat->instance, stat->column_ids[HIT_POLICY_BYTES], pkt_len);
+ __atomic_sub_fetch(&local_stat_data->queueing_pkts, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&local_stat_data->queueing_bytes, pkt_len, __ATOMIC_RELAXED);
return;
}
-void shaper_global_stat_ctrlpkt_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat *stat)
{
- fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_PKTS_IDX], 1);
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->ctrl_error, 1, __ATOMIC_RELAXED);
return;
}
void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat *stat)
{
- fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_OPENING_PKTS_IDX], 1);
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->ctrl_opening, 1, __ATOMIC_RELAXED);
return;
}
void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat *stat)
{
- fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_ACTIVE_PKTS_IDX], 1);
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->ctrl_active, 1, __ATOMIC_RELAXED);
return;
}
void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat *stat)
{
- fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_CLOSE_PKTS_IDX], 1);
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->ctrl_close, 1, __ATOMIC_RELAXED);
return;
}
void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat *stat)
{
- fieldstat_value_incrby(stat->instance, stat->column_ids[CTRL_RESETALL_PKTS_IDX], 1);
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->ctrl_resetall, 1, __ATOMIC_RELAXED);
return;
}
-void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat)
{
- fieldstat_value_incrby(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], 1);
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->session_log_send, 1, __ATOMIC_RELAXED);
return;
}
-void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat)
+void shaper_global_stat_async_invoke_inc(struct shaping_global_stat *stat)
{
- fieldstat_value_decrby(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], 1);
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_invoke, 1, __ATOMIC_RELAXED);
return;
}
-void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_async_callback_inc(struct shaping_global_stat *stat)
+{
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_callback, 1, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat *stat)
+{
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_tconsume_failed, 1, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat *stat)
+{
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_hincrby_failed, 1, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat *stat)
{
- fieldstat_value_incrby(stat->instance, stat->column_ids[SESSION_LOG_SEND_NUM], 1);
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+
+ __atomic_add_fetch(&local_stat_data->async_hmget_failed, 1, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len)
+{
+ struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.all_traffic;
+ if (dir == SHAPING_GLOBAL_STAT_RX) {
+ __atomic_add_fetch(&data->rx_pkts, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&data->rx_bytes, pkt_len, __ATOMIC_RELAXED);
+ } else {
+ __atomic_add_fetch(&data->tx_pkts, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&data->tx_bytes, pkt_len, __ATOMIC_RELAXED);
+ }
return;
+}
+
+void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len)
+{
+ struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.all_traffic;
+
+ __atomic_add_fetch(&data->drop_pkts, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&data->drop_bytes, pkt_len, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_hit_policy_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len)
+{
+ struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.hit_policy_traffic;
+
+ if (dir == SHAPING_GLOBAL_STAT_RX) {
+ __atomic_add_fetch(&data->rx_pkts, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&data->rx_bytes, pkt_len, __ATOMIC_RELAXED);
+ } else {
+ __atomic_add_fetch(&data->tx_pkts, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&data->tx_bytes, pkt_len, __ATOMIC_RELAXED);
+ }
+
+ return;
+}
+
+void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat *stat, int pkt_len)
+{
+ struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.hit_policy_traffic;
+
+ __atomic_add_fetch(&data->drop_pkts, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&data->drop_bytes, pkt_len, __ATOMIC_RELAXED);
+
+ return;
+}
+
+void shaper_global_stat_refresh(struct shaping_global_stat *stat)
+{
+ struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
+ struct shaping_global_stat_traffic_data *all_traffic_data = &stat->local_stat_data.all_traffic;
+ struct shaping_global_stat_traffic_data *hit_policy_traffic_data = &stat->local_stat_data.hit_policy_traffic;
+
+ fieldstat_value_set(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], local_stat_data->curr_session_num);
+ fieldstat_value_set(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], local_stat_data->queueing_pkts);
+ fieldstat_value_set(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], local_stat_data->queueing_bytes);
+
+ fieldstat_value_set(stat->instance, stat->column_ids[CTRL_ERR_IDX], local_stat_data->ctrl_error);
+ fieldstat_value_set(stat->instance, stat->column_ids[CTRL_OPENING_IDX], local_stat_data->ctrl_opening);
+ fieldstat_value_set(stat->instance, stat->column_ids[CTRL_ACTIVE_IDX], local_stat_data->ctrl_active);
+ fieldstat_value_set(stat->instance, stat->column_ids[CTRL_CLOSE_IDX], local_stat_data->ctrl_close);
+ fieldstat_value_set(stat->instance, stat->column_ids[CTRL_RESETALL_IDX], local_stat_data->ctrl_resetall);
+ fieldstat_value_set(stat->instance, stat->column_ids[SESSION_LOG_SEND_IDX], local_stat_data->session_log_send);
+
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_INVOKE_IDX], local_stat_data->async_invoke);
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_CALLBACK_IDX], local_stat_data->async_callback);
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_FAILED], local_stat_data->async_tconsume_failed);
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_FAILED], local_stat_data->async_hincrby_failed);
+ fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_FAILED], local_stat_data->async_hmget_failed);
+
+ fieldstat_value_set(stat->instance, stat->column_ids[RX_PKTS_IDX], all_traffic_data->rx_pkts);
+ fieldstat_value_set(stat->instance, stat->column_ids[RX_BYTES_IDX], all_traffic_data->rx_bytes);
+ fieldstat_value_set(stat->instance, stat->column_ids[TX_PKTS_IDX], all_traffic_data->tx_pkts);
+ fieldstat_value_set(stat->instance, stat->column_ids[TX_BYTES_IDX], all_traffic_data->tx_bytes);
+ fieldstat_value_set(stat->instance, stat->column_ids[DROP_PKTS_IDX], all_traffic_data->drop_pkts);
+ fieldstat_value_set(stat->instance, stat->column_ids[DROP_BYTES_IDX], all_traffic_data->drop_pkts);
+
+ fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_RX_PKTS_IDX], hit_policy_traffic_data->rx_pkts);
+ fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes);
+ fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts);
+ fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes);
+ fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts);
+ fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes);
+
+ fieldstat_passive_output(stat->instance);
} \ No newline at end of file