diff options
| author | 刘畅 <[email protected]> | 2024-07-31 09:41:21 +0000 |
|---|---|---|
| committer | 刘畅 <[email protected]> | 2024-07-31 09:41:21 +0000 |
| commit | 5b19aac02fb7b453cc0e6ae30eef1465352d92aa (patch) | |
| tree | 1faf56cf86595d081341f734326ad5256283591b /shaping/src | |
| parent | 2c2e3ac4c8ea3d04ca942330d7501bb28c48f5b9 (diff) | |
| parent | 4030c6fb6887356e38b20cb3d74358463dd29e37 (diff) | |
Merge branch 'add_test_with_real_swarmkv' into 'rel'v3.1.39
Add test with real swarmkv
See merge request tango/shaping-engine!101
Diffstat (limited to 'shaping/src')
| -rw-r--r-- | shaping/src/shaper_global_stat.cpp | 5 | ||||
| -rw-r--r-- | shaping/src/shaper_stat.cpp | 121 |
2 files changed, 80 insertions, 46 deletions
diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp index c039fda..1fb3764 100644 --- a/shaping/src/shaper_global_stat.cpp +++ b/shaping/src/shaper_global_stat.cpp @@ -2,7 +2,6 @@ #include <stdlib.h> #include <MESA/MESA_prof_load.h> -#include <fieldstat/fieldstat_easy.h> #include "log.h" #include "utils.h" @@ -76,7 +75,7 @@ struct shaping_global_stat* shaper_global_stat_init(int work_thread_num) { struct shaping_global_stat *stat = NULL; struct shping_global_stat_conf conf; - struct fieldstat_tag tag; + struct field tag; stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat)); @@ -86,7 +85,7 @@ struct shaping_global_stat* shaper_global_stat_init(int work_thread_num) } tag.key = "shaping_global"; - tag.type = TAG_CSTRING; + tag.type = FIELD_VALUE_CSTRING; tag.value_str = "shaping_global"; stat->instance = fieldstat_easy_new(work_thread_num, "shaping_global", &tag, 1); if (stat->instance == NULL) { diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index e37f5e0..1498230 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -4,7 +4,6 @@ #include <arpa/inet.h> #include <MESA/MESA_prof_load.h> #include <MESA/swarmkv.h> -#include <fieldstat/fieldstat_easy.h> #include "log.h" #include "utils.h" @@ -25,17 +24,15 @@ struct shaper_stat_conf { char kafka_brokers[256]; }; -thread_local struct fieldstat_tag tags[TAG_IDX_MAX] = +thread_local struct field tags[TAG_IDX_MAX] = { - [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = TAG_INTEGER}, - [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = TAG_INTEGER}, - [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = TAG_INTEGER}, - [TAG_PRIORITY_IDX] = {.key = "priority", .type = TAG_INTEGER}, - [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = TAG_CSTRING} + [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = FIELD_VALUE_INTEGER}, + [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = FIELD_VALUE_INTEGER}, + [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = FIELD_VALUE_INTEGER}, + [TAG_PRIORITY_IDX] = {.key = "priority", .type = FIELD_VALUE_INTEGER}, + [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = FIELD_VALUE_CSTRING} }; -char *output_json_buf = NULL; - void shaper_stat_destroy(struct shaping_stat *stat) { if (stat) { @@ -64,22 +61,63 @@ static void shaper_stat_kafka_init(struct shaping_stat *stat, struct shaper_stat rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new(); - rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr)); + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set queue.buffering.max.messages failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set topic.metadata.refresh.interval.ms failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set socket.keepalive.enable failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set security.protocol failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set client.id failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set sasl.mechanisms failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set sasl.username failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_password, kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set sasl.password failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } stat->kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)); if (stat->kafka_handle == NULL) { LOG_ERROR("%s: kafka producer create failed, err %s", LOG_TAG_STAT, kafka_errstr); return; } + if (rd_kafka_brokers_add(stat->kafka_handle, conf->kafka_brokers) <= 0) + { + LOG_ERROR("%s: kafka producer add brokers failed", LOG_TAG_STAT); + return; + } + stat->topic_rkt = rd_kafka_topic_new(stat->kafka_handle, conf->kafka_topic, NULL); + if (stat->topic_rkt == NULL) { + LOG_ERROR("%s: kafka producer create topic failed", LOG_TAG_STAT); + return; + } return; } @@ -101,7 +139,7 @@ static int shaper_stat_conf_load(struct shaping_stat *stat, struct shaper_stat_c struct shaping_stat* shaper_stat_init(int thread_num) { - struct fieldstat_tag global_tags[5]; + struct field global_tags[5]; struct shaper_stat_conf conf; struct shaping_stat *stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat)); @@ -113,26 +151,26 @@ struct shaping_stat* shaper_stat_init(int thread_num) shaper_stat_kafka_init(stat, &conf); global_tags[0].key = "app_name"; - global_tags[0].type = TAG_CSTRING; + global_tags[0].type = FIELD_VALUE_CSTRING; global_tags[0].value_str = "shaping_engine"; global_tags[1].key = "device_group"; - global_tags[1].type = TAG_CSTRING; + global_tags[1].type = FIELD_VALUE_CSTRING; global_tags[1].value_str = conf.device_group; global_tags[2].key = "device_id"; - global_tags[2].type = TAG_CSTRING; + global_tags[2].type = FIELD_VALUE_CSTRING; global_tags[2].value_str = conf.device_id; global_tags[3].key = "data_center"; - global_tags[3].type = TAG_CSTRING; + global_tags[3].type = FIELD_VALUE_CSTRING; global_tags[3].value_str = conf.data_center; global_tags[4].key = "table_name"; - global_tags[4].type = TAG_CSTRING; + global_tags[4].type = FIELD_VALUE_CSTRING; global_tags[4].value_str = "shaping_metric"; - stat->instance = fieldstat_easy_new(thread_num, "shaping_stat", global_tags, 5); + stat->instance = fieldstat_easy_new(thread_num, "traffic_shaping_rule_hits", global_tags, 5); if (stat->instance == NULL) { LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT); goto ERROR; @@ -160,8 +198,6 @@ struct shaping_stat* shaper_stat_init(int thread_num) } } - fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_stat.json", 1);//TODO: output interval - return stat; ERROR: if (stat) { @@ -487,22 +523,21 @@ void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_sta void shaper_stat_output(struct shaping_stat *stat) { - size_t len = 0; - fieldstat_easy_output(stat->instance, &output_json_buf, &len); - - int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_json_buf, len, NULL, 0, NULL); - if (status < 0) { - LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d", - LOG_TAG_STAT, - rd_kafka_last_error(), - rd_kafka_err2name(rd_kafka_last_error()), - rd_kafka_err2str(rd_kafka_last_error()), - status); - } - - if (output_json_buf) { - free(output_json_buf); - output_json_buf = NULL; + char **output_buff_array = NULL; + size_t array_size = 0; + fieldstat_easy_output_array_and_reset(stat->instance, &output_buff_array, &array_size); + + for (int i = 0; i < array_size; i++) { + int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_buff_array[i], strlen(output_buff_array[i]), NULL, 0, NULL); + if (status < 0) { + LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d", + LOG_TAG_STAT, + rd_kafka_last_error(), + rd_kafka_err2name(rd_kafka_last_error()), + rd_kafka_err2str(rd_kafka_last_error()), + status); + } + free(output_buff_array[i]); } return; |
