summaryrefslogtreecommitdiff
path: root/shaping/src/shaper_stat.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'shaping/src/shaper_stat.cpp')
-rw-r--r--shaping/src/shaper_stat.cpp215
1 files changed, 145 insertions, 70 deletions
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index e55463e..e37f5e0 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -4,7 +4,7 @@
#include <arpa/inet.h>
#include <MESA/MESA_prof_load.h>
#include <MESA/swarmkv.h>
-#include <fieldstat.h>
+#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "utils.h"
@@ -12,117 +12,177 @@
#include "shaper_stat.h"
#include "shaper_global_stat.h"
-#define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits"
-
#define SHAPER_STAT_REFRESH_TIME_US 10000 //10 ms
#define HINCRBY_RETRY_MAX 5
struct shaper_stat_conf {
- int enable_backgroud_thread;
- int output_interval_ms;
- char telegraf_ip[16];
- short telegraf_port;
+ char device_group[32];
+ char device_id[32];
+ char data_center[32];
+ char kafka_topic[64];
+ char kafka_username[64];
+ char kafka_password[64];
+ char kafka_brokers[256];
};
thread_local struct fieldstat_tag tags[TAG_IDX_MAX] =
{
- [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .value_type = 0},
- [TAG_RULE_ID_IDX] = {.key = "rule_id", .value_type = 0},
- [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .value_type = 0},
- [TAG_PRIORITY_IDX] = {.key = "priority", .value_type = 0},
- [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .value_type = 2}
+ [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = TAG_INTEGER},
+ [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = TAG_INTEGER},
+ [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = TAG_INTEGER},
+ [TAG_PRIORITY_IDX] = {.key = "priority", .type = TAG_INTEGER},
+ [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = TAG_CSTRING}
};
+char *output_json_buf = NULL;
+
void shaper_stat_destroy(struct shaping_stat *stat)
{
- if (!stat) {
+ if (stat) {
+ if (stat->instance) {
+ fieldstat_easy_free(stat->instance);
+ }
+ free(stat);
+ }
+
+ return;
+}
+
+static void shaper_stat_kafka_init(struct shaping_stat *stat, struct shaper_stat_conf *conf)
+{
+ char kafka_errstr[1024]={0};
+
+ if (strlen(conf->kafka_topic) == 0 || strlen(conf->kafka_brokers) == 0) {
+ LOG_ERROR("%s: kafka topic or brokers is empty", LOG_TAG_STAT);
+ return;
+ }
+
+ if (strlen(conf->kafka_username) == 0 || strlen(conf->kafka_password) == 0) {
+ LOG_ERROR("%s: kafka username or password is empty", LOG_TAG_STAT);
return;
}
- if (stat->instance) {
- fieldstat_dynamic_instance_free(stat->instance);
+ rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new();
+
+ rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr));
+
+ stat->kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
+ if (stat->kafka_handle == NULL) {
+ LOG_ERROR("%s: kafka producer create failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
}
- free(stat);
+ stat->topic_rkt = rd_kafka_topic_new(stat->kafka_handle, conf->kafka_topic, NULL);
return;
}
-static int shaper_stat_conf_load(struct shaper_stat_conf *conf)
+static int shaper_stat_conf_load(struct shaping_stat *stat, struct shaper_stat_conf *conf)
{
- memset(conf, 0, sizeof(struct shaper_stat_conf));
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_S", &stat->output_interval_s, 1);
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "DEVICE_GROUP", conf->device_group, sizeof(conf->device_group), "");
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "DEVICE_ID", conf->device_id, sizeof(conf->device_id), "");
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "DATA_CENTER", conf->data_center, sizeof(conf->data_center), "");
- MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "LINE_PROTOCOL_SERVER_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1");
- MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "LINE_PROTOCOL_SERVER_PORT", &conf->telegraf_port, 8200);
- MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500);
- MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1);
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_TOPIC", conf->kafka_topic, sizeof(conf->kafka_topic), "");
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_USERNAME", conf->kafka_username, sizeof(conf->kafka_username), "");
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_PASSWORD", conf->kafka_password, sizeof(conf->kafka_password), "");
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "KAFKA_BROKERS", conf->kafka_brokers, sizeof(conf->kafka_brokers), "");
return 0;
}
struct shaping_stat* shaper_stat_init(int thread_num)
{
- struct shaping_stat *stat = NULL;
- int column_num;
+ struct fieldstat_tag global_tags[5];
struct shaper_stat_conf conf;
- const char *column_name[] = {"in_max_latency_us", "in_queue_len", "out_max_latency_us", "out_queue_len", //first line is gauge, second line is counter
- "in_pkts", "in_bytes", "in_drop_pkts", "out_pkts", "out_bytes", "out_drop_pkts"};
- enum field_type column_type[] = {FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE, FIELD_TYPE_COUNTER, FIELD_TYPE_GAUGE,
- FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER};
-
- column_num = sizeof(column_name)/sizeof(column_name[0]);
- if (column_num != STAT_COLUNM_IDX_MAX) {
- LOG_ERROR("%s: shaping init fieldstat failed, column_num %d != index num %d", LOG_TAG_STAT, column_num, STAT_COLUNM_IDX_MAX);
- goto ERROR;
- }
+ struct shaping_stat *stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat));
- if (shaper_stat_conf_load(&conf) != 0) {
+ if (shaper_stat_conf_load(stat, &conf) != 0) {
LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT);
goto ERROR;
}
- stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat));
+ shaper_stat_kafka_init(stat, &conf);
- stat->instance = fieldstat_dynamic_instance_new("shaping_engine", thread_num);
+ global_tags[0].key = "app_name";
+ global_tags[0].type = TAG_CSTRING;
+ global_tags[0].value_str = "shaping_engine";
+
+ global_tags[1].key = "device_group";
+ global_tags[1].type = TAG_CSTRING;
+ global_tags[1].value_str = conf.device_group;
+
+ global_tags[2].key = "device_id";
+ global_tags[2].type = TAG_CSTRING;
+ global_tags[2].value_str = conf.device_id;
+
+ global_tags[3].key = "data_center";
+ global_tags[3].type = TAG_CSTRING;
+ global_tags[3].value_str = conf.data_center;
+
+ global_tags[4].key = "table_name";
+ global_tags[4].type = TAG_CSTRING;
+ global_tags[4].value_str = "shaping_metric";
+
+ stat->instance = fieldstat_easy_new(thread_num, "shaping_stat", global_tags, 5);
if (stat->instance == NULL) {
LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT);
goto ERROR;
}
- fieldstat_dynamic_set_output_interval(stat->instance, conf.output_interval_ms);
- fieldstat_dynamic_set_line_protocol_server(stat->instance, conf.telegraf_ip, conf.telegraf_port);
- if (conf.enable_backgroud_thread == 0) {
- fieldstat_dynamic_disable_background_thread(stat->instance);
+ stat->latency_histogram_id = fieldstat_easy_register_histogram(stat->instance, "latency_distribution_us", 1, 1000000, 5);
+ if (stat->latency_histogram_id < 0) {
+ LOG_ERROR("%s: shaping fieldstat register histogram failed", LOG_TAG_STAT);
+ goto ERROR;
}
- stat->table_id = fieldstat_register_dynamic_table(stat->instance, "shaping_metric", column_name, column_type, column_num, stat->column_ids);
- if (stat->table_id < 0) {
- LOG_ERROR("%s: shaping fieldstat register table failed", LOG_TAG_STAT);
- goto ERROR;
+ stat->column_ids[IN_QUEUE_LEN_IDX] = fieldstat_easy_register_counter(stat->instance, "in_queue_len");
+ stat->column_ids[OUT_QUEUE_LEN_IDX] = fieldstat_easy_register_counter(stat->instance, "out_queue_len");
+ stat->column_ids[IN_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "in_pkts");
+ stat->column_ids[IN_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "in_bytes");
+ stat->column_ids[IN_DROP_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "in_drop_pkts");
+ stat->column_ids[OUT_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "out_pkts");
+ stat->column_ids[OUT_BYTES_IDX] = fieldstat_easy_register_counter(stat->instance, "out_bytes");
+ stat->column_ids[OUT_DROP_PKTS_IDX] = fieldstat_easy_register_counter(stat->instance, "out_drop_pkts");
+
+ for (int i = IN_QUEUE_LEN_IDX; i < STAT_COLUNM_IDX_MAX; i++) {
+ if (stat->column_ids[i] < 0) {
+ LOG_ERROR("%s: shaping fieldstat register column %d failed", LOG_TAG_STAT, i);
+ goto ERROR;
+ }
}
- fieldstat_dynamic_instance_start(stat->instance);
+ fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_stat.json", 1);//TODO: output interval
return stat;
-
ERROR:
if (stat) {
if (stat->instance) {
- fieldstat_dynamic_instance_free(stat->instance);
+ fieldstat_easy_free(stat->instance);
}
free(stat);
}
+
return NULL;
}
static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int priority, int profile_type)
{
- tags[TAG_VSYS_ID_IDX].value_int = vsys_id;
+ tags[TAG_VSYS_ID_IDX].value_longlong = vsys_id;
- tags[TAG_RULE_ID_IDX].value_int = rule_id;
+ tags[TAG_RULE_ID_IDX].value_longlong = rule_id;
- tags[TAG_PROFILE_ID_IDX].value_int = profile_id;
+ tags[TAG_PROFILE_ID_IDX].value_longlong = profile_id;
- tags[TAG_PRIORITY_IDX].value_int = priority;
+ tags[TAG_PRIORITY_IDX].value_longlong = priority;
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
tags[TAG_PROFILE_TYPE_IDX].value_str = "primary";
@@ -143,7 +203,7 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(global_stat, curr_time_us - arg->start_time_us, ctx->thread_index);
shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
shaper_global_stat_hincrby_callback_inc(&ctx->thread_global_stat);
@@ -240,7 +300,6 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s
struct shaping_stat *stat = ctx->stat;
int priority = profile->priority;
int thread_id = ctx->thread_index;
- unsigned long long old_latency;
if (need_update_guage) {
profile->hash_node->local_queue_len[priority][SHAPING_DIR_IN] += profile_stat->priority_queue_len[SHAPING_DIR_IN];
@@ -255,28 +314,21 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s
}
shaper_stat_tags_build(rule->vsys_id, rule->id, profile->id, priority, profile_type);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_DROP_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.drop_pkts, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.pkts, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_BYTES_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.bytes, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_DROP_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.drop_pkts, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_PKTS_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.pkts, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_BYTES_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.bytes, tags, TAG_IDX_MAX, thread_id);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_DROP_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->in.drop_pkts);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->in.pkts);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_BYTES_IDX], tags, TAG_IDX_MAX, profile_stat->in.bytes);
- old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, tags, TAG_IDX_MAX, thread_id);
- if (profile_stat->in.max_latency > old_latency) {
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.max_latency - old_latency, tags, TAG_IDX_MAX, thread_id);
- }
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_DROP_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->out.drop_pkts);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_PKTS_IDX], tags, TAG_IDX_MAX, profile_stat->out.pkts);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_BYTES_IDX], tags, TAG_IDX_MAX, profile_stat->out.bytes);
- old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, tags, TAG_IDX_MAX, thread_id);
- if (profile_stat->out.max_latency > old_latency) {
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.max_latency - old_latency, tags, TAG_IDX_MAX, thread_id);
- }
+ fieldstat_easy_histogram_record(stat->instance, thread_id, stat->latency_histogram_id, tags, TAG_IDX_MAX, profile_stat->out.max_latency);
if (need_update_guage) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id);
- fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[IN_QUEUE_LEN_IDX], tags, TAG_IDX_MAX, profile_stat->in.queue_len);
+ fieldstat_easy_counter_incrby(stat->instance, thread_id, stat->column_ids[OUT_QUEUE_LEN_IDX], tags, TAG_IDX_MAX, profile_stat->out.queue_len);
}
memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
@@ -431,4 +483,27 @@ void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_sta
}
return;
+}
+
+void shaper_stat_output(struct shaping_stat *stat)
+{
+ size_t len = 0;
+ fieldstat_easy_output(stat->instance, &output_json_buf, &len);
+
+ int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_json_buf, len, NULL, 0, NULL);
+ if (status < 0) {
+ LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d",
+ LOG_TAG_STAT,
+ rd_kafka_last_error(),
+ rd_kafka_err2name(rd_kafka_last_error()),
+ rd_kafka_err2str(rd_kafka_last_error()),
+ status);
+ }
+
+ if (output_json_buf) {
+ free(output_json_buf);
+ output_json_buf = NULL;
+ }
+
+ return;
} \ No newline at end of file