author    doufenghu <[email protected]>  2024-04-02 17:36:46 +0800
committer doufenghu <[email protected]>  2024-04-02 17:36:46 +0800
commit    95bcb7db323b12d7e7f864dff615d73622d7e688 (patch)
tree      919b582cbcefb9304004444fda1d350a3041a2f5 /groot-tests
parent    80e93523eb12f986acac73a81f2614516c718a5e (diff)
[Feature][Tests] Add KafkaContainer for unit tests.
Diffstat (limited to 'groot-tests')
-rw-r--r--  groot-tests/pom.xml                                                                                               |  20
-rw-r--r--  groot-tests/test-common/pom.xml                                                                                   |  11
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java    |   2
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java                 |   2
-rw-r--r--  groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java (renamed from groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java) |   5
-rw-r--r--  groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml                                                  |  40
-rw-r--r--  groot-tests/test-e2e-kafka/pom.xml                                                                                |  45
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java                |  204
8 files changed, 315 insertions(+), 14 deletions(-)
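
Note: the core of this commit is the Testcontainers-backed Kafka fixture in KafkaConnectorTest (last file below). As orientation, here is a compact, self-contained sketch of the same produce-then-drain round trip that test performs; the class name, topic name, and timeout here are illustrative and not taken from the diff.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaRoundTripSketch {
    public static void main(String[] args) {
        // Same image the new test pins below; KafkaContainer is AutoCloseable.
        try (KafkaContainer kafka =
                 new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))) {
            kafka.start();

            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            // Produce 100 JSON records, mirroring generateTestData() in the diff.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                for (int i = 0; i < 100; i++) {
                    producer.send(new ProducerRecord<>("demo-topic",
                        "{\"client_ip\":\"192.168.40.12\",\"server_ip\":\"8.8.8.8\"}"));
                }
            } // close() flushes pending sends

            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            // Drain until all 100 records are seen or a deadline passes.
            int count = 0;
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Collections.singletonList("demo-topic"));
                long deadline = System.currentTimeMillis() + 30_000;
                while (count < 100 && System.currentTimeMillis() < deadline) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                    count += records.count();
                }
            }
            System.out.println("Consumed " + count + " records");
        }
    }
}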
diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml
index 5882a56..041cd9d 100644
--- a/groot-tests/pom.xml
+++ b/groot-tests/pom.xml
@@ -15,6 +15,7 @@
<modules>
<module>test-common</module>
<module>test-e2e-base</module>
+ <module>test-e2e-kafka</module>
</modules>
<properties>
@@ -26,6 +27,25 @@
<dependencies>
<dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-bootstrap</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
diff --git a/groot-tests/test-common/pom.xml b/groot-tests/test-common/pom.xml
index c6cb7bf..c086f41 100644
--- a/groot-tests/test-common/pom.xml
+++ b/groot-tests/test-common/pom.xml
@@ -17,16 +17,7 @@
</properties>
<dependencies>
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.geedgenetworks</groupId>
- <artifactId>groot-bootstrap</artifactId>
- <version>${revision}</version>
- </dependency>
+
</dependencies>
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index aed38b9..29154e5 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -55,6 +55,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
new LogMessageWaitStrategy()
.withRegEx(".*Starting the resource manager.*")
.withStartupTimeout(Duration.ofMinutes(2)));
+
+ // Copy the groot-stream bootstrap jar and its logging configuration into the container
copyGrootStreamStarterToContainer(jobManager);
copyGrootStreamStarterLoggingToContainer(jobManager);
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
index 3f8435d..58eee29 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -71,7 +71,7 @@ public final class ContainerUtil {
Paths.get(GrootStreamHomeInContainer, "bootstrap", startJarName).toString());
- // copy lib
+ // copy libs
String hbaseClientJar = "hbase-client-shaded-" + getProjectVersion() + ".jar";
Path hbaseClientJarPath =
Paths.get(PROJECT_ROOT_PATH, "groot-shaded/hbase-client-shaded", "target", hbaseClientJar);
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java
index b84d169..1e94cdc 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java
@@ -5,7 +5,6 @@ import com.alibaba.fastjson2.TypeReference;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.geedgenetworks.test.common.TestSuiteBase;
import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
-import com.geedgenetworks.test.common.container.TestContainer;
import com.geedgenetworks.test.common.container.TestContainerId;
import com.geedgenetworks.test.common.junit.DisabledOnContainer;
import lombok.extern.slf4j.Slf4j;
@@ -25,14 +24,14 @@ import static org.awaitility.Awaitility.await;
value = {TestContainerId.FLINK_1_17},
type = {},
disabledReason = "only flink adjusts the parameter configuration rules")
-public class InlineToPrint extends TestSuiteBase {
+public class InlineToPrintTest extends TestSuiteBase {
@TestTemplate
public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
CompletableFuture.supplyAsync(
() -> {
try {
- return container.executeJob("/inline_to_print.yaml");
+ return container.executeJob("/kafka_to_print.yaml");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
diff --git a/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
new file mode 100644
index 0000000..b1e4f35
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
@@ -0,0 +1,40 @@
+sources:
+  kafka_source:
+    type: kafka
+    schema:
+      fields: # [array of object] Schema field projection; supports reading data only from the specified fields.
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+    properties: # [object] Kafka source properties
+      topic: SESSION-RECORD
+      kafka.bootstrap.servers: 192.168.44.11:9092
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.group.id: GROOT-STREAM-EXAMPLE-KAFKA-TO-PRINT
+      kafka.auto.offset.reset: latest
+    format: json
+
+sinks: # [object] Define connector sinks
+  print_sink:
+    type: print
+    properties:
+      mode: log_info
+    format: json
+
+application: # [object] Define job configuration
+  env:
+    name: example-kafka-to-print
+    parallelism: 1
+    pipeline:
+      object-reuse: true
+    execution:
+      restart:
+        strategy: no
+  topology:
+    - name: kafka_source
+      downstream: [print_sink]
+    - name: print_sink
+      downstream: []
\ No newline at end of file
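
Note: this YAML is submitted through the same path InlineToPrintTest (above) now exercises. A minimal sketch of the submission call, copied in shape from that test; the surrounding test-template scaffolding and executeJob's exact return type are as declared in test-common:

// "container" is an AbstractTestFlinkContainer injected by the test template.
CompletableFuture.supplyAsync(
        () -> {
            try {
                // Resolved from src/test/resources on the test classpath.
                return container.executeJob("/kafka_to_print.yaml");
            } catch (Exception e) {
                log.error("Commit task exception: " + e.getMessage());
                throw new RuntimeException(e);
            }
        });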
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml
new file mode 100644
index 0000000..fbbe0e7
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-tests</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>test-e2e-kafka</artifactId>
+ <name>Groot : Tests : E2E : Kafka</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>test-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
new file mode 100644
index 0000000..9cd09fc
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
@@ -0,0 +1,204 @@
+package com.geedgenetworks.test.e2e.kafka;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import com.geedgenetworks.formats.json.JsonSerializer;
+import com.geedgenetworks.test.common.TestResource;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {TestContainerId.FLINK_1_17},
+ disabledReason = "Override TestSuiteBase @DisabledOnContainer")
+public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
+
+ private KafkaContainer kafkaContainer;
+
+ private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
+ private static final String KAFKA_HOST = "kafkaCluster";
+ private KafkaProducer<byte[], byte[]> producer;
+
+
+
+ @Override
+ @BeforeAll
+ public void startUp() throws Exception {
+ kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started successfully");
+
+ Awaitility.given()
+ .ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(this::initKafkaProducer);
+
+ log.info("Write 100 records to topic test_topic_source");
+
+ StructType dataType = Types.parseStructType("client_ip: string, server_ip: string");
+ JsonSerializer serializer = new JsonSerializer(dataType);
+ generateTestData(serializer::serialize, 0, 100);
+ }
+
+ private void generateTestData(ProducerRecordConverter converter, int start, int end) {
+
+ for (int i = start; i < end; i++) {
+ Map<String, Object> row = Map
+ .of("client_ip", "192.168.40.12", "server_ip", "8.8.8.8");
+ ProducerRecord<byte[], byte[]> record =
+ new ProducerRecord<>("TEST-TOPIC-SOURCE", converter.convert(row));
+ producer.send(record);
+ }
+ }
+
+
+ @TestTemplate
+ public void testSourceKafkaToConsole(TestContainer container) throws IOException, InterruptedException {
+
+ List<String> data = getKafkaConsumerListData("TEST-TOPIC-SOURCE");
+
+ for (String record : data) {
+ log.info("Record: {}", record);
+ }
+
+ Assertions.assertEquals(100, data.size());
+ }
+
+ interface ProducerRecordConverter {
+ byte[] convert(Map<String, Object> row);
+ }
+
+
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (producer != null) {
+ producer.close();
+ }
+ if (kafkaContainer != null) {
+ kafkaContainer.close();
+ }
+
+ }
+
+ private void initKafkaProducer() {
+ Properties properties = new Properties();
+ String bootstrapServers = kafkaContainer.getBootstrapServers();
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ producer = new KafkaProducer<>(properties);
+ }
+
+ private Properties kafkaConsumerConfig() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
+ props.put(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return props;
+ }
+
+ private Properties kafkaByteConsumerConfig() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
+ props.put(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+ props.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ return props;
+ }
+
+ private Map<String, String> getKafkaConsumerData(String topicName) {
+ Map<String, String> data = new HashMap<>();
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+ consumer.subscribe(Arrays.asList(topicName));
+ Map<TopicPartition, Long> offsets =
+ consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+ Long endOffset = offsets.entrySet().iterator().next().getValue();
+ Long lastProcessedOffset = -1L;
+
+ do {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ if (lastProcessedOffset < record.offset()) {
+ data.put(record.key(), record.value());
+ }
+ lastProcessedOffset = record.offset();
+ }
+ } while (lastProcessedOffset < endOffset - 1);
+ }
+ return data;
+ }
+
+ private List<String> getKafkaConsumerListData(String topicName) {
+ List<String> data = new ArrayList<>();
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+ consumer.subscribe(Arrays.asList(topicName));
+ Map<TopicPartition, Long> offsets =
+ consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+ Long endOffset = offsets.entrySet().iterator().next().getValue();
+ Long lastProcessedOffset = -1L;
+
+ do {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ if (lastProcessedOffset < record.offset()) {
+ data.add(record.value());
+ }
+ lastProcessedOffset = record.offset();
+ }
+ } while (lastProcessedOffset < endOffset - 1);
+ }
+ return data;
+ }
+}
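
Note: testSourceKafkaToConsole only asserts on record count. A possible follow-up, sketched here with fastjson2 (already used in InlineToPrintTest above), would assert on payload content as well; this is illustrative and not part of the commit:

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;

// Verify each consumed record round-trips the fields written by generateTestData().
for (String record : getKafkaConsumerListData("TEST-TOPIC-SOURCE")) {
    JSONObject json = JSON.parseObject(record);
    Assertions.assertEquals("192.168.40.12", json.getString("client_ip"));
    Assertions.assertEquals("8.8.8.8", json.getString("server_ip"));
}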