From 9a9a03f7a9a5ec6acd6356a6e0143b1cd1e17892 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 29 Jul 2024 03:29:27 +0000 Subject: fix ci buile break --- shaping/src/shaper_stat.cpp | 1 - 1 file changed, 1 deletion(-) (limited to 'shaping/src/shaper_stat.cpp') diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index e37f5e0..feb1406 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include "log.h" #include "utils.h" -- cgit v1.2.3 From 9f31dd1065332f36382383564abe2734beba6ce8 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 29 Jul 2024 06:42:39 +0000 Subject: adapt latest version of fieldstat4 --- shaping/src/shaper_global_stat.cpp | 4 ++-- shaping/src/shaper_stat.cpp | 24 ++++++++++++------------ shaping/test/gtest_shaper.cpp | 29 ++++++++++------------------- 3 files changed, 24 insertions(+), 33 deletions(-) (limited to 'shaping/src/shaper_stat.cpp') diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp index 1080033..1fb3764 100644 --- a/shaping/src/shaper_global_stat.cpp +++ b/shaping/src/shaper_global_stat.cpp @@ -75,7 +75,7 @@ struct shaping_global_stat* shaper_global_stat_init(int work_thread_num) { struct shaping_global_stat *stat = NULL; struct shping_global_stat_conf conf; - struct fieldstat_tag tag; + struct field tag; stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat)); @@ -85,7 +85,7 @@ struct shaping_global_stat* shaper_global_stat_init(int work_thread_num) } tag.key = "shaping_global"; - tag.type = TAG_CSTRING; + tag.type = FIELD_VALUE_CSTRING; tag.value_str = "shaping_global"; stat->instance = fieldstat_easy_new(work_thread_num, "shaping_global", &tag, 1); if (stat->instance == NULL) { diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index feb1406..efc7322 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -24,13 +24,13 @@ struct shaper_stat_conf { char kafka_brokers[256]; }; -thread_local struct fieldstat_tag tags[TAG_IDX_MAX] = +thread_local struct field tags[TAG_IDX_MAX] = { - [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = TAG_INTEGER}, - [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = TAG_INTEGER}, - [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = TAG_INTEGER}, - [TAG_PRIORITY_IDX] = {.key = "priority", .type = TAG_INTEGER}, - [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = TAG_CSTRING} + [TAG_VSYS_ID_IDX] = {.key = "vsys_id", .type = FIELD_VALUE_INTEGER}, + [TAG_RULE_ID_IDX] = {.key = "rule_id", .type = FIELD_VALUE_INTEGER}, + [TAG_PROFILE_ID_IDX] = {.key = "profile_id", .type = FIELD_VALUE_INTEGER}, + [TAG_PRIORITY_IDX] = {.key = "priority", .type = FIELD_VALUE_INTEGER}, + [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = FIELD_VALUE_CSTRING} }; char *output_json_buf = NULL; @@ -100,7 +100,7 @@ static int shaper_stat_conf_load(struct shaping_stat *stat, struct shaper_stat_c struct shaping_stat* shaper_stat_init(int thread_num) { - struct fieldstat_tag global_tags[5]; + struct field global_tags[5]; struct shaper_stat_conf conf; struct shaping_stat *stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat)); @@ -112,23 +112,23 @@ struct shaping_stat* shaper_stat_init(int thread_num) shaper_stat_kafka_init(stat, &conf); global_tags[0].key = "app_name"; - global_tags[0].type = TAG_CSTRING; + global_tags[0].type = FIELD_VALUE_CSTRING; global_tags[0].value_str = "shaping_engine"; global_tags[1].key = "device_group"; - global_tags[1].type = TAG_CSTRING; + global_tags[1].type = FIELD_VALUE_CSTRING; global_tags[1].value_str = conf.device_group; global_tags[2].key = "device_id"; - global_tags[2].type = TAG_CSTRING; + global_tags[2].type = FIELD_VALUE_CSTRING; global_tags[2].value_str = conf.device_id; global_tags[3].key = "data_center"; - global_tags[3].type = TAG_CSTRING; + global_tags[3].type = FIELD_VALUE_CSTRING; global_tags[3].value_str = conf.data_center; global_tags[4].key = "table_name"; - global_tags[4].type = TAG_CSTRING; + global_tags[4].type = FIELD_VALUE_CSTRING; global_tags[4].value_str = "shaping_metric"; stat->instance = fieldstat_easy_new(thread_num, "shaping_stat", global_tags, 5); diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index 97f71ee..50c5e90 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -106,8 +106,6 @@ static void shaping_stat_judge(char *file_line, int json_array_idx, int rule_id, { cJSON *json = NULL; cJSON *json_array_element = NULL; - cJSON *fields_json = NULL; - cJSON *tags_json = NULL; cJSON *tmp_obj = NULL; char attr_name[32] = {0}; @@ -124,45 +122,41 @@ static void shaping_stat_judge(char *file_line, int json_array_idx, int rule_id, EXPECT_STREQ("shaping_stat", tmp_obj->valuestring); /******************parse tags***********************************/ - tags_json = cJSON_GetObjectItem(json_array_element, "tags"); - ASSERT_TRUE(tags_json != NULL); - tmp_obj = cJSON_GetObjectItem(tags_json, "vsys_id"); + tmp_obj = cJSON_GetObjectItem(json_array_element, "vsys_id"); ASSERT_TRUE(tmp_obj != NULL); EXPECT_EQ(tmp_obj->valueint, STUB_TEST_VSYS_ID); - tmp_obj = cJSON_GetObjectItem(tags_json, "rule_id"); + tmp_obj = cJSON_GetObjectItem(json_array_element, "rule_id"); ASSERT_TRUE(tmp_obj != NULL); EXPECT_EQ(rule_id, tmp_obj->valueint); - tmp_obj = cJSON_GetObjectItem(tags_json, "profile_id"); + tmp_obj = cJSON_GetObjectItem(json_array_element, "profile_id"); ASSERT_TRUE(tmp_obj != NULL); EXPECT_EQ(profile_id, tmp_obj->valueint); - tmp_obj = cJSON_GetObjectItem(tags_json, "priority"); + tmp_obj = cJSON_GetObjectItem(json_array_element, "priority"); ASSERT_TRUE(tmp_obj != NULL); EXPECT_EQ(priority, tmp_obj->valueint); - tmp_obj = cJSON_GetObjectItem(tags_json, "profile_type"); + tmp_obj = cJSON_GetObjectItem(json_array_element, "profile_type"); ASSERT_TRUE(tmp_obj != NULL); EXPECT_STREQ(tmp_obj->valuestring, profile_type); /******************parse fields**********************************/ - fields_json = cJSON_GetObjectItem(json_array_element, "fields"); - ASSERT_TRUE(fields_json != NULL); snprintf(attr_name, sizeof(attr_name), "%s_pkts", direction == SHAPING_DIR_OUT ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(fields_json, attr_name); + tmp_obj = cJSON_GetObjectItem(json_array_element, attr_name); ASSERT_TRUE(tmp_obj != NULL); EXPECT_EQ(tx_pkts, tmp_obj->valueint); snprintf(attr_name, sizeof(attr_name), "%s_bytes", direction == SHAPING_DIR_OUT ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(fields_json, attr_name); + tmp_obj = cJSON_GetObjectItem(json_array_element, attr_name); ASSERT_TRUE(tmp_obj != NULL); EXPECT_EQ(tx_bytes, tmp_obj->valueint); snprintf(attr_name, sizeof(attr_name), "%s_drop_pkts", direction == SHAPING_DIR_OUT ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(fields_json, attr_name); + tmp_obj = cJSON_GetObjectItem(json_array_element, attr_name); ASSERT_TRUE(tmp_obj != NULL); EXPECT_EQ(drop_pkts, tmp_obj->valueint); @@ -175,7 +169,7 @@ static void shaping_stat_judge(char *file_line, int json_array_idx, int rule_id, }*/ snprintf(attr_name, sizeof(attr_name), "%s_queue_len", direction == SHAPING_DIR_OUT ? "out" : "in"); - tmp_obj = cJSON_GetObjectItem(fields_json, attr_name); + tmp_obj = cJSON_GetObjectItem(json_array_element, attr_name); if (tmp_obj != NULL) { EXPECT_EQ(queue_len, tmp_obj->valueint); } @@ -200,12 +194,9 @@ static int shaping_global_stat_field_get(cJSON *metrics, const char *field_name) static void shaping_global_stat_judge(char *file_line, int tx_pkts, int tx_bytes, int drop_pkts, int drop_bytes, int queueing_pkts, int queueing_bytes) { cJSON *metrics = NULL; - cJSON *json_array_element = NULL; cJSON *json = cJSON_Parse(file_line); - json_array_element = cJSON_GetArrayItem(json, 0); - - metrics = cJSON_GetObjectItem(json_array_element, "fields"); + metrics = cJSON_GetArrayItem(json, 0); EXPECT_EQ(tx_pkts, shaping_global_stat_field_get(metrics, "all_tx_pkts")); EXPECT_EQ(tx_bytes, shaping_global_stat_field_get(metrics, "all_tx_bytes")); -- cgit v1.2.3 From 4030c6fb6887356e38b20cb3d74358463dd29e37 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 31 Jul 2024 09:29:47 +0000 Subject: add fair-share profile test case --- shaping/src/shaper_stat.cpp | 96 ++++++++++++++++------- shaping/test/CMakeLists.txt | 2 +- shaping/test/dummy_rdkafka.cpp | 5 ++ shaping/test/gtest_shaper.cpp | 1 - shaping/test/gtest_shaper_with_swarmkv.cpp | 121 ++++++++++++++++++++++++++++- shaping/test/stub.cpp | 6 ++ shaping/test/stub.h | 5 +- 7 files changed, 199 insertions(+), 37 deletions(-) (limited to 'shaping/src/shaper_stat.cpp') diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index efc7322..1498230 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -33,8 +33,6 @@ thread_local struct field tags[TAG_IDX_MAX] = [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = FIELD_VALUE_CSTRING} }; -char *output_json_buf = NULL; - void shaper_stat_destroy(struct shaping_stat *stat) { if (stat) { @@ -63,22 +61,63 @@ static void shaper_stat_kafka_init(struct shaping_stat *stat, struct shaper_stat rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new(); - rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr)); + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set queue.buffering.max.messages failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set topic.metadata.refresh.interval.ms failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set socket.keepalive.enable failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set security.protocol failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set client.id failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set sasl.mechanisms failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set sasl.username failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_password, kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set sasl.password failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } stat->kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)); if (stat->kafka_handle == NULL) { LOG_ERROR("%s: kafka producer create failed, err %s", LOG_TAG_STAT, kafka_errstr); return; } + if (rd_kafka_brokers_add(stat->kafka_handle, conf->kafka_brokers) <= 0) + { + LOG_ERROR("%s: kafka producer add brokers failed", LOG_TAG_STAT); + return; + } + stat->topic_rkt = rd_kafka_topic_new(stat->kafka_handle, conf->kafka_topic, NULL); + if (stat->topic_rkt == NULL) { + LOG_ERROR("%s: kafka producer create topic failed", LOG_TAG_STAT); + return; + } return; } @@ -131,7 +170,7 @@ struct shaping_stat* shaper_stat_init(int thread_num) global_tags[4].type = FIELD_VALUE_CSTRING; global_tags[4].value_str = "shaping_metric"; - stat->instance = fieldstat_easy_new(thread_num, "shaping_stat", global_tags, 5); + stat->instance = fieldstat_easy_new(thread_num, "traffic_shaping_rule_hits", global_tags, 5); if (stat->instance == NULL) { LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT); goto ERROR; @@ -159,8 +198,6 @@ struct shaping_stat* shaper_stat_init(int thread_num) } } - fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_stat.json", 1);//TODO: output interval - return stat; ERROR: if (stat) { @@ -486,22 +523,21 @@ void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_sta void shaper_stat_output(struct shaping_stat *stat) { - size_t len = 0; - fieldstat_easy_output(stat->instance, &output_json_buf, &len); - - int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_json_buf, len, NULL, 0, NULL); - if (status < 0) { - LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d", - LOG_TAG_STAT, - rd_kafka_last_error(), - rd_kafka_err2name(rd_kafka_last_error()), - rd_kafka_err2str(rd_kafka_last_error()), - status); - } - - if (output_json_buf) { - free(output_json_buf); - output_json_buf = NULL; + char **output_buff_array = NULL; + size_t array_size = 0; + fieldstat_easy_output_array_and_reset(stat->instance, &output_buff_array, &array_size); + + for (int i = 0; i < array_size; i++) { + int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_buff_array[i], strlen(output_buff_array[i]), NULL, 0, NULL); + if (status < 0) { + LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d", + LOG_TAG_STAT, + rd_kafka_last_error(), + rd_kafka_err2name(rd_kafka_last_error()), + rd_kafka_err2str(rd_kafka_last_error()), + status); + } + free(output_buff_array[i]); } return; diff --git a/shaping/test/CMakeLists.txt b/shaping/test/CMakeLists.txt index 67f18b6..cf171d6 100644 --- a/shaping/test/CMakeLists.txt +++ b/shaping/test/CMakeLists.txt @@ -51,6 +51,6 @@ gtest_discover_tests(gtest_shaper_maat) gtest_discover_tests(gtest_shaper_send_log) gtest_discover_tests(gtest_shaper) gtest_discover_tests(gtest_shaper_aqm) -gtest_discover_tests(gtest_shaper_with_swarmkv) +#gtest_discover_tests(gtest_shaper_with_swarmkv) file(COPY ./test_conf/ DESTINATION ./conf/) \ No newline at end of file diff --git a/shaping/test/dummy_rdkafka.cpp b/shaping/test/dummy_rdkafka.cpp index 5d255d8..f49ff1b 100644 --- a/shaping/test/dummy_rdkafka.cpp +++ b/shaping/test/dummy_rdkafka.cpp @@ -38,4 +38,9 @@ const char *rd_kafka_err2name (rd_kafka_resp_err_t err) const char *rd_kafka_err2str (rd_kafka_resp_err_t err) { return NULL; +} + +int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist) +{ + return 1; } \ No newline at end of file diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index 50c5e90..024717f 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -21,7 +21,6 @@ static struct stub_packet* packet_new(unsigned long long income_time, unsigned i struct stub_packet *packet; packet = (struct stub_packet*)calloc(1, sizeof(struct stub_packet)); - packet->income_time = income_time; packet->length = length; packet->direction = dir; diff --git a/shaping/test/gtest_shaper_with_swarmkv.cpp b/shaping/test/gtest_shaper_with_swarmkv.cpp index 675d46e..403aed2 100644 --- a/shaping/test/gtest_shaper_with_swarmkv.cpp +++ b/shaping/test/gtest_shaper_with_swarmkv.cpp @@ -160,6 +160,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf packet->direction = dir; packet->length = pkt_len; + packet->flow = sf; memset(&meta, 0, sizeof(meta)); @@ -177,7 +178,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf return; } -TEST(single_session, generic_profile) +TEST(generic_profile, single_session) { struct stub_pkt_queue *actual_tx_queue; struct shaping_ctx *ctx = NULL; @@ -233,6 +234,13 @@ TEST(single_session, generic_profile) while (shaper_global_stat_queueing_pkts_get() != 0) { polling_entry(ctx->thread_ctx[0].sp, ctx->stat, &ctx->thread_ctx[0]); } + while(!TAILQ_EMPTY(actual_tx_queue)) + { + struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue); + TAILQ_REMOVE(actual_tx_queue, pkt_node, node); + free(pkt_node->raw_packet); + free(pkt_node); + } shaping_flow_free(&ctx->thread_ctx[0], sf); @@ -240,6 +248,105 @@ TEST(single_session, generic_profile) shaping_engine_destroy(ctx); } +TEST(fair_share_profile, two_members) +{ + struct stub_pkt_queue *actual_tx_queue; + struct shaping_ctx *ctx = NULL; + struct shaping_flow *sf1 = NULL; + struct shaping_flow *sf2 = NULL; + long long rule_id[] = {0, 1}; + long long rule_id1[] = {0}; + long long rule_id2[] = {1}; + int priority[] = {1, 1}; + int profile_num[] = {1, 1}; + int profile_id[][MAX_REF_PROFILE] = {{0}, {0}}; + struct cmd_exec_arg* reply_arg=NULL; + char result[2048]={0}; + + stub_init(); + ctx = shaping_engine_init(); + ASSERT_TRUE(ctx != NULL); + sf1 = shaping_flow_new(&ctx->thread_ctx[0]); + ASSERT_TRUE(sf1 != NULL); + sf2 = shaping_flow_new(&ctx->thread_ctx[0]); + ASSERT_TRUE(sf2 != NULL); + + stub_set_profile_limit_direction(0, PROFILE_LIMIT_DIRECTION_INCOMING_OUTGOING); + stub_set_profile_type(0, PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS); + stub_set_shaping_rule_fair_factor(0, 1); + stub_set_shaping_rule_fair_factor(1, 3); + stub_set_matched_shaping_rules(2, rule_id, priority, profile_num, profile_id); + shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1); + shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id2, 1); + + sf1->src_ip_str = (char *)calloc(1, 16); + sf1->src_ip_str_len = strlen(sf1->src_ip_str); + memcpy(sf1->src_ip_str, "1.1.1.1", sf1->src_ip_str_len); + + sf2->src_ip_str = (char *)calloc(1, 16); + sf2->src_ip_str_len = strlen(sf2->src_ip_str); + memcpy(sf2->src_ip_str, "2.2.2.2", sf2->src_ip_str_len); + + //set swarmkv key + swarmkv_cli_set_db("swarmkv-shaping-nodes"); + reply_arg=cmd_exec_arg_new(); + + cmd_exec_arg_expect_OK(reply_arg); + swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "ftcfg tsg-shaping-0-incoming 1000000 1000000 256"); + swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "ftcfg tsg-shaping-0-outgoing 1000000 1000000 256"); + cmd_exec_arg_clear(reply_arg); + + actual_tx_queue = stub_get_tx_queue(); + + time_t start_time = time(NULL); + time_t last_time = start_time; + unsigned long long total_bytes1 = 0; + unsigned long long total_bytes2 = 0; + while (1) { + time_t curr_time = time(NULL); + if (curr_time - last_time >= 1) { + EXPECT_NEAR(total_bytes1 * 3, total_bytes2, 100); + + last_time = curr_time; + total_bytes1 = 0; + total_bytes2 = 0; + } + if (curr_time - start_time >= 10) { + break; + } + send_packets(&ctx->thread_ctx[0], sf2, 100, SHAPING_DIR_IN, 0); + send_packets(&ctx->thread_ctx[0], sf1, 100, SHAPING_DIR_IN, 0); + while(!TAILQ_EMPTY(actual_tx_queue)) + { + struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue); + TAILQ_REMOVE(actual_tx_queue, pkt_node, node); + if (pkt_node->raw_packet->flow == sf1) { + total_bytes1 += pkt_node->raw_packet->length; + } else { + total_bytes2 += pkt_node->raw_packet->length; + } + free(pkt_node->raw_packet); + free(pkt_node); + } + } + + while (shaper_global_stat_queueing_pkts_get() != 0) { + polling_entry(ctx->thread_ctx[0].sp, ctx->stat, &ctx->thread_ctx[0]); + } + while(!TAILQ_EMPTY(actual_tx_queue)) + { + struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue); + TAILQ_REMOVE(actual_tx_queue, pkt_node, node); + free(pkt_node->raw_packet); + free(pkt_node); + } + + shaping_flow_free(&ctx->thread_ctx[0], sf1); + shaping_flow_free(&ctx->thread_ctx[0], sf2); + shaper_thread_resource_clear(); + shaping_engine_destroy(ctx); +} + int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); @@ -247,6 +354,9 @@ int main(int argc, char **argv) const char *cluster_name="swarmkv-shaping-nodes"; + system("consul agent -dev -config-dir=/etc/consul.d > /dev/null 2>&1 &"); + sleep(3); + swarmkv_cli_create_cluster(cluster_name, "127.0.0.1:5210"); if (0 != MESA_handle_runtime_log_creation("./conf/zlog.conf")) @@ -262,5 +372,12 @@ int main(int argc, char **argv) return -1; } - return RUN_ALL_TESTS(); + int ret = RUN_ALL_TESTS(); + + MESA_destroy_runtime_log_handle(log_handle); + MESA_handle_runtime_log_destruction(); + + system("pkill -f consul"); + + return ret; } \ No newline at end of file diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp index 481f277..24b3ac3 100644 --- a/shaping/test/stub.cpp +++ b/shaping/test/stub.cpp @@ -76,6 +76,12 @@ void stub_set_shaping_rule_dscp_value(int rule_id, int dscp_value) return; } +void stub_set_shaping_rule_fair_factor(int rule_id, int fair_factor) +{ + matched_rules.rules[rule_id].fair_factor = fair_factor; + return; +} + void stub_clear_matched_shaping_rules() { memset(&matched_rules, 0, sizeof(struct stub_matched_rules)); diff --git a/shaping/test/stub.h b/shaping/test/stub.h index 4589be1..7581a98 100644 --- a/shaping/test/stub.h +++ b/shaping/test/stub.h @@ -18,9 +18,7 @@ struct stub_packet { unsigned char direction; unsigned char pure_control; unsigned int length; - unsigned int sequence; - unsigned long long income_time; - unsigned char detained_flag; + struct shaping_flow *flow; }; struct stub_packet_node { @@ -38,6 +36,7 @@ void stub_set_async_token_get_times(int profile_id, int times); void stub_set_matched_shaping_rules(int rule_num, long long *rule_id, const int *priority, const int *profile_num, int profile_id[][MAX_REF_PROFILE]); void stub_set_shaping_rule_dscp_value(int rule_id, int dscp_value); +void stub_set_shaping_rule_fair_factor(int rule_id, int fair_factor); void stub_clear_matched_shaping_rules(); void stub_send_packet(struct stub_packet *packet); -- cgit v1.2.3