summaryrefslogtreecommitdiff
path: root/shaping/src/shaper_stat.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'shaping/src/shaper_stat.cpp')
-rw-r--r--shaping/src/shaper_stat.cpp314
1 files changed, 124 insertions, 190 deletions
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index 9ed1669..6c9644b 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -2,297 +2,231 @@
#include <time.h>
#include <sys/socket.h>
#include <arpa/inet.h>
+#include <MESA/MESA_prof_load.h>
+#include <fieldstat.h>
-#include <MESA/stream.h>
-
+#include "log.h"
+#include "utils.h"
#include "shaper.h"
#include "shaper_stat.h"
+struct shaper_stat_conf {
+ int enable_backgroud_thread;
+ int output_interval_ms;
+ char telegraf_ip[16];
+ short telegraf_port;
+};
-#define SHAPING_STAT_SEND_INTERVAL_SEC 1 //unit: second
-#define SHAPING_STAT_SEND_INTERVAL_NS 500000000 //unit: nano second
-
-#define SHAPING_STAT_FORMAT "SHAPING-STAT,rule_id=%d,profile_id=%d,priority=%d,profile_type=%s "\
- "queueing_sessions=%d,in_rx_pkts=%llu,in_rx_bytes=%llu,"\
- "in_tx_pkts=%llu,in_tx_bytes=%llu,in_drop_pkts=%llu,in_max_latency_us=%llu,in_queue_len=%lld,"\
- "out_rx_pkts=%llu,out_rx_bytes=%llu,out_tx_pkts=%llu,out_tx_bytes=%llu,"\
- "out_drop_pkts=%llu,out_max_latency_us=%llu,out_queue_len=%lld"
-
-static void shaper_stat_counter_clear(struct shaping_stat_data *s)
-{
- long long in_queue_len, out_queue_len;
- struct shaping_stat_data_dir *in = &s->incoming;
- struct shaping_stat_data_dir *out = &s->outgoing;
-
- in_queue_len = in->queue_len;//queue_len is gauge metric, do not clear
- out_queue_len = out->queue_len;
-
- memset(in, 0, sizeof(struct shaping_stat_data_dir));
- memset(out, 0, sizeof(struct shaping_stat_data_dir));
-
- in->queue_len = in_queue_len;
- out->queue_len = out_queue_len;
-
- return;
-}
-
-static void shaper_stat_data_send(struct shaping_stat *stat, struct shaping_stat_data *s)
-{
- char buf[1024];
- struct shaping_stat_data_dir *in = &s->incoming;
- struct shaping_stat_data_dir *out = &s->outgoing;
-
- snprintf(buf, sizeof(buf), SHAPING_STAT_FORMAT, s->key.rule_id, s->key.profile_id, s->key.priority,
- s->key.profile_type == SHAPING_PROFILE_TYPE_PRIMARY ? "primary" : "borrow",
- s->queueing_session_num, in->rx_pkts, in->rx_bytes, in->tx_pkts, in->tx_bytes,
- in->drop_pkts, in->max_latency, in->queue_len, out->rx_pkts, out->rx_bytes, out->tx_pkts, out->tx_bytes,
- out->drop_pkts, out->max_latency, out->queue_len);
-
- sendto(stat->sock_fd, buf, strlen(buf), 0, (struct sockaddr*)&stat->sock_addr, sizeof(stat->sock_addr));
-
- shaper_stat_counter_clear(s);
- return;
-}
+thread_local struct fieldstat_tag tags[TAG_IDX_MAX];
-static void shaper_stat_data_send_free(struct shaping_stat *stat, struct shaping_stat_data **stat_hashtbl)
+void shaper_stat_destroy(struct shaping_stat *stat)
{
- struct shaping_stat_data *s, *tmp = NULL;
-
- if (!stat || !*stat_hashtbl) {
+ if (!stat) {
return;
}
- HASH_ITER(hh, *stat_hashtbl, s, tmp) {
- shaper_stat_data_send(stat, s);
- HASH_DEL(*stat_hashtbl, s);
- free(s);
+ if (stat->instance) {
+ fieldstat_dynamic_instance_free(stat->instance);
}
+ free(stat);
return;
}
-void shaper_stat_send_free(struct shaping_stat *stat)
+static int shaper_stat_conf_load(struct shaper_stat_conf *conf)
{
- if (!stat) {
- return;
- }
+ memset(conf, 0, sizeof(struct shaper_stat_conf));
- if (stat->stat_hashtbl) {
- shaper_stat_data_send_free(stat, &stat->stat_hashtbl);
- }
- free(stat);
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1");
+ MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_PORT", &conf->telegraf_port, 6379);
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500);
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1);
- return;
+ return 0;
}
-void shaper_stat_send(struct shaping_stat *stat, struct shaping_stat_data **stat_hashtbl)
+struct shaping_stat* shaper_stat_init(int thread_num)
{
- struct shaping_stat_data *s, *tmp = NULL;
- struct timespec curr_time;
-
- if (!stat || !*stat_hashtbl) {
- return;
+ struct shaping_stat *stat = NULL;
+ int column_num;
+ struct shaper_stat_conf conf;
+ const char *column_name[] = {"queueing_sessions", "in_max_latency_us", "in_queue_len", "out_max_latency_us", "out_queue_len", //first line is gauge, second line is counter
+ "in_pkts", "in_bytes", "in_drop_pkts", "out_pkts", "out_bytes", "out_drop_pkts"};
+ enum field_type column_type[] = {FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE,
+ FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER};
+
+ column_num = sizeof(column_name)/sizeof(column_name[0]);
+ if (column_num != STAT_COLUNM_IDX_MAX) {
+ LOG_ERROR("%s: shaping init fieldstat failed, column_num %d != index num %d", LOG_TAG_STAT, column_num, STAT_COLUNM_IDX_MAX);
+ goto ERROR;
}
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
- if (curr_time.tv_sec - stat->update_time.tv_sec >= SHAPING_STAT_SEND_INTERVAL_SEC||
- curr_time.tv_nsec - stat->update_time.tv_nsec >= SHAPING_STAT_SEND_INTERVAL_NS) {
- stat->update_time = curr_time;
- HASH_ITER(hh, *stat_hashtbl, s, tmp) {
- shaper_stat_data_send(stat, s);
- }
+ if (shaper_stat_conf_load(&conf) != 0) {
+ LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT);
+ goto ERROR;
}
- return;
-}
+ stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat));
-struct shaping_stat* shaper_stat_new(char *telegraf_ip, short telegraf_port)
-{
- struct shaping_stat *stat = NULL;
- struct timespec curr_time;
+ stat->instance = fieldstat_dynamic_instance_new("shaping_engine", thread_num);
+ if (stat->instance == NULL) {
+ LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT);
+ goto ERROR;
+ }
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ fieldstat_dynamic_set_output_interval(stat->instance, conf.output_interval_ms);
+ fieldstat_dynamic_set_line_protocol_server(stat->instance, conf.telegraf_ip, conf.telegraf_port);
+ if (conf.enable_backgroud_thread == 0) {
+ fieldstat_dynamic_disable_background_thread(stat->instance);
+ }
- stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat));
+ stat->table_id = fieldstat_register_dynamic_table(stat->instance, "shaping_metric", column_name, column_type, column_num, stat->column_ids);
+ if (stat->table_id < 0) {
+ LOG_ERROR("%s: shaping fieldstat register table failed", LOG_TAG_STAT);
+ goto ERROR;
+ }
+
+ tags[TAG_RULE_ID_IDX].key = "rule_id";
+ tags[TAG_RULE_ID_IDX].value_type = 0;
+ tags[TAG_PROFILE_ID_IDX].key = "profile_id";
+ tags[TAG_PROFILE_ID_IDX].value_type = 0;
+ tags[TAG_PRIORITY_IDX].key = "priority";
+ tags[TAG_PRIORITY_IDX].value_type = 0;
+ tags[TAG_PROFILE_TYPE_IDX].key = "profile_type";
+ tags[TAG_PROFILE_TYPE_IDX].value_type = 2;
- stat->sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- stat->sock_addr.sin_family = AF_INET;
- stat->sock_addr.sin_port = htons(telegraf_port);
- stat->sock_addr.sin_addr.s_addr = inet_addr(telegraf_ip);
- stat->update_time = curr_time;
+ fieldstat_dynamic_instance_start(stat->instance);
return stat;
+
+ERROR:
+ if (stat) {
+ if (stat->instance) {
+ fieldstat_dynamic_instance_free(stat->instance);
+ }
+ free(stat);
+ }
+ return NULL;
}
-static struct shaping_stat_data *shaper_stat_ins_get(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type)
+static void shaper_stat_tags_build(int rule_id, int profile_id, int priority, int profile_type)
{
- struct shaping_stat_data *s_stat_data = NULL;
- struct shaping_stat_data_key key;
- memset(&key, 0, sizeof(key));//important for uthash opration
- key.rule_id = rule_id;
- key.profile_id = profile_id;
- key.priority = priority;
- key.profile_type = profile_type;
+ tags[TAG_RULE_ID_IDX].value_int = rule_id;
- HASH_FIND(hh, *stat_hashtbl, &key, sizeof(struct shaping_stat_data_key), s_stat_data);
- if (!s_stat_data) {
- s_stat_data = (struct shaping_stat_data *)calloc(1, sizeof(struct shaping_stat_data));
+ tags[TAG_PROFILE_ID_IDX].value_int = profile_id;
- memcpy(&s_stat_data->key, &key, sizeof(key));
+ tags[TAG_PRIORITY_IDX].value_int = priority;
- HASH_ADD(hh, *stat_hashtbl, key, sizeof(struct shaping_stat_data_key), s_stat_data);
+ if (profile_type == SHAPING_PROFILE_TYPE_PRIMARY) {
+ tags[TAG_PROFILE_TYPE_IDX].value_str = "primary";
+ } else {
+ tags[TAG_PROFILE_TYPE_IDX].value_str = "borrow";
}
- return s_stat_data;
+ return;
}
-void shaper_stat_drop_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id,
- int priority, unsigned char direction, int pkt_len)
+void shaper_stat_drop_inc(struct shaping_stat *stat, int rule_id, int profile_id,
+ int priority, unsigned char direction, int pkt_len, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_tags_build(rule_id, profile_id, priority, SHAPING_PROFILE_TYPE_PRIMARY);
if (direction == SHAPING_DIR_IN) {
- s_stat_data->incoming.drop_pkts++;
- s_stat_data->incoming.rx_pkts++;
- s_stat_data->incoming.rx_bytes += pkt_len;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_DROP_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
} else {
- s_stat_data->outgoing.drop_pkts++;
- s_stat_data->outgoing.rx_pkts++;
- s_stat_data->outgoing.rx_bytes += pkt_len;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_DROP_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
}
return;
}
-void shaper_stat_forward_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id,
- int priority, unsigned char direction, int pkt_len, int profile_type)
+void shaper_stat_forward_inc(struct shaping_stat *stat, int rule_id, int profile_id,
+ int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
if (direction == SHAPING_DIR_IN) {
- s_stat_data->incoming.tx_pkts++;
- s_stat_data->incoming.tx_bytes += pkt_len;
- s_stat_data->incoming.rx_pkts++;
- s_stat_data->incoming.rx_bytes += pkt_len;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_BYTES_IDX], "shaping_metric_row", pkt_len, tags, TAG_IDX_MAX, thread_id);
} else {
- s_stat_data->outgoing.tx_pkts++;
- s_stat_data->outgoing.tx_bytes += pkt_len;
- s_stat_data->outgoing.rx_pkts++;
- s_stat_data->outgoing.rx_bytes += pkt_len;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_BYTES_IDX], "shaping_metric_row", pkt_len, tags, TAG_IDX_MAX, thread_id);
}
return;
}
-void shaper_stat_forward_all_rule_inc(struct shaping_stat_data **stat_hashtbl, struct shaping_flow *sf, unsigned char direction, int pkt_len)
+void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_flow *sf, unsigned char direction, int pkt_len, int thread_id)
{
struct shaping_rule_info *rule;
int i;
for (i = 0; i < sf->rule_num; i++) {
rule = &sf->matched_rule_infos[i];
- shaper_stat_forward_inc(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, direction, pkt_len, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_forward_inc(stat, rule->id, rule->primary.id, rule->primary.priority, direction, pkt_len, SHAPING_PROFILE_TYPE_PRIMARY, thread_id);
}
return;
}
-void shaper_stat_queueing_session_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type)
+void shaper_stat_queueing_session_inc(struct shaping_stat *stat, int rule_id, int profile_id, int priority, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
-
- s_stat_data->queueing_session_num++;
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[QUEUEING_SESSIONS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
return;
}
-void shaper_stat_queueing_session_dec(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type)
+void shaper_stat_queueing_session_dec(struct shaping_stat *stat, int rule_id, int profile_id, int priority, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
-
- s_stat_data->queueing_session_num--;
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[QUEUEING_SESSIONS_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id);
return;
}
-void shaper_stat_queueing_pkt_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id,
- int priority, unsigned char direction, int pkt_len, int profile_type)
+void shaper_stat_queueing_pkt_inc(struct shaping_stat *stat, int rule_id, int profile_id,
+ int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
-
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
if (direction == SHAPING_DIR_IN) {
- s_stat_data->incoming.rx_pkts++;
- s_stat_data->incoming.rx_bytes += pkt_len;
- s_stat_data->incoming.queue_len++;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
} else {
- s_stat_data->outgoing.rx_pkts++;
- s_stat_data->outgoing.rx_bytes += pkt_len;
- s_stat_data->outgoing.queue_len++;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
}
return;
}
-void shaper_stat_queueing_pkt_dec(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id,
- int priority, unsigned char direction, int pkt_len, int profile_type)
+void shaper_stat_queueing_pkt_dec(struct shaping_stat *stat, int rule_id, int profile_id,
+ int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
-
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
if (direction == SHAPING_DIR_IN) {
- s_stat_data->incoming.rx_pkts--;
- s_stat_data->incoming.rx_bytes -= pkt_len;
- s_stat_data->incoming.queue_len--;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id);
} else {
- s_stat_data->outgoing.rx_pkts--;
- s_stat_data->outgoing.rx_bytes -= pkt_len;
- s_stat_data->outgoing.queue_len--;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id);
}
return;
}
-void shaper_stat_max_latency_update(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id,
- int priority, unsigned char direction, unsigned long long latency, int profile_type)
+void shaper_stat_max_latency_update(struct shaping_stat *stat, int rule_id, int profile_id,
+ int priority, unsigned char direction, unsigned long long latency, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
+ unsigned long long old_latency;
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
if (direction == SHAPING_DIR_IN) {
- if (latency > s_stat_data->incoming.max_latency) {
- s_stat_data->incoming.max_latency = latency;
+ old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], "shaping_metric_row", tags, TAG_IDX_MAX, thread_id);
+ if (latency > old_latency) {
+ fieldstat_dynamic_table_metric_value_set(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], "shaping_metric_row", latency, tags, TAG_IDX_MAX, thread_id);
}
} else {
- if (latency > s_stat_data->outgoing.max_latency) {
- s_stat_data->outgoing.max_latency = latency;
+ old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], "shaping_metric_row", tags, TAG_IDX_MAX, thread_id);
+ if (latency > old_latency) {
+ fieldstat_dynamic_table_metric_value_set(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], "shaping_metric_row", latency, tags, TAG_IDX_MAX, thread_id);
}
}
return;
-}
-
-#if 0
-/*********just for self test stub****************/
-void stub_shaper_stat_send(int thread_seq)
-{
- struct shaping_stat_data *s, *tmp;
-
- HASH_ITER(hh, g_rt_para.stat[thread_seq]->stat_hashtbl, s, tmp) {
- shaper_stat_data_send(g_rt_para.stat[thread_seq], s);
- }
-
- return;
-}
-/************************************************/
-#endif \ No newline at end of file
+} \ No newline at end of file