summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorroot <[email protected]>2024-07-31 09:29:47 +0000
committerroot <[email protected]>2024-07-31 09:29:47 +0000
commit4030c6fb6887356e38b20cb3d74358463dd29e37 (patch)
tree1faf56cf86595d081341f734326ad5256283591b
parent9f31dd1065332f36382383564abe2734beba6ce8 (diff)
add fair-share profile test case
-rw-r--r--shaping/src/shaper_stat.cpp96
-rw-r--r--shaping/test/CMakeLists.txt2
-rw-r--r--shaping/test/dummy_rdkafka.cpp5
-rw-r--r--shaping/test/gtest_shaper.cpp1
-rw-r--r--shaping/test/gtest_shaper_with_swarmkv.cpp121
-rw-r--r--shaping/test/stub.cpp6
-rw-r--r--shaping/test/stub.h5
7 files changed, 199 insertions, 37 deletions
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index efc7322..1498230 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -33,8 +33,6 @@ thread_local struct field tags[TAG_IDX_MAX] =
[TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = FIELD_VALUE_CSTRING}
};
-char *output_json_buf = NULL;
-
void shaper_stat_destroy(struct shaping_stat *stat)
{
if (stat) {
@@ -63,22 +61,63 @@ static void shaper_stat_kafka_init(struct shaping_stat *stat, struct shaper_stat
rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new();
- rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr));
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set queue.buffering.max.messages failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set topic.metadata.refresh.interval.ms failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set socket.keepalive.enable failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set security.protocol failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set client.id failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set sasl.mechanisms failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set sasl.username failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
+ if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_password, kafka_errstr, sizeof(kafka_errstr)))
+ {
+ LOG_ERROR("%s: kafka producer set sasl.password failed, err %s", LOG_TAG_STAT, kafka_errstr);
+ return;
+ }
stat->kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
if (stat->kafka_handle == NULL) {
LOG_ERROR("%s: kafka producer create failed, err %s", LOG_TAG_STAT, kafka_errstr);
return;
}
+ if (rd_kafka_brokers_add(stat->kafka_handle, conf->kafka_brokers) <= 0)
+ {
+ LOG_ERROR("%s: kafka producer add brokers failed", LOG_TAG_STAT);
+ return;
+ }
+
stat->topic_rkt = rd_kafka_topic_new(stat->kafka_handle, conf->kafka_topic, NULL);
+ if (stat->topic_rkt == NULL) {
+ LOG_ERROR("%s: kafka producer create topic failed", LOG_TAG_STAT);
+ return;
+ }
return;
}
@@ -131,7 +170,7 @@ struct shaping_stat* shaper_stat_init(int thread_num)
global_tags[4].type = FIELD_VALUE_CSTRING;
global_tags[4].value_str = "shaping_metric";
- stat->instance = fieldstat_easy_new(thread_num, "shaping_stat", global_tags, 5);
+ stat->instance = fieldstat_easy_new(thread_num, "traffic_shaping_rule_hits", global_tags, 5);
if (stat->instance == NULL) {
LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT);
goto ERROR;
@@ -159,8 +198,6 @@ struct shaping_stat* shaper_stat_init(int thread_num)
}
}
- fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_stat.json", 1);//TODO: output interval
-
return stat;
ERROR:
if (stat) {
@@ -486,22 +523,21 @@ void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_sta
void shaper_stat_output(struct shaping_stat *stat)
{
- size_t len = 0;
- fieldstat_easy_output(stat->instance, &output_json_buf, &len);
-
- int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_json_buf, len, NULL, 0, NULL);
- if (status < 0) {
- LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d",
- LOG_TAG_STAT,
- rd_kafka_last_error(),
- rd_kafka_err2name(rd_kafka_last_error()),
- rd_kafka_err2str(rd_kafka_last_error()),
- status);
- }
-
- if (output_json_buf) {
- free(output_json_buf);
- output_json_buf = NULL;
+ char **output_buff_array = NULL;
+ size_t array_size = 0;
+ fieldstat_easy_output_array_and_reset(stat->instance, &output_buff_array, &array_size);
+
+ for (int i = 0; i < array_size; i++) {
+ int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_buff_array[i], strlen(output_buff_array[i]), NULL, 0, NULL);
+ if (status < 0) {
+ LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d",
+ LOG_TAG_STAT,
+ rd_kafka_last_error(),
+ rd_kafka_err2name(rd_kafka_last_error()),
+ rd_kafka_err2str(rd_kafka_last_error()),
+ status);
+ }
+ free(output_buff_array[i]);
}
return;
diff --git a/shaping/test/CMakeLists.txt b/shaping/test/CMakeLists.txt
index 67f18b6..cf171d6 100644
--- a/shaping/test/CMakeLists.txt
+++ b/shaping/test/CMakeLists.txt
@@ -51,6 +51,6 @@ gtest_discover_tests(gtest_shaper_maat)
gtest_discover_tests(gtest_shaper_send_log)
gtest_discover_tests(gtest_shaper)
gtest_discover_tests(gtest_shaper_aqm)
-gtest_discover_tests(gtest_shaper_with_swarmkv)
+#gtest_discover_tests(gtest_shaper_with_swarmkv)
file(COPY ./test_conf/ DESTINATION ./conf/) \ No newline at end of file
diff --git a/shaping/test/dummy_rdkafka.cpp b/shaping/test/dummy_rdkafka.cpp
index 5d255d8..f49ff1b 100644
--- a/shaping/test/dummy_rdkafka.cpp
+++ b/shaping/test/dummy_rdkafka.cpp
@@ -38,4 +38,9 @@ const char *rd_kafka_err2name (rd_kafka_resp_err_t err)
const char *rd_kafka_err2str (rd_kafka_resp_err_t err)
{
return NULL;
+}
+
+int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist)
+{
+ return 1;
} \ No newline at end of file
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index 50c5e90..024717f 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -21,7 +21,6 @@ static struct stub_packet* packet_new(unsigned long long income_time, unsigned i
struct stub_packet *packet;
packet = (struct stub_packet*)calloc(1, sizeof(struct stub_packet));
- packet->income_time = income_time;
packet->length = length;
packet->direction = dir;
diff --git a/shaping/test/gtest_shaper_with_swarmkv.cpp b/shaping/test/gtest_shaper_with_swarmkv.cpp
index 675d46e..403aed2 100644
--- a/shaping/test/gtest_shaper_with_swarmkv.cpp
+++ b/shaping/test/gtest_shaper_with_swarmkv.cpp
@@ -160,6 +160,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
packet->direction = dir;
packet->length = pkt_len;
+ packet->flow = sf;
memset(&meta, 0, sizeof(meta));
@@ -177,7 +178,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
return;
}
-TEST(single_session, generic_profile)
+TEST(generic_profile, single_session)
{
struct stub_pkt_queue *actual_tx_queue;
struct shaping_ctx *ctx = NULL;
@@ -233,6 +234,13 @@ TEST(single_session, generic_profile)
while (shaper_global_stat_queueing_pkts_get() != 0) {
polling_entry(ctx->thread_ctx[0].sp, ctx->stat, &ctx->thread_ctx[0]);
}
+ while(!TAILQ_EMPTY(actual_tx_queue))
+ {
+ struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue);
+ TAILQ_REMOVE(actual_tx_queue, pkt_node, node);
+ free(pkt_node->raw_packet);
+ free(pkt_node);
+ }
shaping_flow_free(&ctx->thread_ctx[0], sf);
@@ -240,6 +248,105 @@ TEST(single_session, generic_profile)
shaping_engine_destroy(ctx);
}
+TEST(fair_share_profile, two_members)
+{
+ struct stub_pkt_queue *actual_tx_queue;
+ struct shaping_ctx *ctx = NULL;
+ struct shaping_flow *sf1 = NULL;
+ struct shaping_flow *sf2 = NULL;
+ long long rule_id[] = {0, 1};
+ long long rule_id1[] = {0};
+ long long rule_id2[] = {1};
+ int priority[] = {1, 1};
+ int profile_num[] = {1, 1};
+ int profile_id[][MAX_REF_PROFILE] = {{0}, {0}};
+ struct cmd_exec_arg* reply_arg=NULL;
+ char result[2048]={0};
+
+ stub_init();
+ ctx = shaping_engine_init();
+ ASSERT_TRUE(ctx != NULL);
+ sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
+ ASSERT_TRUE(sf1 != NULL);
+ sf2 = shaping_flow_new(&ctx->thread_ctx[0]);
+ ASSERT_TRUE(sf2 != NULL);
+
+ stub_set_profile_limit_direction(0, PROFILE_LIMIT_DIRECTION_INCOMING_OUTGOING);
+ stub_set_profile_type(0, PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS);
+ stub_set_shaping_rule_fair_factor(0, 1);
+ stub_set_shaping_rule_fair_factor(1, 3);
+ stub_set_matched_shaping_rules(2, rule_id, priority, profile_num, profile_id);
+ shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1);
+ shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id2, 1);
+
+ sf1->src_ip_str = (char *)calloc(1, 16);
+ sf1->src_ip_str_len = strlen(sf1->src_ip_str);
+ memcpy(sf1->src_ip_str, "1.1.1.1", sf1->src_ip_str_len);
+
+ sf2->src_ip_str = (char *)calloc(1, 16);
+ sf2->src_ip_str_len = strlen(sf2->src_ip_str);
+ memcpy(sf2->src_ip_str, "2.2.2.2", sf2->src_ip_str_len);
+
+ //set swarmkv key
+ swarmkv_cli_set_db("swarmkv-shaping-nodes");
+ reply_arg=cmd_exec_arg_new();
+
+ cmd_exec_arg_expect_OK(reply_arg);
+ swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "ftcfg tsg-shaping-0-incoming 1000000 1000000 256");
+ swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "ftcfg tsg-shaping-0-outgoing 1000000 1000000 256");
+ cmd_exec_arg_clear(reply_arg);
+
+ actual_tx_queue = stub_get_tx_queue();
+
+ time_t start_time = time(NULL);
+ time_t last_time = start_time;
+ unsigned long long total_bytes1 = 0;
+ unsigned long long total_bytes2 = 0;
+ while (1) {
+ time_t curr_time = time(NULL);
+ if (curr_time - last_time >= 1) {
+ EXPECT_NEAR(total_bytes1 * 3, total_bytes2, 100);
+
+ last_time = curr_time;
+ total_bytes1 = 0;
+ total_bytes2 = 0;
+ }
+ if (curr_time - start_time >= 10) {
+ break;
+ }
+ send_packets(&ctx->thread_ctx[0], sf2, 100, SHAPING_DIR_IN, 0);
+ send_packets(&ctx->thread_ctx[0], sf1, 100, SHAPING_DIR_IN, 0);
+ while(!TAILQ_EMPTY(actual_tx_queue))
+ {
+ struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue);
+ TAILQ_REMOVE(actual_tx_queue, pkt_node, node);
+ if (pkt_node->raw_packet->flow == sf1) {
+ total_bytes1 += pkt_node->raw_packet->length;
+ } else {
+ total_bytes2 += pkt_node->raw_packet->length;
+ }
+ free(pkt_node->raw_packet);
+ free(pkt_node);
+ }
+ }
+
+ while (shaper_global_stat_queueing_pkts_get() != 0) {
+ polling_entry(ctx->thread_ctx[0].sp, ctx->stat, &ctx->thread_ctx[0]);
+ }
+ while(!TAILQ_EMPTY(actual_tx_queue))
+ {
+ struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue);
+ TAILQ_REMOVE(actual_tx_queue, pkt_node, node);
+ free(pkt_node->raw_packet);
+ free(pkt_node);
+ }
+
+ shaping_flow_free(&ctx->thread_ctx[0], sf1);
+ shaping_flow_free(&ctx->thread_ctx[0], sf2);
+ shaper_thread_resource_clear();
+ shaping_engine_destroy(ctx);
+}
+
int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
@@ -247,6 +354,9 @@ int main(int argc, char **argv)
const char *cluster_name="swarmkv-shaping-nodes";
+ system("consul agent -dev -config-dir=/etc/consul.d > /dev/null 2>&1 &");
+ sleep(3);
+
swarmkv_cli_create_cluster(cluster_name, "127.0.0.1:5210");
if (0 != MESA_handle_runtime_log_creation("./conf/zlog.conf"))
@@ -262,5 +372,12 @@ int main(int argc, char **argv)
return -1;
}
- return RUN_ALL_TESTS();
+ int ret = RUN_ALL_TESTS();
+
+ MESA_destroy_runtime_log_handle(log_handle);
+ MESA_handle_runtime_log_destruction();
+
+ system("pkill -f consul");
+
+ return ret;
} \ No newline at end of file
diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp
index 481f277..24b3ac3 100644
--- a/shaping/test/stub.cpp
+++ b/shaping/test/stub.cpp
@@ -76,6 +76,12 @@ void stub_set_shaping_rule_dscp_value(int rule_id, int dscp_value)
return;
}
+void stub_set_shaping_rule_fair_factor(int rule_id, int fair_factor)
+{
+ matched_rules.rules[rule_id].fair_factor = fair_factor;
+ return;
+}
+
void stub_clear_matched_shaping_rules()
{
memset(&matched_rules, 0, sizeof(struct stub_matched_rules));
diff --git a/shaping/test/stub.h b/shaping/test/stub.h
index 4589be1..7581a98 100644
--- a/shaping/test/stub.h
+++ b/shaping/test/stub.h
@@ -18,9 +18,7 @@ struct stub_packet {
unsigned char direction;
unsigned char pure_control;
unsigned int length;
- unsigned int sequence;
- unsigned long long income_time;
- unsigned char detained_flag;
+ struct shaping_flow *flow;
};
struct stub_packet_node {
@@ -38,6 +36,7 @@ void stub_set_async_token_get_times(int profile_id, int times);
void stub_set_matched_shaping_rules(int rule_num, long long *rule_id, const int *priority, const int *profile_num, int profile_id[][MAX_REF_PROFILE]);
void stub_set_shaping_rule_dscp_value(int rule_id, int dscp_value);
+void stub_set_shaping_rule_fair_factor(int rule_id, int fair_factor);
void stub_clear_matched_shaping_rules();
void stub_send_packet(struct stub_packet *packet);