From 58460028cf73336292bbf9a6b05fc7612e3f891b Mon Sep 17 00:00:00 2001
From: doufenghu
Date: Sat, 6 Apr 2024 01:11:57 +0800
Subject: [Feature][Tests] Improve the Kafka connector tests with a Flink
 kafka-to-console job and an inline-to-kafka sink config.

---
 groot-tests/pom.xml                                |  2 +
 groot-tests/test-e2e-kafka/pom.xml                 |  7 ++
 .../test/e2e/kafka/KafkaConnectorTest.java         | 74 ++++++++++++++--------
 .../test/resources/kafka_sink_inline_to_kafka.yaml | 64 +++++++++++++++++++
 .../test/resources/kafka_source_to_console.yaml    | 40 ++++++++++++
 5 files changed, 162 insertions(+), 25 deletions(-)
 create mode 100644 groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml
 create mode 100644 groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml

diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml
index 041cd9d..76f533a 100644
--- a/groot-tests/pom.xml
+++ b/groot-tests/pom.xml
@@ -23,6 +23,7 @@
         true
         2.4
         4.3.1
+        <snappy-java.version>1.1.8.3</snappy-java.version>
@@ -73,6 +74,7 @@
             <version>${rest-assured.version}</version>
             <scope>test</scope>
         </dependency>
+
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml
index fbbe0e7..ab1ba72 100644
--- a/groot-tests/test-e2e-kafka/pom.xml
+++ b/groot-tests/test-e2e-kafka/pom.xml
@@ -36,6 +36,13 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy-java.version}</version>
+            <scope>test</scope>
+        </dependency>
+
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
index 9cd09fc..e3af025 100644
--- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
@@ -1,5 +1,8 @@
 package com.geedgenetworks.test.e2e.kafka;
 
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+import com.alibaba.nacos.client.naming.utils.CollectionUtils;
 import com.geedgenetworks.common.Event;
 import com.geedgenetworks.core.types.StructType;
 import com.geedgenetworks.core.types.Types;
@@ -10,11 +13,13 @@ import com.geedgenetworks.test.common.container.TestContainer;
 import com.geedgenetworks.test.common.container.TestContainerId;
 import com.geedgenetworks.test.common.junit.DisabledOnContainer;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -23,18 +28,26 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.base.CharMatcher;
+import org.testcontainers.shaded.com.google.common.base.Strings;
+import org.testcontainers.shaded.com.google.common.primitives.Chars;
 import org.testcontainers.utility.DockerImageName;
 import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
+import static org.awaitility.Awaitility.await;
+
 @Slf4j
 @DisabledOnContainer(
         value = {TestContainerId.FLINK_1_17},
@@ -47,11 +60,11 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
     private static final String KAFKA_HOST = "kafkaCluster";
 
     private KafkaProducer<byte[], byte[]> producer;
-
+    private static final String DEFAULT_TEST_TOPIC_SOURCE = "test_topic_source";
 
     @Override
     @BeforeAll
-    public void startUp() throws Exception {
+    public void startUp() {
         kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
                 .withNetwork(NETWORK)
                 .withNetworkAliases(KAFKA_HOST)
@@ -67,46 +80,57 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
                 .untilAsserted(this::initKafkaProducer);
 
         log.info("Write 100 records to topic test_topic_source");
+        generateTestData(DEFAULT_TEST_TOPIC_SOURCE, 0, 100);
+    }
 
-        StructType dataType = Types.parseStructType("client_ip: string, server_ip: String");
-        JsonSerializer serializer = new JsonSerializer(dataType);
-        generateTestData(serializer::serialize, 0, 100);
+    @TestTemplate
+    public void testSourceKafkaJsonToConsole(TestContainer container) {
+        generateTestData("test_topic_json", 0, 10);
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return container.executeJob("/kafka_source_to_console.yaml");
+                    } catch (Exception e) {
+                        log.error("Job submission failed: " + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        await().atMost(300000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            String logs = container.getServerLogs();
+                            Assertions.assertEquals(10, StringUtils.countMatches(logs, "PrintSinkFunction"));
+                        });
     }
 
-    private void generateTestData(ProducerRecordConverter converter, int start, int end) {
-
+    private void generateTestData(String topic, int start, int end) {
+        StructType dataType = Types.parseStructType("id: int, client_ip: string, server_ip: string, flag: string");
+        JsonSerializer serializer = new JsonSerializer(dataType);
         for (int i = start; i < end; i++) {
             Map<String, Object> row = Map
-                    .of("client_ip", "192.168.40.12", "server_ip", "8.8.8.8");
+                    .of("id", i,
+                        "client_ip", "192.168.40.12",
+                        "server_ip", "8.8.8.8",
+                        "flag", Boolean.FALSE);
             ProducerRecord<byte[], byte[]> record =
-                    new ProducerRecord<>("TEST-TOPIC-SOURCE", converter.convert(row));
+                    new ProducerRecord<>(topic, serializer.serialize(row));
             producer.send(record);
         }
-
-
     }
 
     @TestTemplate
-    public void testSourceKafkaToConsole(TestContainer container) throws IOException, InterruptedException {
-
-        List<String> data = getKafkaConsumerListData("TEST-TOPIC-SOURCE");
-
-        for (String record : data) {
-            log.info("Record: {}", record);
-        }
-
-        Assertions.assertEquals(100, data.size());
+    public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafka_sink_inline_to_kafka.yaml");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        List<String> data = getKafkaConsumerListData("test_topic");
+        Assertions.assertEquals(10, data.size()); // Check if all 10 records are consumed
     }
 
-    interface ProducerRecordConverter {
-        byte[] convert(Map<String, Object> row);
-    }
-
-
     @AfterAll
     @Override
     public void tearDown() throws Exception {
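Note: both tests rely on getKafkaConsumerListData(...), a helper defined elsewhere in the test suite and untouched by this patch. For orientation, a minimal sketch of what such a drain-the-topic helper can look like; the class name, the extra bootstrapServers parameter, and the 5-second poll window are illustrative assumptions, not the project's actual code:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical stand-in for the suite's helper: reads a topic from the
// earliest offset and returns every record value as a string.
public final class KafkaTestHelperSketch {

    public static List<String> getKafkaConsumerListData(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "_verify_group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        List<String> data = new ArrayList<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // Stop once a full poll interval passes with no new records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                if (records.isEmpty()) {
                    break;
                }
                for (ConsumerRecord<String, String> record : records) {
                    data.add(record.value());
                }
            }
        }
        return data;
    }
}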
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml
new file mode 100644
index 0000000..dbfbc1e
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml
@@ -0,0 +1,64 @@
+sources: # [object] Define connector source
+  inline_source:
+    type: inline
+    schema:
+      fields: # [array of object] Schema field projection; supports reading only the specified fields.
+        - name: log_id
+          type: bigint
+        - name: recv_time
+          type: bigint
+        - name: server_fqdn
+          type: string
+        - name: server_domain
+          type: string
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+        - name: server_asn
+          type: string
+        - name: decoded_as
+          type: string
+        - name: device_group
+          type: string
+        - name: device_tag
+          type: string
+    properties:
+      #
+      # [string] Event data; it is parsed into a Map according to the specified format.
+      #
+      data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+      format: json
+      interval.per.row: 1s
+      repeat.count: 10
+      json.ignore.parse.errors: false
+
+
+sinks:
+  connector_kafka:
+    type: kafka
+    properties:
+      topic: test_topic
+      kafka.bootstrap.servers: kafkaCluster:9092
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      format: json
+      log.failures.only: true
+
+application: # [object] Define job configuration
+  env:
+    name: example-inline-to-kafka
+    parallelism: 1
+    shade.identifier: default
+  pipeline:
+    object-reuse: true
+  topology:
+    - name: inline_source
+      downstream: [ connector_kafka ]
+    - name: connector_kafka
+      downstream: []
\ No newline at end of file
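The sink above sets kafka.compression.type: snappy, which is presumably why this patch adds snappy-java as a test-scoped dependency: the client producing the batches, and the consumer that later verifies them, both need the Snappy codec on the classpath. Assuming the connector simply strips the "kafka." prefix and hands the remaining keys to the producer, the YAML properties correspond to a plain producer configuration roughly like this sketch (values copied from the YAML; the class and topic payload are illustrative):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class SnappyProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaCluster:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728L);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        // Requires snappy-java at runtime to compress outgoing batches.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test_topic", "{\"id\":1}".getBytes()));
        }
    }
}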
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml
new file mode 100644
index 0000000..2afee47
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml
@@ -0,0 +1,40 @@
+sources:
+  kafka_source:
+    type: kafka
+    schema:
+      fields: # [array of object] Schema field projection; supports reading only the specified fields.
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+    properties: # [object] Kafka source properties
+      topic: test_topic_json
+      kafka.bootstrap.servers: kafkaCluster:9092
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.group.id: test_topic_json_group
+      kafka.auto.offset.reset: earliest
+      format: json
+
+sinks: # [object] Define connector sink
+  print_sink:
+    type: print
+    properties:
+      mode: log_warn
+      format: json
+
+application: # [object] Define job configuration
+  env:
+    name: example-kafka-to-print
+    parallelism: 1
+  pipeline:
+    object-reuse: true
+  execution:
+    restart:
+      strategy: no
+  topology:
+    - name: kafka_source
+      downstream: [print_sink]
+    - name: print_sink
+      downstream: []
\ No newline at end of file
--
cgit v1.2.3