diff options
| author | root <[email protected]> | 2024-07-31 09:29:47 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2024-07-31 09:29:47 +0000 |
| commit | 4030c6fb6887356e38b20cb3d74358463dd29e37 (patch) | |
| tree | 1faf56cf86595d081341f734326ad5256283591b /shaping | |
| parent | 9f31dd1065332f36382383564abe2734beba6ce8 (diff) | |
add fair-share profile test case
Diffstat (limited to 'shaping')
| -rw-r--r-- | shaping/src/shaper_stat.cpp | 96 | ||||
| -rw-r--r-- | shaping/test/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | shaping/test/dummy_rdkafka.cpp | 5 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper.cpp | 1 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper_with_swarmkv.cpp | 121 | ||||
| -rw-r--r-- | shaping/test/stub.cpp | 6 | ||||
| -rw-r--r-- | shaping/test/stub.h | 5 |
7 files changed, 199 insertions, 37 deletions
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index efc7322..1498230 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -33,8 +33,6 @@ thread_local struct field tags[TAG_IDX_MAX] = [TAG_PROFILE_TYPE_IDX] = {.key = "profile_type", .type = FIELD_VALUE_CSTRING} }; -char *output_json_buf = NULL; - void shaper_stat_destroy(struct shaping_stat *stat) { if (stat) { @@ -63,22 +61,63 @@ static void shaper_stat_kafka_init(struct shaping_stat *stat, struct shaper_stat rd_kafka_conf_t *rdkafka_conf = rd_kafka_conf_new(); - rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_brokers, kafka_errstr, sizeof(kafka_errstr)); + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set queue.buffering.max.messages failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set topic.metadata.refresh.interval.ms failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set socket.keepalive.enable failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set security.protocol failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "client.id", conf->kafka_topic, kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set client.id failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set sasl.mechanisms failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.username", conf->kafka_username, kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set sasl.username failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(rdkafka_conf, "sasl.password", conf->kafka_password, kafka_errstr, sizeof(kafka_errstr))) + { + LOG_ERROR("%s: kafka producer set sasl.password failed, err %s", LOG_TAG_STAT, kafka_errstr); + return; + } stat->kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)); if (stat->kafka_handle == NULL) { LOG_ERROR("%s: kafka producer create failed, err %s", LOG_TAG_STAT, kafka_errstr); return; } + if (rd_kafka_brokers_add(stat->kafka_handle, conf->kafka_brokers) <= 0) + { + LOG_ERROR("%s: kafka producer add brokers failed", LOG_TAG_STAT); + return; + } + stat->topic_rkt = rd_kafka_topic_new(stat->kafka_handle, conf->kafka_topic, NULL); + if (stat->topic_rkt == NULL) { + LOG_ERROR("%s: kafka producer create topic failed", LOG_TAG_STAT); + return; + } return; } @@ -131,7 +170,7 @@ struct shaping_stat* shaper_stat_init(int thread_num) global_tags[4].type = FIELD_VALUE_CSTRING; global_tags[4].value_str = "shaping_metric"; - stat->instance = fieldstat_easy_new(thread_num, "shaping_stat", global_tags, 5); + stat->instance = fieldstat_easy_new(thread_num, "traffic_shaping_rule_hits", global_tags, 5); if (stat->instance == NULL) { LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT); goto ERROR; @@ -159,8 +198,6 @@ struct shaping_stat* shaper_stat_init(int thread_num) } } - fieldstat_easy_enable_auto_output(stat->instance, "./metric/shaping_stat.json", 1);//TODO: output interval - return stat; ERROR: if (stat) { @@ -486,22 +523,21 @@ void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_sta void shaper_stat_output(struct shaping_stat *stat) { - size_t len = 0; - fieldstat_easy_output(stat->instance, &output_json_buf, &len); - - int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_json_buf, len, NULL, 0, NULL); - if (status < 0) { - LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d", - LOG_TAG_STAT, - rd_kafka_last_error(), - rd_kafka_err2name(rd_kafka_last_error()), - rd_kafka_err2str(rd_kafka_last_error()), - status); - } - - if (output_json_buf) { - free(output_json_buf); - output_json_buf = NULL; + char **output_buff_array = NULL; + size_t array_size = 0; + fieldstat_easy_output_array_and_reset(stat->instance, &output_buff_array, &array_size); + + for (int i = 0; i < array_size; i++) { + int status=rd_kafka_produce(stat->topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, output_buff_array[i], strlen(output_buff_array[i]), NULL, 0, NULL); + if (status < 0) { + LOG_ERROR("%s:shaper_stat_output, rd_kafka_produce is error of code: %d %s(%s), status: %d", + LOG_TAG_STAT, + rd_kafka_last_error(), + rd_kafka_err2name(rd_kafka_last_error()), + rd_kafka_err2str(rd_kafka_last_error()), + status); + } + free(output_buff_array[i]); } return; diff --git a/shaping/test/CMakeLists.txt b/shaping/test/CMakeLists.txt index 67f18b6..cf171d6 100644 --- a/shaping/test/CMakeLists.txt +++ b/shaping/test/CMakeLists.txt @@ -51,6 +51,6 @@ gtest_discover_tests(gtest_shaper_maat) gtest_discover_tests(gtest_shaper_send_log) gtest_discover_tests(gtest_shaper) gtest_discover_tests(gtest_shaper_aqm) -gtest_discover_tests(gtest_shaper_with_swarmkv) +#gtest_discover_tests(gtest_shaper_with_swarmkv) file(COPY ./test_conf/ DESTINATION ./conf/)
\ No newline at end of file diff --git a/shaping/test/dummy_rdkafka.cpp b/shaping/test/dummy_rdkafka.cpp index 5d255d8..f49ff1b 100644 --- a/shaping/test/dummy_rdkafka.cpp +++ b/shaping/test/dummy_rdkafka.cpp @@ -38,4 +38,9 @@ const char *rd_kafka_err2name (rd_kafka_resp_err_t err) const char *rd_kafka_err2str (rd_kafka_resp_err_t err) { return NULL; +} + +int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist) +{ + return 1; }
\ No newline at end of file diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index 50c5e90..024717f 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -21,7 +21,6 @@ static struct stub_packet* packet_new(unsigned long long income_time, unsigned i struct stub_packet *packet; packet = (struct stub_packet*)calloc(1, sizeof(struct stub_packet)); - packet->income_time = income_time; packet->length = length; packet->direction = dir; diff --git a/shaping/test/gtest_shaper_with_swarmkv.cpp b/shaping/test/gtest_shaper_with_swarmkv.cpp index 675d46e..403aed2 100644 --- a/shaping/test/gtest_shaper_with_swarmkv.cpp +++ b/shaping/test/gtest_shaper_with_swarmkv.cpp @@ -160,6 +160,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf packet->direction = dir; packet->length = pkt_len; + packet->flow = sf; memset(&meta, 0, sizeof(meta)); @@ -177,7 +178,7 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf return; } -TEST(single_session, generic_profile) +TEST(generic_profile, single_session) { struct stub_pkt_queue *actual_tx_queue; struct shaping_ctx *ctx = NULL; @@ -233,6 +234,13 @@ TEST(single_session, generic_profile) while (shaper_global_stat_queueing_pkts_get() != 0) { polling_entry(ctx->thread_ctx[0].sp, ctx->stat, &ctx->thread_ctx[0]); } + while(!TAILQ_EMPTY(actual_tx_queue)) + { + struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue); + TAILQ_REMOVE(actual_tx_queue, pkt_node, node); + free(pkt_node->raw_packet); + free(pkt_node); + } shaping_flow_free(&ctx->thread_ctx[0], sf); @@ -240,6 +248,105 @@ TEST(single_session, generic_profile) shaping_engine_destroy(ctx); } +TEST(fair_share_profile, two_members) +{ + struct stub_pkt_queue *actual_tx_queue; + struct shaping_ctx *ctx = NULL; + struct shaping_flow *sf1 = NULL; + struct shaping_flow *sf2 = NULL; + long long rule_id[] = {0, 1}; + long long rule_id1[] = {0}; + long long rule_id2[] = {1}; + int priority[] = {1, 1}; + int profile_num[] = {1, 1}; + int profile_id[][MAX_REF_PROFILE] = {{0}, {0}}; + struct cmd_exec_arg* reply_arg=NULL; + char result[2048]={0}; + + stub_init(); + ctx = shaping_engine_init(); + ASSERT_TRUE(ctx != NULL); + sf1 = shaping_flow_new(&ctx->thread_ctx[0]); + ASSERT_TRUE(sf1 != NULL); + sf2 = shaping_flow_new(&ctx->thread_ctx[0]); + ASSERT_TRUE(sf2 != NULL); + + stub_set_profile_limit_direction(0, PROFILE_LIMIT_DIRECTION_INCOMING_OUTGOING); + stub_set_profile_type(0, PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS); + stub_set_shaping_rule_fair_factor(0, 1); + stub_set_shaping_rule_fair_factor(1, 3); + stub_set_matched_shaping_rules(2, rule_id, priority, profile_num, profile_id); + shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1); + shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id2, 1); + + sf1->src_ip_str = (char *)calloc(1, 16); + sf1->src_ip_str_len = strlen(sf1->src_ip_str); + memcpy(sf1->src_ip_str, "1.1.1.1", sf1->src_ip_str_len); + + sf2->src_ip_str = (char *)calloc(1, 16); + sf2->src_ip_str_len = strlen(sf2->src_ip_str); + memcpy(sf2->src_ip_str, "2.2.2.2", sf2->src_ip_str_len); + + //set swarmkv key + swarmkv_cli_set_db("swarmkv-shaping-nodes"); + reply_arg=cmd_exec_arg_new(); + + cmd_exec_arg_expect_OK(reply_arg); + swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "ftcfg tsg-shaping-0-incoming 1000000 1000000 256"); + swarmkv_cli_system_cmd(reply_arg, result, sizeof(result), swarmkv_expect_reply_string, "ftcfg tsg-shaping-0-outgoing 1000000 1000000 256"); + cmd_exec_arg_clear(reply_arg); + + actual_tx_queue = stub_get_tx_queue(); + + time_t start_time = time(NULL); + time_t last_time = start_time; + unsigned long long total_bytes1 = 0; + unsigned long long total_bytes2 = 0; + while (1) { + time_t curr_time = time(NULL); + if (curr_time - last_time >= 1) { + EXPECT_NEAR(total_bytes1 * 3, total_bytes2, 100); + + last_time = curr_time; + total_bytes1 = 0; + total_bytes2 = 0; + } + if (curr_time - start_time >= 10) { + break; + } + send_packets(&ctx->thread_ctx[0], sf2, 100, SHAPING_DIR_IN, 0); + send_packets(&ctx->thread_ctx[0], sf1, 100, SHAPING_DIR_IN, 0); + while(!TAILQ_EMPTY(actual_tx_queue)) + { + struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue); + TAILQ_REMOVE(actual_tx_queue, pkt_node, node); + if (pkt_node->raw_packet->flow == sf1) { + total_bytes1 += pkt_node->raw_packet->length; + } else { + total_bytes2 += pkt_node->raw_packet->length; + } + free(pkt_node->raw_packet); + free(pkt_node); + } + } + + while (shaper_global_stat_queueing_pkts_get() != 0) { + polling_entry(ctx->thread_ctx[0].sp, ctx->stat, &ctx->thread_ctx[0]); + } + while(!TAILQ_EMPTY(actual_tx_queue)) + { + struct stub_packet_node *pkt_node = TAILQ_FIRST(actual_tx_queue); + TAILQ_REMOVE(actual_tx_queue, pkt_node, node); + free(pkt_node->raw_packet); + free(pkt_node); + } + + shaping_flow_free(&ctx->thread_ctx[0], sf1); + shaping_flow_free(&ctx->thread_ctx[0], sf2); + shaper_thread_resource_clear(); + shaping_engine_destroy(ctx); +} + int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); @@ -247,6 +354,9 @@ int main(int argc, char **argv) const char *cluster_name="swarmkv-shaping-nodes"; + system("consul agent -dev -config-dir=/etc/consul.d > /dev/null 2>&1 &"); + sleep(3); + swarmkv_cli_create_cluster(cluster_name, "127.0.0.1:5210"); if (0 != MESA_handle_runtime_log_creation("./conf/zlog.conf")) @@ -262,5 +372,12 @@ int main(int argc, char **argv) return -1; } - return RUN_ALL_TESTS(); + int ret = RUN_ALL_TESTS(); + + MESA_destroy_runtime_log_handle(log_handle); + MESA_handle_runtime_log_destruction(); + + system("pkill -f consul"); + + return ret; }
\ No newline at end of file diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp index 481f277..24b3ac3 100644 --- a/shaping/test/stub.cpp +++ b/shaping/test/stub.cpp @@ -76,6 +76,12 @@ void stub_set_shaping_rule_dscp_value(int rule_id, int dscp_value) return; } +void stub_set_shaping_rule_fair_factor(int rule_id, int fair_factor) +{ + matched_rules.rules[rule_id].fair_factor = fair_factor; + return; +} + void stub_clear_matched_shaping_rules() { memset(&matched_rules, 0, sizeof(struct stub_matched_rules)); diff --git a/shaping/test/stub.h b/shaping/test/stub.h index 4589be1..7581a98 100644 --- a/shaping/test/stub.h +++ b/shaping/test/stub.h @@ -18,9 +18,7 @@ struct stub_packet { unsigned char direction; unsigned char pure_control; unsigned int length; - unsigned int sequence; - unsigned long long income_time; - unsigned char detained_flag; + struct shaping_flow *flow; }; struct stub_packet_node { @@ -38,6 +36,7 @@ void stub_set_async_token_get_times(int profile_id, int times); void stub_set_matched_shaping_rules(int rule_num, long long *rule_id, const int *priority, const int *profile_num, int profile_id[][MAX_REF_PROFILE]); void stub_set_shaping_rule_dscp_value(int rule_id, int dscp_value); +void stub_set_shaping_rule_fair_factor(int rule_id, int fair_factor); void stub_clear_matched_shaping_rules(); void stub_send_packet(struct stub_packet *packet); |
