summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author刘畅 <[email protected]>2024-07-31 09:41:21 +0000
committer刘畅 <[email protected]>2024-07-31 09:41:21 +0000
commit5b19aac02fb7b453cc0e6ae30eef1465352d92aa (patch)
tree1faf56cf86595d081341f734326ad5256283591b
parent2c2e3ac4c8ea3d04ca942330d7501bb28c48f5b9 (diff)
parent4030c6fb6887356e38b20cb3d74358463dd29e37 (diff)
Merge branch 'add_test_with_real_swarmkv' into 'rel'v3.1.39
Add test with real swarmkv See merge request tango/shaping-engine!101
-rw-r--r--shaping/include/shaper_global_stat.h1
-rw-r--r--shaping/include/shaper_stat.h1
-rw-r--r--shaping/src/shaper_global_stat.cpp5
-rw-r--r--shaping/src/shaper_stat.cpp121
-rw-r--r--shaping/test/CMakeLists.txt9
-rw-r--r--shaping/test/dummy_rdkafka.cpp5
-rw-r--r--shaping/test/dummy_swarmkv.cpp4
-rw-r--r--shaping/test/gtest_shaper.cpp49
-rw-r--r--shaping/test/gtest_shaper_with_swarmkv.cpp378
-rw-r--r--shaping/test/stub.cpp15
-rw-r--r--shaping/test/stub.h8
-rw-r--r--shaping/test/test_conf/shaping.conf8
12 files changed, 522 insertions, 82 deletions
diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h
index ec68453..4e3a276 100644
--- a/shaping/include/shaper_global_stat.h
+++ b/shaping/include/shaper_global_stat.h
@@ -1,4 +1,5 @@
#pragma once
+#include <fieldstat/fieldstat_easy.h>
enum shaping_global_stat_column_index {
CURR_SESSION_NUM_IDX = 0,
diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h
index 22ca620..e508d63 100644
--- a/shaping/include/shaper_stat.h
+++ b/shaping/include/shaper_stat.h
@@ -2,6 +2,7 @@
#include <netinet/in.h>
#include <librdkafka/rdkafka.h>
+#include <fieldstat/fieldstat_easy.h>
#include "uthash.h"
enum shaping_packet_dir {
diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp
index c039fda..1fb3764 100644
--- a/shaping/src/shaper_global_stat.cpp
+++ b/shaping/src/shaper_global_stat.cpp
@@ -2,7 +2,6 @@
#include <stdlib.h>
#include <MESA/MESA_prof_load.h>
-#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "utils.h"
@@ -76,7 +75,7 @@ struct shaping_global_stat* shaper_global_stat_init(int work_thread_num)
{
struct shaping_global_stat *stat = NULL;
struct shping_global_stat_conf conf;
- struct fieldstat_tag tag;
+ struct field tag;
stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat));
@@ -86,7 +85,7 @@ struct shaping_global_stat* shaper_global_stat_init(int work_thread_num)
}
tag.key = "shaping_global";
- tag.type = TAG_CSTRING;
+ tag.type = FIELD_VALUE_CSTRING;
tag.value_str = "shaping_global";
stat->instance = fieldstat_easy_new(work_thread_num, "shaping_global", &tag, 1);
if (stat->instance == NULL) {
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index e37f5e0..1498230 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -4,7 +4,6 @@
#include <arpa/inet.h>
#include <MESA/MESA_prof_load.h>
#include <MESA/swarmkv.h>
-#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "utils.h"
@@ -25,17 +24,15 @@ struct shaper_stat_conf {
char kafka_brokers[256];
};
-thread_local struct fieldstat_tag tags[TAG_IDX_MAX] =
+thread_local struct field tags[TAG_IDX_MAX] =
{
- [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = TAG_INTEGER},
- [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = TAG_INTEGER},
- [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = TAG_INTEGER},
- [TAG_PRIORITY_IDX] = {.key = "priority", .type = TAG_INTEGER},
- [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = TAG_CSTRING}
+ [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = FIELD_VALUE_INTEGER},
+ [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = FIELD_VALUE_INTEGER},
+ [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = FIELD_VALUE_INTEGER},
+ [TAG_PRIORITY_IDX] = {.key = "priority", .type = FIELD_VALUE_INTEGER},
+ [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = FIELD_VALUE_CSTRING}
};
-char *output_json_buf = NULL;
-
void shaper_stat_destroy(struct shaping_stat *stat)
{
if (stat) {
@@ -64,22 +61,63 @@ static void shaper_stat_kafka_init(struct shaping_stat *stat, struct shaper_stat
rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new();
- rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr));
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set queue.buffering.max.messages failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set topic.metadata.refresh.interval.ms failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set socket.keepalive.enable failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set security.protocol failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set client.id failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set sasl.mechanisms failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set sasl.username failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_password, kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set sasl.password failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
stat->kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
if (stat->kafka_handle == NULL) {
LOG_ERROR("%s: kafka producer create failed, err %s", LOG_TAG_STAT, kafka_errstr);
return;
}
+ if (rd_kafka_brokers_add(stat->kafka_handle, conf->kafka_brokers) <= 0)
+ {
+ LOG_ERROR("%s: kafka producer add brokers failed", LOG_TAG_STAT);
+ return;
+ }
+
stat->topic_rkt = rd_kafka_topic_new(stat->kafka_handle, conf->kafka_topic, NULL);
+ if (stat->topic_rkt == NULL) {
+ LOG_ERROR("%s: kafka producer create topic failed", LOG_TAG_STAT);
+ return;
+ }
return;
}
@@ -101,7 +139,7 @@ static int shaper_stat_conf_load(struct shaping_stat *stat, struct shaper_stat_c
struct shaping_stat* shaper_stat_init(int thread_num)
{
- struct fieldstat_tag global_tags[5];
+ struct field global_tags[5];
struct shaper_stat_conf conf;
struct shaping_stat *stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat));
@@ -113,26 +151,26 @@ struct shaping_stat* shaper_stat_init(int thread_num)
shaper_stat_kafka_init(stat, &conf);
global_tags[0].key = "app_name";
- global_tags[0].type = TAG_CSTRING;
+ global_tags[0].type = FIELD_VALUE_CSTRING;
global_tags[0].value_str = "shaping_engine";
global_tags[1].key = "device_group";
- global_tags[1].type = TAG_CSTRING;
+ global_tags[1].type = FIELD_VALUE_CSTRING;
global_tags[1].value_str = conf.device_group;
global_tags[2].key = "device_id";
- global_tags[2].type = TAG_CSTRING;
+ global_tags[2].type = FIELD_VALUE_CSTRING;
global_tags[2].value_str = conf.device_id;
global_tags[3].key = "data_center";
- global_tags[3].type = TAG_CSTRING;
+ global_tags[3].type = FIELD_VALUE_CSTRING;
global_tags[3].value_str = conf.data_center;
global_tags[4].key = "table_name";
- global_tags[4].type = TAG_CSTRING;
+ global_tags[4].type = FIELD_VALUE_CSTRING;
global_tags[4].value_str = "shaping_metric";
- stat->instance = fieldstat_easy_new(thread_num, "shaping_stat", global_tags, 5);
+ stat->instance = fieldstat_easy_new(thread_num, "traffic_shaping_rule_hits", global_tags, 5);
if (stat->instance == NULL) {
LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT);
goto ERROR;
@@ -160,8 +198,6 @@ struct shaping_stat* shaper_stat_init(int thread_num)
}
}
- fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_stat.json", 1);//TODO: output interval
-
return stat;
ERROR:
if (stat) {
@@ -487,22 +523,21 @@ void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_sta
void shaper_stat_output(struct shaping_stat *stat)
{
- size_t len = 0;
- fieldstat_easy_output(stat->instance, &output_json_buf, &len);
-
- int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_json_buf, len, NULL, 0, NULL);
- if (status < 0) {
- LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d",
- LOG_TAG_STAT,
- rd_kafka_last_error(),
- rd_kafka_err2name(rd_kafka_last_error()),
- rd_kafka_err2str(rd_kafka_last_error()),
- status);
- }
-
- if (output_json_buf) {
- free(output_json_buf);
- output_json_buf = NULL;
+ char **output_buff_array = NULL;
+ size_t array_size = 0;
+ fieldstat_easy_output_array_and_reset(stat->instance, &output_buff_array, &array_size);
+
+ for (int i = 0; i < array_size; i++) {
+ int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_buff_array[i], strlen(output_buff_array[i]), NULL, 0, NULL);
+ if (status < 0) {
+ LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d",
+ LOG_TAG_STAT,
+ rd_kafka_last_error(),
+ rd_kafka_err2name(rd_kafka_last_error()),
+ rd_kafka_err2str(rd_kafka_last_error()),
+ status);
+ }
+ free(output_buff_array[i]);
}
return;
diff --git a/shaping/test/CMakeLists.txt b/shaping/test/CMakeLists.txt
index 1c2e046..cf171d6 100644
--- a/shaping/test/CMakeLists.txt
+++ b/shaping/test/CMakeLists.txt
@@ -27,6 +27,14 @@ target_include_directories(gtest_shaper PUBLIC ${CMAKE_SOURCE_DIR}/shaping/inclu
target_link_libraries(gtest_shaper common shaper pthread gtest)
###############################################################################
+# gtest_shaper_with_swarmkv
+###############################################################################
+add_executable(gtest_shaper_with_swarmkv gtest_shaper_with_swarmkv.cpp stub.cpp dummy_rdkafka.cpp)
+target_include_directories(gtest_shaper_with_swarmkv PUBLIC ${CMAKE_SOURCE_DIR}/common/include)
+target_include_directories(gtest_shaper_with_swarmkv PUBLIC ${CMAKE_SOURCE_DIR}/shaping/include)
+target_link_libraries(gtest_shaper_with_swarmkv common shaper swarmkv pthread gtest)
+
+###############################################################################
# gtest_shaper_aqm
###############################################################################
add_executable(gtest_shaper_aqm gtest_shaper_aqm.cpp)
@@ -43,5 +51,6 @@ gtest_discover_tests(gtest_shaper_maat)
gtest_discover_tests(gtest_shaper_send_log)
gtest_discover_tests(gtest_shaper)
gtest_discover_tests(gtest_shaper_aqm)
+#gtest_discover_tests(gtest_shaper_with_swarmkv)
file(COPY ./test_conf/ DESTINATION ./conf/) \ No newline at end of file
diff --git a/shaping/test/dummy_rdkafka.cpp b/shaping/test/dummy_rdkafka.cpp
index 5d255d8..f49ff1b 100644
--- a/shaping/test/dummy_rdkafka.cpp
+++ b/shaping/test/dummy_rdkafka.cpp
@@ -38,4 +38,9 @@ const char *rd_kafka_err2name (rd_kafka_resp_err_t err)
const char *rd_kafka_err2str (rd_kafka_resp_err_t err)
{
return NULL;
+}
+
+int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist)
+{
+ return 1;
} \ No newline at end of file
diff --git a/shaping/test/dummy_swarmkv.cpp b/shaping/test/dummy_swarmkv.cpp
index c86e524..85251b8 100644
--- a/shaping/test/dummy_swarmkv.cpp
+++ b/shaping/test/dummy_swarmkv.cpp
@@ -31,9 +31,9 @@ static int profile_priority_len[MAX_STUB_PROFILE_NUM][SHAPING_PRIORITY_NUM_MAX][
static struct stub_avaliable_token pf_curr_avl_token[MAX_STUB_PROFILE_NUM];
static int pf_async_times[MAX_STUB_PROFILE_NUM];
vector<struct stub_token_thread_arg> pf_async_thread[MAX_STUB_PROFILE_NUM];
-struct shaping_profile pf_array[MAX_STUB_PROFILE_NUM];
+extern struct shaping_profile pf_array[MAX_STUB_PROFILE_NUM];
-void init_dummy_swarmkv()
+void dummy_swarmkv_init()
{
memset(&pf_array, 0, MAX_STUB_PROFILE_NUM * sizeof(struct shaping_profile));
memset(&profile_priority_len, 0, MAX_STUB_PROFILE_NUM * SHAPING_PRIORITY_NUM_MAX * SHAPING_DIR_MAX * sizeof(int));
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index cde8aef..024717f 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -1,7 +1,6 @@
#include <gtest/gtest.h>
#include <cjson/cJSON.h>
#include <sys/queue.h>
-#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "shaper.h"
@@ -22,7 +21,6 @@ static struct stub_packet* packet_new(unsigned long long income_time, unsigned i
struct stub_packet *packet;
packet = (struct stub_packet*)calloc(1, sizeof(struct stub_packet));
- packet->income_time = income_time;
packet->length = length;
packet->direction = dir;
@@ -107,8 +105,6 @@ static void shaping_stat_judge(char *file_line, int json_array_idx, int rule_id,
{
cJSON *json = NULL;
cJSON *json_array_element = NULL;
- cJSON *fields_json = NULL;
- cJSON *tags_json = NULL;
cJSON *tmp_obj = NULL;
char attr_name[32] = {0};
@@ -125,45 +121,41 @@ static void shaping_stat_judge(char *file_line, int json_array_idx, int rule_id,
EXPECT_STREQ("shaping_stat", tmp_obj->valuestring);
/******************parse tags***********************************/
- tags_json = cJSON_GetObjectItem(json_array_element, "tags");
- ASSERT_TRUE(tags_json != NULL);
- tmp_obj = cJSON_GetObjectItem(tags_json, "vsys_id");
+ tmp_obj = cJSON_GetObjectItem(json_array_element, "vsys_id");
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_EQ(tmp_obj->valueint, STUB_TEST_VSYS_ID);
- tmp_obj = cJSON_GetObjectItem(tags_json, "rule_id");
+ tmp_obj = cJSON_GetObjectItem(json_array_element, "rule_id");
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_EQ(rule_id, tmp_obj->valueint);
- tmp_obj = cJSON_GetObjectItem(tags_json, "profile_id");
+ tmp_obj = cJSON_GetObjectItem(json_array_element, "profile_id");
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_EQ(profile_id, tmp_obj->valueint);
- tmp_obj = cJSON_GetObjectItem(tags_json, "priority");
+ tmp_obj = cJSON_GetObjectItem(json_array_element, "priority");
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_EQ(priority, tmp_obj->valueint);
- tmp_obj = cJSON_GetObjectItem(tags_json, "profile_type");
+ tmp_obj = cJSON_GetObjectItem(json_array_element, "profile_type");
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_STREQ(tmp_obj->valuestring, profile_type);
/******************parse fields**********************************/
- fields_json = cJSON_GetObjectItem(json_array_element, "fields");
- ASSERT_TRUE(fields_json != NULL);
snprintf(attr_name, sizeof(attr_name), "%s_pkts", direction == SHAPING_DIR_OUT ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(fields_json, attr_name);
+ tmp_obj = cJSON_GetObjectItem(json_array_element, attr_name);
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_EQ(tx_pkts, tmp_obj->valueint);
snprintf(attr_name, sizeof(attr_name), "%s_bytes", direction == SHAPING_DIR_OUT ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(fields_json, attr_name);
+ tmp_obj = cJSON_GetObjectItem(json_array_element, attr_name);
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_EQ(tx_bytes, tmp_obj->valueint);
snprintf(attr_name, sizeof(attr_name), "%s_drop_pkts", direction == SHAPING_DIR_OUT ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(fields_json, attr_name);
+ tmp_obj = cJSON_GetObjectItem(json_array_element, attr_name);
ASSERT_TRUE(tmp_obj != NULL);
EXPECT_EQ(drop_pkts, tmp_obj->valueint);
@@ -176,7 +168,7 @@ static void shaping_stat_judge(char *file_line, int json_array_idx, int rule_id,
}*/
snprintf(attr_name, sizeof(attr_name), "%s_queue_len", direction == SHAPING_DIR_OUT ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(fields_json, attr_name);
+ tmp_obj = cJSON_GetObjectItem(json_array_element, attr_name);
if (tmp_obj != NULL) {
EXPECT_EQ(queue_len, tmp_obj->valueint);
}
@@ -201,12 +193,9 @@ static int shaping_global_stat_field_get(cJSON *metrics, const char *field_name)
static void shaping_global_stat_judge(char *file_line, int tx_pkts, int tx_bytes, int drop_pkts, int drop_bytes, int queueing_pkts, int queueing_bytes)
{
cJSON *metrics = NULL;
- cJSON *json_array_element = NULL;
cJSON *json = cJSON_Parse(file_line);
- json_array_element = cJSON_GetArrayItem(json, 0);
-
- metrics = cJSON_GetObjectItem(json_array_element, "fields");
+ metrics = cJSON_GetArrayItem(json, 0);
EXPECT_EQ(tx_pkts, shaping_global_stat_field_get(metrics, "all_tx_pkts"));
EXPECT_EQ(tx_bytes, shaping_global_stat_field_get(metrics, "all_tx_bytes"));
@@ -240,6 +229,7 @@ TEST(single_session, udp_tx_in_order)
TAILQ_INIT(&expec_tx_queue);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
sf = shaping_flow_new(&ctx->thread_ctx[0]);
@@ -316,6 +306,7 @@ TEST(bidirectional, udp_tx_in_order)
TAILQ_INIT(&expec_tx_queue_in);
TAILQ_INIT(&expec_tx_queue_out);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
sf = shaping_flow_new(&ctx->thread_ctx[0]);
@@ -378,6 +369,7 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order)
TAILQ_INIT(&expec_tx_queue);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
sf = shaping_flow_new(&ctx->thread_ctx[0]);
@@ -455,6 +447,7 @@ TEST(single_session, tcp_tx_in_order)
TAILQ_INIT(&expec_tx_queue);
TAILQ_INIT(&expec_pure_ctl_tx_queue);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
sf = shaping_flow_new(&ctx->thread_ctx[0]);
@@ -534,6 +527,7 @@ TEST(single_session, udp_diff_direction)
TAILQ_INIT(&expec_tx_queue_in);
TAILQ_INIT(&expec_tx_queue_out);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
sf = shaping_flow_new(&ctx->thread_ctx[0]);
@@ -605,6 +599,7 @@ TEST(single_session, udp_multi_rules)
TAILQ_INIT(&expec_tx_queue);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
sf = shaping_flow_new(&ctx->thread_ctx[0]);
@@ -680,6 +675,7 @@ TEST(single_session, udp_borrow)
TAILQ_INIT(&expec_tx_queue);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
sf = shaping_flow_new(&ctx->thread_ctx[0]);
@@ -749,6 +745,7 @@ TEST(single_session, udp_borrow_same_priority_9)
TAILQ_INIT(&expec_tx_queue);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
sf = shaping_flow_new(&ctx->thread_ctx[0]);
@@ -821,6 +818,7 @@ TEST(single_session_async, udp_close_before_async_exec)
TAILQ_INIT(&expec_tx_queue);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
sf = shaping_flow_new(&ctx->thread_ctx[0]);
@@ -882,6 +880,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
TAILQ_INIT(&expec_tx_queue1);
TAILQ_INIT(&expec_tx_queue2);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
@@ -994,6 +993,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
TAILQ_INIT(&expec_tx_queue1);
TAILQ_INIT(&expec_tx_queue2);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
@@ -1079,6 +1079,7 @@ TEST(two_session_diff_priority_same_profile, profile_timer_test)
TAILQ_INIT(&expec_tx_queue1);
TAILQ_INIT(&expec_tx_queue2);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
@@ -1180,6 +1181,7 @@ TEST(two_session_diff_priority_same_profile, one_direction_dont_block_another)
TAILQ_INIT(&expec_tx_queue1);
TAILQ_INIT(&expec_tx_queue2);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
@@ -1271,6 +1273,7 @@ TEST(two_sessions, priority_non_block)
TAILQ_INIT(&expec_tx_queue1);
TAILQ_INIT(&expec_tx_queue2);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
@@ -1344,6 +1347,7 @@ TEST(two_sessions, borrow_when_primary_profile_priority_blocked)
TAILQ_INIT(&expec_tx_queue1);
TAILQ_INIT(&expec_tx_queue2);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
@@ -1418,6 +1422,7 @@ TEST(two_sessions, primary_profile_priority_blocked_by_borrow_profile)
TAILQ_INIT(&expec_tx_queue1);
TAILQ_INIT(&expec_tx_queue2);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
@@ -1483,6 +1488,7 @@ TEST(statistics, udp_drop_pkt)
TAILQ_INIT(&expec_tx_queue);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
sf = shaping_flow_new(&ctx->thread_ctx[0]);
@@ -1553,6 +1559,7 @@ TEST(statistics, udp_queueing_pkt)
TAILQ_INIT(&expec_tx_queue);
stub_init();
+ dummy_swarmkv_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
diff --git a/shaping/test/gtest_shaper_with_swarmkv.cpp b/shaping/test/gtest_shaper_with_swarmkv.cpp
index e3f2304..403aed2 100644
--- a/shaping/test/gtest_shaper_with_swarmkv.cpp
+++ b/shaping/test/gtest_shaper_with_swarmkv.cpp
@@ -1,11 +1,383 @@
+#include <stdarg.h>
+#include <sys/syscall.h>
#include <gtest/gtest.h>
+#include <MESA/MESA_handle_logger.h>
+#include <MESA/swarmkv.h>
-TEST(single_session, generic_profile)
-{}
+#include "shaper_maat.h"
+#include "stub.h"
+
+struct swarmkv_cli_system
+{
+ int attached;
+ char *cluster_name;
+ char *attach_target_line;
+};
+
+typedef void proc_result_callback_t(struct cmd_exec_arg* exec_arg, void *uarg);
+struct cmd_exec_arg
+{
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+ int is_callback_executed;
+ int success;
+ int api_invoking_tid;
+ pthread_t callback_invoking_tid;
+ int is_sync_callback;
+ struct swarmkv_reply expected_reply;
+ proc_result_callback_t *cb;
+ void *cb_uarg;
+ int print_reply_on_fail;
+ int diable_sync_check;
+};
+
+static struct swarmkv_cli_system g_cli_system;
+
+int swarmkv_cli_create_cluster(const char *cluster_name, const char *node_string)
+{
+ char cmd_string[1024];
+ snprintf(cmd_string, sizeof(cmd_string), "/opt/MESA/bin/swarmkv-cli --cluster-create %s %s", cluster_name, node_string);
+ return system(cmd_string);
+}
+
+void swarmkv_cli_set_db(const char *cluster_name)
+{
+ if(g_cli_system.cluster_name) free(g_cli_system.cluster_name);
+ asprintf(&(g_cli_system.cluster_name), "%s", cluster_name);
+}
+
+struct cmd_exec_arg* cmd_exec_arg_new(void)
+{
+ struct cmd_exec_arg* arg=(struct cmd_exec_arg*)calloc(sizeof(struct cmd_exec_arg), 1);
+ pthread_cond_init(&arg->cond, NULL);
+ pthread_mutex_init(&arg->mutex, NULL);
+ arg->callback_invoking_tid=syscall(SYS_gettid);
+ return arg;
+}
+
+void cmd_exec_arg_expect_OK(struct cmd_exec_arg* arg)
+{
+ arg->expected_reply.type=SWARMKV_REPLY_STATUS;
+ if(arg->expected_reply.str)
+ {
+ free(arg->expected_reply.str);
+ arg->expected_reply.str=NULL;
+ }
+ arg->expected_reply.str=strdup("OK");
+ arg->expected_reply.len=2;
+}
+
+void cmd_exec_arg_expect_cstring(struct cmd_exec_arg* arg, const char* string)
+{
+ arg->expected_reply.type=SWARMKV_REPLY_STRING;
+ if(arg->expected_reply.str)
+ {
+ free(arg->expected_reply.str);
+ arg->expected_reply.str=NULL;
+ }
+ arg->expected_reply.str=strdup(string);
+ arg->expected_reply.len=strlen(string);
+
+}
+
+void cmd_exec_arg_clear(struct cmd_exec_arg* arg)
+{
+ arg->success=0;
+ arg->is_callback_executed=0;
+ if(!arg->diable_sync_check)
+ {
+ arg->api_invoking_tid = syscall(SYS_gettid);
+ }
+ arg->callback_invoking_tid=0;
+ arg->is_sync_callback=0;
+ return;
+}
+
+typedef void swarmkv_cli_reply_funt_t(struct cmd_exec_arg* reply_arg, const char *);
+
+int swarmkv_cli_system_cmd(struct cmd_exec_arg* reply_arg, char *result, size_t result_len, swarmkv_cli_reply_funt_t *reply_funt_t, const char *format, ...)
+{
+ int line_num=0;
+ char command[1024] = {0};
+ char *cmd_str=NULL;
+
+ va_list ap;
+ va_start(ap,format);
+ vasprintf(&cmd_str, format, ap);
+ va_end(ap);
+
+ if(g_cli_system.attached)
+ {
+ snprintf(command, sizeof(command), "/opt/MESA/bin/swarmkv-cli -n %s %s --exec %s", g_cli_system.cluster_name, g_cli_system.attach_target_line, cmd_str);
+ }
+ else
+ {
+ snprintf(command, sizeof(command), "/opt/MESA/bin/swarmkv-cli -n %s --exec %s", g_cli_system.cluster_name, cmd_str);
+ }
+
+ char *p=result;
+ char line[1024] = {0};
+ FILE *fp = NULL;
+
+ if((fp = popen(command, "r")) == NULL)
+ {
+ printf("popen error!\n");
+ return 0;
+ }
+ memset(result, 0, result_len);
+ while (fgets(line, sizeof(line), fp))
+ {
+ int len= strlen(line);
+ line[len-1]='\0';
+ if((p - result) < (int)result_len)
+ {
+ if(line_num)
+ {
+ p += snprintf(p, result_len - (p - result), ",");
+ }
+
+ p += snprintf(p, result_len - (p - result), "%s", line);
+ }
+ line_num++;
+ }
+ pclose(fp);
+
+ if(reply_funt_t)
+ reply_funt_t(reply_arg, result);
+ free(cmd_str);
+ return 1;
+}
+
+void swarmkv_expect_reply_string(struct cmd_exec_arg* reply_arg, const char *line)
+{
+ EXPECT_STREQ(line, reply_arg->expected_reply.str);
+}
+
+static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int pkt_len, enum shaping_packet_dir dir, int is_tcp_pure_control)
+{
+ struct stub_packet *packet = (struct stub_packet *)calloc(1, sizeof(struct stub_packet));
+ struct metadata meta;
+
+ packet->direction = dir;
+ packet->length = pkt_len;
+ packet->flow = sf;
+
+ memset(&meta, 0, sizeof(meta));
+
+ meta.dir = dir;
+ meta.raw_len = pkt_len;
+ if (is_tcp_pure_control) {
+ meta.is_tcp_pure_ctrl = 1;
+ }
+
+ shaping_packet_process(ctx, packet, &meta, sf);
+
+ polling_entry(ctx->sp, ctx->stat, ctx);
+
+
+ return;
+}
+
+TEST(generic_profile, single_session)
+{
+ struct stub_pkt_queue *actual_tx_queue;
+ struct shaping_ctx *ctx = NULL;
+ struct shaping_flow *sf = NULL;
+ long long rule_id[] = {0};
+ int priority[] = {1};
+ int profile_num[] = {1};
+ int profile_id[][MAX_REF_PROFILE] = {{0}};
+ struct cmd_exec_arg* reply_arg=NULL;
+ char result[2048]={0};
+
+ stub_init();
+ ctx = shaping_engine_init();
+ ASSERT_TRUE(ctx != NULL);
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
+ ASSERT_TRUE(sf != NULL);
+
+ stub_set_profile_limit_direction(0, PROFILE_LIMIT_DIRECTION_INCOMING_OUTGOING);
+ stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
+ shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
+
+ //set swarmkv key
+ swarmkv_cli_set_db("swarmkv-shaping-nodes");
+ reply_arg=cmd_exec_arg_new();
+
+ cmd_exec_arg_expect_OK(reply_arg);
+ swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "tcfg tsg-shaping-0-incoming 1000000 1000000");
+ swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "tcfg tsg-shaping-0-outgoing 1000000 1000000");
+ cmd_exec_arg_clear(reply_arg);
+
+ actual_tx_queue = stub_get_tx_queue();
+
+ time_t start_time = time(NULL);
+ unsigned long long total_bytes = 0;
+ while (1) {
+ if (time(NULL) - start_time >= 10) {
+ break;
+ }
+ send_packets(&ctx->thread_ctx[0], sf, 100, SHAPING_DIR_IN, 0);
+ while(!TAILQ_EMPTY(actual_tx_queue))
+ {
+ struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue);
+ TAILQ_REMOVE(actual_tx_queue, pkt_node, node);
+ total_bytes += pkt_node->raw_packet->length;
+ free(pkt_node->raw_packet);
+ free(pkt_node);
+ }
+ }
+
+ EXPECT_GE(total_bytes * 8, 1000000 * 10);
+ EXPECT_LE(total_bytes * 8, 1000000 * 11);
+
+ while (shaper_global_stat_queueing_pkts_get() != 0) {
+ polling_entry(ctx->thread_ctx[0].sp, ctx->stat, &ctx->thread_ctx[0]);
+ }
+ while(!TAILQ_EMPTY(actual_tx_queue))
+ {
+ struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue);
+ TAILQ_REMOVE(actual_tx_queue, pkt_node, node);
+ free(pkt_node->raw_packet);
+ free(pkt_node);
+ }
+
+
+ shaping_flow_free(&ctx->thread_ctx[0], sf);
+ shaper_thread_resource_clear();
+ shaping_engine_destroy(ctx);
+}
+
+TEST(fair_share_profile, two_members)
+{
+ struct stub_pkt_queue *actual_tx_queue;
+ struct shaping_ctx *ctx = NULL;
+ struct shaping_flow *sf1 = NULL;
+ struct shaping_flow *sf2 = NULL;
+ long long rule_id[] = {0, 1};
+ long long rule_id1[] = {0};
+ long long rule_id2[] = {1};
+ int priority[] = {1, 1};
+ int profile_num[] = {1, 1};
+ int profile_id[][MAX_REF_PROFILE] = {{0}, {0}};
+ struct cmd_exec_arg* reply_arg=NULL;
+ char result[2048]={0};
+
+ stub_init();
+ ctx = shaping_engine_init();
+ ASSERT_TRUE(ctx != NULL);
+ sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
+ ASSERT_TRUE(sf1 != NULL);
+ sf2 = shaping_flow_new(&ctx->thread_ctx[0]);
+ ASSERT_TRUE(sf2 != NULL);
+
+ stub_set_profile_limit_direction(0, PROFILE_LIMIT_DIRECTION_INCOMING_OUTGOING);
+ stub_set_profile_type(0, PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS);
+ stub_set_shaping_rule_fair_factor(0, 1);
+ stub_set_shaping_rule_fair_factor(1, 3);
+ stub_set_matched_shaping_rules(2, rule_id, priority, profile_num, profile_id);
+ shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1);
+ shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id2, 1);
+
+ sf1->src_ip_str = (char *)calloc(1, 16);
+ sf1->src_ip_str_len = strlen(sf1->src_ip_str);
+ memcpy(sf1->src_ip_str, "1.1.1.1", sf1->src_ip_str_len);
+
+ sf2->src_ip_str = (char *)calloc(1, 16);
+ sf2->src_ip_str_len = strlen(sf2->src_ip_str);
+ memcpy(sf2->src_ip_str, "2.2.2.2", sf2->src_ip_str_len);
+
+ //set swarmkv key
+ swarmkv_cli_set_db("swarmkv-shaping-nodes");
+ reply_arg=cmd_exec_arg_new();
+
+ cmd_exec_arg_expect_OK(reply_arg);
+ swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "ftcfg tsg-shaping-0-incoming 1000000 1000000 256");
+ swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "ftcfg tsg-shaping-0-outgoing 1000000 1000000 256");
+ cmd_exec_arg_clear(reply_arg);
+
+ actual_tx_queue = stub_get_tx_queue();
+
+ time_t start_time = time(NULL);
+ time_t last_time = start_time;
+ unsigned long long total_bytes1 = 0;
+ unsigned long long total_bytes2 = 0;
+ while (1) {
+ time_t curr_time = time(NULL);
+ if (curr_time - last_time >= 1) {
+ EXPECT_NEAR(total_bytes1 * 3, total_bytes2, 100);
+
+ last_time = curr_time;
+ total_bytes1 = 0;
+ total_bytes2 = 0;
+ }
+ if (curr_time - start_time >= 10) {
+ break;
+ }
+ send_packets(&ctx->thread_ctx[0], sf2, 100, SHAPING_DIR_IN, 0);
+ send_packets(&ctx->thread_ctx[0], sf1, 100, SHAPING_DIR_IN, 0);
+ while(!TAILQ_EMPTY(actual_tx_queue))
+ {
+ struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue);
+ TAILQ_REMOVE(actual_tx_queue, pkt_node, node);
+ if (pkt_node->raw_packet->flow == sf1) {
+ total_bytes1 += pkt_node->raw_packet->length;
+ } else {
+ total_bytes2 += pkt_node->raw_packet->length;
+ }
+ free(pkt_node->raw_packet);
+ free(pkt_node);
+ }
+ }
+
+ while (shaper_global_stat_queueing_pkts_get() != 0) {
+ polling_entry(ctx->thread_ctx[0].sp, ctx->stat, &ctx->thread_ctx[0]);
+ }
+ while(!TAILQ_EMPTY(actual_tx_queue))
+ {
+ struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue);
+ TAILQ_REMOVE(actual_tx_queue, pkt_node, node);
+ free(pkt_node->raw_packet);
+ free(pkt_node);
+ }
+
+ shaping_flow_free(&ctx->thread_ctx[0], sf1);
+ shaping_flow_free(&ctx->thread_ctx[0], sf2);
+ shaper_thread_resource_clear();
+ shaping_engine_destroy(ctx);
+}
int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
//testing::GTEST_FLAG(filter) = "single_session.udp_tx_in_order";
- return RUN_ALL_TESTS();
+
+ const char *cluster_name="swarmkv-shaping-nodes";
+
+ system("consul agent -dev -config-dir=/etc/consul.d > /dev/null 2>&1 &");
+ sleep(3);
+
+ swarmkv_cli_create_cluster(cluster_name, "127.0.0.1:5210");
+
+ if (0 != MESA_handle_runtime_log_creation("./conf/zlog.conf"))
+ {
+ fprintf(stderr, "FATAL: unable to create runtime logger\n");
+ return -1;
+ }
+
+ void *log_handle = MESA_create_runtime_log_handle("log/shaping", RLOG_LV_DEBUG);
+ if (log_handle == NULL)
+ {
+ fprintf(stderr, "FATAL: unable to create log handle\n");
+ return -1;
+ }
+
+ int ret = RUN_ALL_TESTS();
+
+ MESA_destroy_runtime_log_handle(log_handle);
+ MESA_handle_runtime_log_destruction();
+
+ system("pkill -f consul");
+
+ return ret;
} \ No newline at end of file
diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp
index 4da686e..24b3ac3 100644
--- a/shaping/test/stub.cpp
+++ b/shaping/test/stub.cpp
@@ -28,7 +28,7 @@ struct stub_matched_rules {
struct stub_pkt_queue tx_queue;
struct stub_matched_rules matched_rules;
-extern struct shaping_profile pf_array[MAX_STUB_PROFILE_NUM];
+struct shaping_profile pf_array[MAX_STUB_PROFILE_NUM];
void stub_set_profile_type(int profile_id, enum shaping_profile_type type)
{
@@ -36,6 +36,12 @@ void stub_set_profile_type(int profile_id, enum shaping_profile_type type)
return;
}
+void stub_set_profile_limit_direction(int profile_id, enum shaping_profile_limit_direction limit_direction)
+{
+ pf_array[profile_id].limit_direction = limit_direction;
+ return;
+}
+
void stub_set_matched_shaping_rules(int rule_num, long long *rule_id, const int *priority, const int *profile_num, int profile_id[][MAX_REF_PROFILE])
{
struct shaping_rule *rules;
@@ -70,6 +76,12 @@ void stub_set_shaping_rule_dscp_value(int rule_id, int dscp_value)
return;
}
+void stub_set_shaping_rule_fair_factor(int rule_id, int fair_factor)
+{
+ matched_rules.rules[rule_id].fair_factor = fair_factor;
+ return;
+}
+
void stub_clear_matched_shaping_rules()
{
memset(&matched_rules, 0, sizeof(struct stub_matched_rules));
@@ -104,7 +116,6 @@ void stub_init()
TAILQ_INIT(&tx_queue);
memset(&matched_rules, 0, sizeof(struct stub_matched_rules));
- init_dummy_swarmkv();
return;
}
diff --git a/shaping/test/stub.h b/shaping/test/stub.h
index 4c86f09..7581a98 100644
--- a/shaping/test/stub.h
+++ b/shaping/test/stub.h
@@ -18,9 +18,7 @@ struct stub_packet {
unsigned char direction;
unsigned char pure_control;
unsigned int length;
- unsigned int sequence;
- unsigned long long income_time;
- unsigned char detained_flag;
+ struct shaping_flow *flow;
};
struct stub_packet_node {
@@ -33,10 +31,12 @@ TAILQ_HEAD(stub_pkt_queue, stub_packet_node);
void stub_set_token_bucket_avl_per_sec(int profile_id, unsigned int tokens, unsigned char direction, enum shaping_profile_limit_direction limit_direction);
void stub_refresh_token_bucket(int profile_id);
void stub_set_profile_type(int profile_id, enum shaping_profile_type type);
+void stub_set_profile_limit_direction(int profile_id, enum shaping_profile_limit_direction limit_direction);
void stub_set_async_token_get_times(int profile_id, int times);
void stub_set_matched_shaping_rules(int rule_num, long long *rule_id, const int *priority, const int *profile_num, int profile_id[][MAX_REF_PROFILE]);
void stub_set_shaping_rule_dscp_value(int rule_id, int dscp_value);
+void stub_set_shaping_rule_fair_factor(int rule_id, int fair_factor);
void stub_clear_matched_shaping_rules();
void stub_send_packet(struct stub_packet *packet);
@@ -49,7 +49,7 @@ void stub_curr_time_s_inc(int time_s);
unsigned long long stub_curr_time_ns_get();
void stub_init();
-void init_dummy_swarmkv();
+void dummy_swarmkv_init();
/*******************temporary for test******************************/
void stub_shaper_stat_send(int thread_seq);
diff --git a/shaping/test/test_conf/shaping.conf b/shaping/test/test_conf/shaping.conf
index 3e71225..cd0c1a5 100644
--- a/shaping/test/test_conf/shaping.conf
+++ b/shaping/test/test_conf/shaping.conf
@@ -19,15 +19,15 @@ REDIS_PORT="6379"
[SWARMKV]
-SWARMKV_CLUSTER_NAME="shaping"
+SWARMKV_CLUSTER_NAME="swarmkv-shaping-nodes"
SWARMKV_NODE_IP="127.0.0.1"
SWARMKV_NODE_PORT=5210
SWARMKV_CONSUL_IP="127.0.0.1"
SWARMKV_CONSUL_PORT=8500
SWARMKV_CLUSTER_ANNOUNCE_IP="127.0.0.1"
-SWARMKV_CLUSTER_ANNOUNCE_PORT=8501
-SWARMKV_HEALTH_CHECK_PORT=0
-SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111
+SWARMKV_CLUSTER_ANNOUNCE_PORT=5210
+SWARMKV_HEALTH_CHECK_PORT=8552
+SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=8552
[METRIC]
FIELDSTAT_OUTPUT_INTERVAL_S=999999000