author    doufenghu <[email protected]>    2024-04-02 17:36:46 +0800
committer doufenghu <[email protected]>    2024-04-02 17:36:46 +0800
commit    95bcb7db323b12d7e7f864dff615d73622d7e688 (patch)
tree      919b582cbcefb9304004444fda1d350a3041a2f5
parent    80e93523eb12f986acac73a81f2614516c718a5e (diff)
[Feature][Tests] add Kafka container for unit tests.
-rw-r--r--  README.md  2
-rw-r--r--  config/grootstream_job_example.yaml  3
-rw-r--r--  config/template/grootstream_job_template.yaml  1
-rw-r--r--  docs/env-config.md  18
-rw-r--r--  groot-bootstrap/src/main/bin/stop.sh  5
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java  2
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java  12
-rw-r--r--  groot-bootstrap/src/main/resources/log4j.properties  22
-rw-r--r--  groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory  2
-rw-r--r--  groot-examples/end-to-end-example/pom.xml  5
-rw-r--r--  groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java  2
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml  2
-rw-r--r--  groot-examples/pom.xml  9
-rw-r--r--  groot-tests/pom.xml  20
-rw-r--r--  groot-tests/test-common/pom.xml  11
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java  2
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java  2
-rw-r--r--  groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java (renamed from groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java)  5
-rw-r--r--  groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml  40
-rw-r--r--  groot-tests/test-e2e-kafka/pom.xml  45
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java  204
21 files changed, 351 insertions, 63 deletions
diff --git a/README.md b/README.md
index 6f3109e..88ab203 100644
--- a/README.md
+++ b/README.md
@@ -106,7 +106,7 @@ cd "groot-stream-${version}"
- Run the following command to start the groot-stream server for Yarn Per-job Mode:
```shell
cd "groot-stream-${version}"
-./bin/start.sh -c ./config/grootstream_job_example.yaml --target yarn-per-job -n inline-to-print-job -d
+./bin/start.sh -c ./config/grootstream_job_example.yaml --target yarn-per-job -Dyarn.application.name="inline-to-print-job" -n inline-to-print-job -d
```
### Configuring
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index daf6e32..52b5001 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -33,6 +33,9 @@ application:
parallelism: 3
pipeline:
object-reuse: true
+ execution:
+ restart:
+ strategy: none
topology:
- name: inline_source
downstream: [filter_operator]
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index d700777..af73704 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -20,6 +20,7 @@ sources: # [object] Define connector source
kafka.max.partition.fetch.bytes: 31457280
kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001 # [string] Kafka Group ID for Consumer
kafka.auto.offset.reset: latest # [string] Kafka Auto Offset Reset, default is latest
+ kafka.compression.type: snappy # [string] Kafka Compression Type, default is none
format: json # [string] Data format for the source, e.g. json, protobuf, etc.
json.ignore.parse.errors: false # [boolean] Whether to ignore parse errors. By default (false), parse errors are recorded; if set to true, they are ignored.
diff --git a/docs/env-config.md b/docs/env-config.md
index 3e91f3e..e29acb0 100644
--- a/docs/env-config.md
+++ b/docs/env-config.md
@@ -44,12 +44,18 @@ Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are s
You can use Flink parameters directly by prefixing them with `flink.`, such as `flink.execution.buffer-timeout`, `flink.object-reuse`, etc. More details can be found in the official [Flink documentation](https://flink.apache.org/).
You can also use Groot Stream parameters; the table below lists some parameter names and their Flink counterparts.
-| Groot Stream | Flink |
-|--------------------------|--------------------------------|
-| execution.buffer-timeout | flink.execution.buffer-timeout |
-| pipeline.object-reuse | flink.object-reuse |
-| pipeline.max-parallelism | flink.pipeline.max-parallelism |
-| ... | ... |
+| Groot Stream | Flink |
+|----------------------------------------|---------------------------------------------------------------|
+| execution.buffer-timeout | flink.execution.buffer-timeout |
+| pipeline.object-reuse | flink.object-reuse |
+| pipeline.max-parallelism | flink.pipeline.max-parallelism |
+| execution.restart.strategy | flink.restart-strategy |
+| execution.restart.attempts | flink.restart-strategy.fixed-delay.attempts |
+| execution.restart.delayBetweenAttempts | flink.restart-strategy.fixed-delay.delay |
+| execution.restart.failure-rate | flink.restart-strategy.failure-rate.max-failures-per-interval |
+| execution.restart.failureInterval | flink.restart-strategy.failure-rate.failure-rate-interval |
+| execution.restart.delayInterval | flink.restart-strategy.failure-rate.delay |
+| ... | ... |
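
For example, a failure-rate policy could be declared with the Groot Stream keys above. This is a hypothetical sketch: the key names and millisecond units follow the table and `EnvironmentUtil`, while the nesting and values are only illustrative:

```yaml
application:
  env:
    name: example-failure-rate-job   # illustrative job name
    parallelism: 1
    execution:
      restart:
        strategy: failure-rate   # none | fixed-delay | failure-rate
        failure-rate: 3          # maps to flink.restart-strategy.failure-rate.max-failures-per-interval
        failureInterval: 300000  # maps to flink.restart-strategy.failure-rate.failure-rate-interval, in ms
        delayInterval: 10000     # maps to flink.restart-strategy.failure-rate.delay, in ms
```

The same settings can also be passed straight through to Flink with the `flink.` prefix, e.g. `flink.restart-strategy: failure-rate`.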
diff --git a/groot-bootstrap/src/main/bin/stop.sh b/groot-bootstrap/src/main/bin/stop.sh
index 9fd0469..91e24b9 100644
--- a/groot-bootstrap/src/main/bin/stop.sh
+++ b/groot-bootstrap/src/main/bin/stop.sh
@@ -16,7 +16,7 @@ stop_jobs() {
done
;;
yarn-per-job)
- # Command to stop YARN applications for the specified job name
+ # Command to stop YARN applications matching the specified application name
yarn application -list -appStates RUNNING | grep "$job_name" | awk '{print $1}' | while read -r appId
do
yarn application -kill "$appId"
@@ -65,9 +65,6 @@ fi
deployment_mode=$1 # standalone, yarn-per-job, or yarn-session
job_name=$2 # The Flink job name to stop
-# Trim whitespace from job_name
-job_name=$(echo "$job_name" | tr -d '[:space:]')
-
# Checking for empty input arguments
if [ -z "$deployment_mode" ] || [ -z "$job_name" ]; then
display_usage
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java
index 38d91c5..93cb972 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java
@@ -43,7 +43,7 @@ public class GrootStreamServer {
private static void outputFatalError(Throwable throwable) {
log.error("\\n\\n===============================================================================\\n\\n");
String errorMsg = throwable.getMessage();
- log.error("Fatal Error ,Reason is :{} \n", errorMsg);
+ log.error("Fatal Error: {} \n", errorMsg);
log.error("Exception StackTrace :{}", ExceptionUtils.getStackTrace(throwable));
log.error("\\n\\n===============================================================================\\n\\n");
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
index 79cd8bf..13db3d4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
@@ -52,7 +52,7 @@ public final class EnvironmentUtil {
if (envConfig.hasPath(ExecutionConfigKeyName.RESTART_STRATEGY)) {
String restartStrategy = envConfig.getString(ExecutionConfigKeyName.RESTART_STRATEGY);
switch (restartStrategy.toLowerCase()) {
- case "no":
+ case "none":
executionConfig.setRestartStrategy(RestartStrategies.noRestart());
break;
case "fixed-delay":
@@ -68,18 +68,18 @@ public final class EnvironmentUtil {
long delayInterval = envConfig.getLong(ExecutionConfigKeyName.RESTART_DELAY_INTERVAL);
executionConfig.setRestartStrategy(
RestartStrategies.failureRateRestart(
- rate,
- Time.of(failureInterval, TimeUnit.MILLISECONDS),
- Time.of(delayInterval, TimeUnit.MILLISECONDS)));
+ rate, // max failures per interval
+ Time.of(failureInterval, TimeUnit.MILLISECONDS), // time interval for measuring the failure rate
+ Time.of(delayInterval, TimeUnit.MILLISECONDS))); // delay between two consecutive restart attempts
break;
default:
log.warn(
- "set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate",
+ "Set restart strategy failed, unknown restart strategy [{}],only support no,fixed-delay,failure-rate",
restartStrategy);
}
}
} catch (Exception e) {
- log.warn("set restart.strategy in config '{}' exception", envConfig, e);
+ log.warn("Set restart strategy in config '{}' exception", envConfig, e);
}
}
diff --git a/groot-bootstrap/src/main/resources/log4j.properties b/groot-bootstrap/src/main/resources/log4j.properties
deleted file mode 100644
index db5d9e5..0000000
--- a/groot-bootstrap/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
index 21e390d..6d6a1bb 100644
--- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -1,2 +1,2 @@
com.geedgenetworks.core.connector.inline.InlineTableFactory
-com.geedgenetworks.core.connector.print.PrintTableFactory \ No newline at end of file
+com.geedgenetworks.core.connector.print.PrintTableFactory
diff --git a/groot-examples/end-to-end-example/pom.xml b/groot-examples/end-to-end-example/pom.xml
index 0839e8e..34709fd 100644
--- a/groot-examples/end-to-end-example/pom.xml
+++ b/groot-examples/end-to-end-example/pom.xml
@@ -13,8 +13,9 @@
<name>Groot : Examples : End-to-end</name>
<properties>
- <maven.install.skip>true</maven.install.skip>
- <maven.deploy.skip>true</maven.deploy.skip>
+ <maven.install.skip>false</maven.install.skip>
+ <maven.deploy.skip>false</maven.deploy.skip>
</properties>
+
</project> \ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index aa2cb76..1f236d7 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -13,7 +13,7 @@ import java.nio.file.Paths;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/kafka_to_print.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
index e883bce..bf8ebf2 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
@@ -30,6 +30,8 @@ application: # [object] Define job configuration
parallelism: 1
pipeline:
object-reuse: true
+ flink:
+ restart-strategy: none
topology:
- name: kafka_source
downstream: [print_sink]
diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml
index fc08086..1fb6212 100644
--- a/groot-examples/pom.xml
+++ b/groot-examples/pom.xml
@@ -30,28 +30,27 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>groot-bootstrap</artifactId>
- <version>${revision}</version>
+ <version>${project.version}</version>
</dependency>
-
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>connector-kafka</artifactId>
- <version>${revision}</version>
+ <version>${project.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>connector-clickhouse</artifactId>
- <version>${revision}</version>
+ <version>${project.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>connector-ipfix-collector</artifactId>
- <version>${revision}</version>
+ <version>${project.version}</version>
<scope>${scope}</scope>
</dependency>
diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml
index 5882a56..041cd9d 100644
--- a/groot-tests/pom.xml
+++ b/groot-tests/pom.xml
@@ -15,6 +15,7 @@
<modules>
<module>test-common</module>
<module>test-e2e-base</module>
+ <module>test-e2e-kafka</module>
</modules>
<properties>
@@ -26,6 +27,25 @@
<dependencies>
<dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-bootstrap</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
diff --git a/groot-tests/test-common/pom.xml b/groot-tests/test-common/pom.xml
index c6cb7bf..c086f41 100644
--- a/groot-tests/test-common/pom.xml
+++ b/groot-tests/test-common/pom.xml
@@ -17,16 +17,7 @@
</properties>
<dependencies>
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.geedgenetworks</groupId>
- <artifactId>groot-bootstrap</artifactId>
- <version>${revision}</version>
- </dependency>
+
</dependencies>
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index aed38b9..29154e5 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -55,6 +55,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
new LogMessageWaitStrategy()
.withRegEx(".*Starting the resource manager.*")
.withStartupTimeout(Duration.ofMinutes(2)));
+
+ // Copy the groot-stream starter and its logging configuration to the container
copyGrootStreamStarterToContainer(jobManager);
copyGrootStreamStarterLoggingToContainer(jobManager);
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
index 3f8435d..58eee29 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -71,7 +71,7 @@ public final class ContainerUtil {
Paths.get(GrootStreamHomeInContainer, "bootstrap", startJarName).toString());
- // copy lib
+ // copy libs
String hbaseClientJar = "hbase-client-shaded-" + getProjectVersion() + ".jar";
Path hbaseClientJarPath =
Paths.get(PROJECT_ROOT_PATH, "groot-shaded/hbase-client-shaded", "target", hbaseClientJar);
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java
index b84d169..1e94cdc 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java
@@ -5,7 +5,6 @@ import com.alibaba.fastjson2.TypeReference;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.geedgenetworks.test.common.TestSuiteBase;
import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
-import com.geedgenetworks.test.common.container.TestContainer;
import com.geedgenetworks.test.common.container.TestContainerId;
import com.geedgenetworks.test.common.junit.DisabledOnContainer;
import lombok.extern.slf4j.Slf4j;
@@ -25,14 +24,14 @@ import static org.awaitility.Awaitility.await;
value = {TestContainerId.FLINK_1_17},
type = {},
disabledReason = "only flink adjusts the parameter configuration rules")
-public class InlineToPrint extends TestSuiteBase {
+public class InlineToPrintTest extends TestSuiteBase {
@TestTemplate
public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
CompletableFuture.supplyAsync(
() -> {
try {
- return container.executeJob("/inline_to_print.yaml");
+ return container.executeJob("/kafka_to_print.yaml");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
diff --git a/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
new file mode 100644
index 0000000..b1e4f35
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
@@ -0,0 +1,40 @@
+sources:
+ kafka_source:
+ type: kafka
+ schema:
+ fields: # [array of object] Schema field projection; supports reading only the specified fields.
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ properties: # [object] Kafka source properties
+ topic: SESSION-RECORD
+ kafka.bootstrap.servers: 192.168.44.11:9092
+ kafka.session.timeout.ms: 60000
+ kafka.max.poll.records: 3000
+ kafka.max.partition.fetch.bytes: 31457280
+ kafka.group.id: GROOT-STREAM-EXAMPLE-KAFKA-TO-PRINT
+ kafka.auto.offset.reset: latest
+ format: json
+
+sinks: # [object] Define connector sink
+ print_sink:
+ type: print
+ properties:
+ mode: log_info
+ format: json
+
+application: # [object] Define job configuration
+ env:
+ name: example-kafka-to-print
+ parallelism: 1
+ pipeline:
+ object-reuse: true
+ execution:
+ restart:
+ strategy: none
+ topology:
+ - name: kafka_source
+ downstream: [print_sink]
+ - name: print_sink
+ downstream: [] \ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml
new file mode 100644
index 0000000..fbbe0e7
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-tests</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>test-e2e-kafka</artifactId>
+ <name>Groot : Tests : E2E : Kafka</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>test-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project> \ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
new file mode 100644
index 0000000..9cd09fc
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
@@ -0,0 +1,204 @@
+package com.geedgenetworks.test.e2e.kafka;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import com.geedgenetworks.formats.json.JsonSerializer;
+import com.geedgenetworks.test.common.TestResource;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {TestContainerId.FLINK_1_17},
+ disabledReason = "Override TestSuiteBase @DisabledOnContainer")
+public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
+
+ private KafkaContainer kafkaContainer;
+
+ private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
+ private static final String KAFKA_HOST = "kafkaCluster";
+ private KafkaProducer<byte[], byte[]> producer;
+
+ @Override
+ @BeforeAll
+ public void startUp() throws Exception {
+ kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started successfully");
+
+ Awaitility.given()
+ .ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(this::initKafkaProducer);
+
+ log.info("Write 100 records to topic test_topic_source");
+
+ StructType dataType = Types.parseStructType("client_ip: string, server_ip: string");
+ JsonSerializer serializer = new JsonSerializer(dataType);
+ generateTestData(serializer::serialize, 0, 100);
+ }
+
+ private void generateTestData(ProducerRecordConverter converter, int start, int end) {
+ for (int i = start; i < end; i++) {
+ Map<String, Object> row = Map
+ .of("client_ip", "192.168.40.12", "server_ip", "8.8.8.8");
+ ProducerRecord<byte[], byte[]> record =
+ new ProducerRecord<>("TEST-TOPIC-SOURCE", converter.convert(row));
+ producer.send(record);
+ }
+ // Flush so every record is delivered before the test consumes the topic
+ producer.flush();
+ }
+
+ @TestTemplate
+ public void testSourceKafkaToConsole(TestContainer container) throws IOException, InterruptedException {
+
+ List<String> data = getKafkaConsumerListData("TEST-TOPIC-SOURCE");
+
+ for (String record : data) {
+ log.info("Record: {}", record);
+ }
+
+ Assertions.assertEquals(100, data.size());
+ }
+
+ interface ProducerRecordConverter {
+ byte[] convert(Map<String, Object> row);
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (producer != null) {
+ producer.close();
+ }
+ if (kafkaContainer != null) {
+ kafkaContainer.close();
+ }
+ }
+
+ private void initKafkaProducer() {
+ Properties properties = new Properties();
+ String bootstrapServers = kafkaContainer.getBootstrapServers();
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ producer = new KafkaProducer<>(properties);
+ }
+
+ private Properties kafkaConsumerConfig() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
+ props.put(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return props;
+ }
+
+ private Properties kafkaByteConsumerConfig() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
+ props.put(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+ props.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ return props;
+ }
+
+ private Map<String, String> getKafkaConsumerData(String topicName) {
+ Map<String, String> data = new HashMap<>();
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+ consumer.subscribe(Arrays.asList(topicName));
+ Map<TopicPartition, Long> offsets =
+ consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+ Long endOffset = offsets.entrySet().iterator().next().getValue();
+ Long lastProcessedOffset = -1L;
+
+ do {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ if (lastProcessedOffset < record.offset()) {
+ data.put(record.key(), record.value());
+ }
+ lastProcessedOffset = record.offset();
+ }
+ } while (lastProcessedOffset < endOffset - 1);
+ }
+ return data;
+ }
+
+ private List<String> getKafkaConsumerListData(String topicName) {
+ List<String> data = new ArrayList<>();
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+ consumer.subscribe(Arrays.asList(topicName));
+ Map<TopicPartition, Long> offsets =
+ consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+ Long endOffset = offsets.entrySet().iterator().next().getValue();
+ Long lastProcessedOffset = -1L;
+
+ do {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ if (lastProcessedOffset < record.offset()) {
+ data.add(record.value());
+ }
+ lastProcessedOffset = record.offset();
+ }
+ } while (lastProcessedOffset < endOffset - 1);
+ }
+ return data;
+ }
+}