summaryrefslogtreecommitdiff
path: root/shaping/src
diff options
context:
space:
mode:
author刘畅 <[email protected]>2024-07-31 09:41:21 +0000
committer刘畅 <[email protected]>2024-07-31 09:41:21 +0000
commit5b19aac02fb7b453cc0e6ae30eef1465352d92aa (patch)
tree1faf56cf86595d081341f734326ad5256283591b /shaping/src
parent2c2e3ac4c8ea3d04ca942330d7501bb28c48f5b9 (diff)
parent4030c6fb6887356e38b20cb3d74358463dd29e37 (diff)
Merge branch 'add_test_with_real_swarmkv' into 'rel'v3.1.39
Add test with real swarmkv See merge request tango/shaping-engine!101
Diffstat (limited to 'shaping/src')
-rw-r--r--shaping/src/shaper_global_stat.cpp5
-rw-r--r--shaping/src/shaper_stat.cpp121
2 files changed, 80 insertions, 46 deletions
diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp
index c039fda..1fb3764 100644
--- a/shaping/src/shaper_global_stat.cpp
+++ b/shaping/src/shaper_global_stat.cpp
@@ -2,7 +2,6 @@
#include <stdlib.h>
#include <MESA/MESA_prof_load.h>
-#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "utils.h"
@@ -76,7 +75,7 @@ struct shaping_global_stat* shaper_global_stat_init(int work_thread_num)
{
struct shaping_global_stat *stat = NULL;
struct shping_global_stat_conf conf;
- struct fieldstat_tag tag;
+ struct field tag;
stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat));
@@ -86,7 +85,7 @@ struct shaping_global_stat* shaper_global_stat_init(int work_thread_num)
}
tag.key = "shaping_global";
- tag.type = TAG_CSTRING;
+ tag.type = FIELD_VALUE_CSTRING;
tag.value_str = "shaping_global";
stat->instance = fieldstat_easy_new(work_thread_num, "shaping_global", &tag, 1);
if (stat->instance == NULL) {
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index e37f5e0..1498230 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -4,7 +4,6 @@
#include <arpa/inet.h>
#include <MESA/MESA_prof_load.h>
#include <MESA/swarmkv.h>
-#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "utils.h"
@@ -25,17 +24,15 @@ struct shaper_stat_conf {
char kafka_brokers[256];
};
-thread_local struct fieldstat_tag tags[TAG_IDX_MAX] =
+thread_local struct field tags[TAG_IDX_MAX] =
{
- [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = TAG_INTEGER},
- [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = TAG_INTEGER},
- [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = TAG_INTEGER},
- [TAG_PRIORITY_IDX] = {.key = "priority", .type = TAG_INTEGER},
- [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = TAG_CSTRING}
+ [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = FIELD_VALUE_INTEGER},
+ [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = FIELD_VALUE_INTEGER},
+ [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = FIELD_VALUE_INTEGER},
+ [TAG_PRIORITY_IDX] = {.key = "priority", .type = FIELD_VALUE_INTEGER},
+ [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = FIELD_VALUE_CSTRING}
};
-char *output_json_buf = NULL;
-
void shaper_stat_destroy(struct shaping_stat *stat)
{
if (stat) {
@@ -64,22 +61,63 @@ static void shaper_stat_kafka_init(struct shaping_stat *stat, struct shaper_stat
rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new();
- rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr));
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set queue.buffering.max.messages failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set topic.metadata.refresh.interval.ms failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set socket.keepalive.enable failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set security.protocol failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set client.id failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set sasl.mechanisms failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set sasl.username failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_password, kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set sasl.password failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
stat->kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
if (stat->kafka_handle == NULL) {
LOG_ERROR("%s: kafka producer create failed, err %s", LOG_TAG_STAT, kafka_errstr);
return;
}
+ if (rd_kafka_brokers_add(stat->kafka_handle, conf->kafka_brokers) <= 0)
+ {
+ LOG_ERROR("%s: kafka producer add brokers failed", LOG_TAG_STAT);
+ return;
+ }
+
stat->topic_rkt = rd_kafka_topic_new(stat->kafka_handle, conf->kafka_topic, NULL);
+ if (stat->topic_rkt == NULL) {
+ LOG_ERROR("%s: kafka producer create topic failed", LOG_TAG_STAT);
+ return;
+ }
return;
}
@@ -101,7 +139,7 @@ static int shaper_stat_conf_load(struct shaping_stat *stat, struct shaper_stat_c
struct shaping_stat* shaper_stat_init(int thread_num)
{
- struct fieldstat_tag global_tags[5];
+ struct field global_tags[5];
struct shaper_stat_conf conf;
struct shaping_stat *stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat));
@@ -113,26 +151,26 @@ struct shaping_stat* shaper_stat_init(int thread_num)
shaper_stat_kafka_init(stat, &conf);
global_tags[0].key = "app_name";
- global_tags[0].type = TAG_CSTRING;
+ global_tags[0].type = FIELD_VALUE_CSTRING;
global_tags[0].value_str = "shaping_engine";
global_tags[1].key = "device_group";
- global_tags[1].type = TAG_CSTRING;
+ global_tags[1].type = FIELD_VALUE_CSTRING;
global_tags[1].value_str = conf.device_group;
global_tags[2].key = "device_id";
- global_tags[2].type = TAG_CSTRING;
+ global_tags[2].type = FIELD_VALUE_CSTRING;
global_tags[2].value_str = conf.device_id;
global_tags[3].key = "data_center";
- global_tags[3].type = TAG_CSTRING;
+ global_tags[3].type = FIELD_VALUE_CSTRING;
global_tags[3].value_str = conf.data_center;
global_tags[4].key = "table_name";
- global_tags[4].type = TAG_CSTRING;
+ global_tags[4].type = FIELD_VALUE_CSTRING;
global_tags[4].value_str = "shaping_metric";
- stat->instance = fieldstat_easy_new(thread_num, "shaping_stat", global_tags, 5);
+ stat->instance = fieldstat_easy_new(thread_num, "traffic_shaping_rule_hits", global_tags, 5);
if (stat->instance == NULL) {
LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT);
goto ERROR;
@@ -160,8 +198,6 @@ struct shaping_stat* shaper_stat_init(int thread_num)
}
}
- fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_stat.json", 1);//TODO: output interval
-
return stat;
ERROR:
if (stat) {
@@ -487,22 +523,21 @@ void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_sta
void shaper_stat_output(struct shaping_stat *stat)
{
- size_t len = 0;
- fieldstat_easy_output(stat->instance, &output_json_buf, &len);
-
- int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_json_buf, len, NULL, 0, NULL);
- if (status < 0) {
- LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d",
- LOG_TAG_STAT,
- rd_kafka_last_error(),
- rd_kafka_err2name(rd_kafka_last_error()),
- rd_kafka_err2str(rd_kafka_last_error()),
- status);
- }
-
- if (output_json_buf) {
- free(output_json_buf);
- output_json_buf = NULL;
+ char **output_buff_array = NULL;
+ size_t array_size = 0;
+ fieldstat_easy_output_array_and_reset(stat->instance, &output_buff_array, &array_size);
+
+ for (int i = 0; i < array_size; i++) {
+ int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_buff_array[i], strlen(output_buff_array[i]), NULL, 0, NULL);
+ if (status < 0) {
+ LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d",
+ LOG_TAG_STAT,
+ rd_kafka_last_error(),
+ rd_kafka_err2name(rd_kafka_last_error()),
+ rd_kafka_err2str(rd_kafka_last_error()),
+ status);
+ }
+ free(output_buff_array[i]);
}
return;