| author | 窦凤虎 <[email protected]> | 2024-04-07 06:06:23 +0000 |
|---|---|---|
| committer | 窦凤虎 <[email protected]> | 2024-04-07 06:06:23 +0000 |
| commit | 7c242c5984644b4e216e9ba723a5e2d4a974bb56 (patch) | |
| tree | 8fcfc812125d4ef966c721ec010692b4e5eb536e | |
| parent | e736bdfa5dcef0af17b2a0d4ecbdfa6b4cd3aeee (diff) | |
| parent | 58460028cf73336292bbf9a6b05fc7612e3f891b (diff) | |
Merge branch 'feature/test-e2e-kafka' into 'develop'
Feature/test e2e kafka
See merge request galaxy/platform/groot-stream!31
23 files changed, 488 insertions, 63 deletions
@@ -106,7 +106,7 @@ cd "groot-stream-${version}"
 - Run the following command to start the groot-stream server for Yarn Per-job Mode:
 ```shell
 cd "groot-stream-${version}"
-./bin/start.sh -c ./config/grootstream_job_example.yaml --target yarn-per-job -n inline-to-print-job -d
+./bin/start.sh -c ./config/grootstream_job_example.yaml --target yarn-per-job -Dyarn.application.name="inline-to-print-job" -n inline-to-print-job -d
 ```
 ### Configuring
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index daf6e32..52b5001 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -33,6 +33,9 @@ application:
     parallelism: 3
     pipeline:
       object-reuse: true
+    execution:
+      restart:
+        strategy: none
   topology:
     - name: inline_source
       downstream: [filter_operator]
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index d700777..af73704 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -20,6 +20,7 @@ sources: # [object] Define connector source
       kafka.max.partition.fetch.bytes: 31457280
       kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001 # [string] Kafka Group ID for Consumer
       kafka.auto.offset.reset: latest # [string] Kafka Auto Offset Reset, default is latest
+      kafka.compression.type: snappy # [string] Kafka Compression Type, default is none
       format: json # [string] Data Format for Source, e.g. json, protobuf, etc.
       json.ignore.parse.errors: false # [boolean] Flag to ignore parse errors; by default parse errors are recorded, if set to true they are ignored.
diff --git a/docs/env-config.md b/docs/env-config.md
index 3e91f3e..e29acb0 100644
--- a/docs/env-config.md
+++ b/docs/env-config.md
@@ -44,12 +44,18 @@ Specify a list of classpath URLs via `pipeline.classpaths`. The classpaths are s
 You can use Flink parameters directly by adding the `flink.` prefix, e.g. `flink.execution.buffer-timeout`, `flink.object-reuse`; more details can be found in the official [Flink documentation](https://flink.apache.org/). You can also use groot-stream parameters; here are some parameter names and their Flink counterparts.
-| Groot Stream             | Flink                          |
-|--------------------------|--------------------------------|
-| execution.buffer-timeout | flink.execution.buffer-timeout |
-| pipeline.object-reuse    | flink.object-reuse             |
-| pipeline.max-parallelism | flink.pipeline.max-parallelism |
-| ...                      | ...                            |
+| Groot Stream                           | Flink                                                         |
+|----------------------------------------|---------------------------------------------------------------|
+| execution.buffer-timeout               | flink.execution.buffer-timeout                                |
+| pipeline.object-reuse                  | flink.object-reuse                                            |
+| pipeline.max-parallelism               | flink.pipeline.max-parallelism                                |
+| execution.restart.strategy             | flink.restart-strategy                                        |
+| execution.restart.attempts             | flink.restart-strategy.fixed-delay.attempts                   |
+| execution.restart.delayBetweenAttempts | flink.restart-strategy.fixed-delay.delay                      |
+| execution.restart.failure-rate         | flink.restart-strategy.failure-rate.max-failures-per-interval |
+| execution.restart.failureInterval      | flink.restart-strategy.failure-rate.failure-rate-interval     |
+| execution.restart.delayInterval        | flink.restart-strategy.failure-rate.delay                     |
+| ...                                    | ...                                                           |
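The mapping table above is easier to see from the Flink side. Below is a minimal sketch, not code from this repository, showing that each `flink.`-prefixed key on the right is (with the prefix stripped) a standard Flink configuration key; the strategy name and values are illustrative.

```java
import org.apache.flink.configuration.Configuration;

public class RestartStrategyKeysSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // execution.restart.strategy -> flink.restart-strategy
        conf.setString("restart-strategy", "fixed-delay");
        // execution.restart.attempts -> flink.restart-strategy.fixed-delay.attempts
        conf.setString("restart-strategy.fixed-delay.attempts", "3");
        // execution.restart.delayBetweenAttempts -> flink.restart-strategy.fixed-delay.delay
        conf.setString("restart-strategy.fixed-delay.delay", "10 s");
        System.out.println(conf);
    }
}
```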
diff --git a/groot-bootstrap/src/main/bin/stop.sh b/groot-bootstrap/src/main/bin/stop.sh
index 9fd0469..91e24b9 100644
--- a/groot-bootstrap/src/main/bin/stop.sh
+++ b/groot-bootstrap/src/main/bin/stop.sh
@@ -16,7 +16,7 @@ stop_jobs() {
       done
       ;;
     yarn-per-job)
-      # Command to stop YARN applications for the specified job name
+      # Command to stop YARN applications matching the specified YARN application name
       yarn application -list -appStates RUNNING | grep "$job_name" | awk '{print $1}' | while read -r appId
       do
         yarn application -kill "$appId"
@@ -65,9 +65,6 @@ fi
 deployment_mode=$1 # standalone, yarn-per-job, or yarn-session
 job_name=$2        # The Flink job name to stop
-# Trim whitespace from job_name
-job_name=$(echo "$job_name" | tr -d '[:space:]')
-
 # Checking for empty input arguments
 if [ -z "$deployment_mode" ] || [ -z "$job_name" ]; then
   display_usage
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java
index 38d91c5..93cb972 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java
@@ -43,7 +43,7 @@ public class GrootStreamServer {
     private static void outputFatalError(Throwable throwable) {
         log.error("\n\n===============================================================================\n\n");
         String errorMsg = throwable.getMessage();
-        log.error("Fatal Error ,Reason is :{} \n", errorMsg);
+        log.error("Fatal Error: {} \n", errorMsg);
         log.error("Exception StackTrace :{}", ExceptionUtils.getStackTrace(throwable));
         log.error("\n\n===============================================================================\n\n");
     }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
index 79cd8bf..13db3d4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
@@ -52,7 +52,7 @@ public final class EnvironmentUtil {
         if (envConfig.hasPath(ExecutionConfigKeyName.RESTART_STRATEGY)) {
             String restartStrategy = envConfig.getString(ExecutionConfigKeyName.RESTART_STRATEGY);
             switch (restartStrategy.toLowerCase()) {
-                case "no":
+                case "none":
                     executionConfig.setRestartStrategy(RestartStrategies.noRestart());
                     break;
                 case "fixed-delay":
@@ -68,18 +68,18 @@ public final class EnvironmentUtil {
                     long delayInterval = envConfig.getLong(ExecutionConfigKeyName.RESTART_DELAY_INTERVAL);
                     executionConfig.setRestartStrategy(
                             RestartStrategies.failureRateRestart(
-                                    rate,
-                                    Time.of(failureInterval, TimeUnit.MILLISECONDS),
-                                    Time.of(delayInterval, TimeUnit.MILLISECONDS)));
+                                    rate, // max failures per interval
+                                    Time.of(failureInterval, TimeUnit.MILLISECONDS), // time interval for measuring the failure rate
+                                    Time.of(delayInterval, TimeUnit.MILLISECONDS))); // delay between restart attempts
                     break;
                 default:
                     log.warn(
-                            "set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate",
+                            "Set restart strategy failed, unknown restart strategy [{}]; only none, fixed-delay and failure-rate are supported",
                             restartStrategy);
             }
         }
     } catch (Exception e) {
-        log.warn("set restart.strategy in config '{}' exception", envConfig, e);
+        log.warn("Failed to set restart strategy from config '{}'", envConfig, e);
     }
 }
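For reference, the three branches of the switch above correspond to the following calls on the public Flink 1.17 `RestartStrategies` API that `EnvironmentUtil` delegates to. This is a hedged sketch; the attempt counts and intervals are illustrative, not project defaults.

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

import java.util.concurrent.TimeUnit;

public class RestartStrategySketch {
    public static void main(String[] args) {
        ExecutionConfig executionConfig = new ExecutionConfig();

        // "none": never restart a failed job
        executionConfig.setRestartStrategy(RestartStrategies.noRestart());

        // "fixed-delay": up to 3 attempts, 10 seconds apart (illustrative values)
        executionConfig.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // "failure-rate": at most 3 failures per 5-minute window, 10-second delay between restarts
        executionConfig.setRestartStrategy(
                RestartStrategies.failureRateRestart(
                        3,
                        Time.of(5, TimeUnit.MINUTES),
                        Time.of(10, TimeUnit.SECONDS)));

        System.out.println(executionConfig.getRestartStrategy());
    }
}
```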
diff --git a/groot-bootstrap/src/main/resources/log4j.properties b/groot-bootstrap/src/main/resources/log4j.properties
deleted file mode 100644
index db5d9e5..0000000
--- a/groot-bootstrap/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
index 21e390d..6d6a1bb 100644
--- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -1,2 +1,2 @@
 com.geedgenetworks.core.connector.inline.InlineTableFactory
-com.geedgenetworks.core.connector.print.PrintTableFactory
\ No newline at end of file
+com.geedgenetworks.core.connector.print.PrintTableFactory
diff --git a/groot-examples/end-to-end-example/pom.xml b/groot-examples/end-to-end-example/pom.xml
index 0839e8e..34709fd 100644
--- a/groot-examples/end-to-end-example/pom.xml
+++ b/groot-examples/end-to-end-example/pom.xml
@@ -13,8 +13,9 @@
     <name>Groot : Examples : End-to-end</name>

     <properties>
-        <maven.install.skip>true</maven.install.skip>
-        <maven.deploy.skip>true</maven.deploy.skip>
+        <maven.install.skip>false</maven.install.skip>
+        <maven.deploy.skip>false</maven.deploy.skip>
     </properties>
+
 </project>
\ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index aa2cb76..1f236d7 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -13,7 +13,7 @@ import java.nio.file.Paths;
 public class GrootStreamExample {
     public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
-        String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
+        String configPath = args.length > 0 ? args[0] : "/examples/kafka_to_print.yaml";
         String configFile = getTestConfigFile(configPath);
         ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
         executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
index e883bce..bf8ebf2 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
@@ -30,6 +30,8 @@ application: # [object] Define job configuration
     parallelism: 1
     pipeline:
       object-reuse: true
+    flink:
+      restart-strategy: none
   topology:
     - name: kafka_source
       downstream: [print_sink]
diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml
index fc08086..1fb6212 100644
--- a/groot-examples/pom.xml
+++ b/groot-examples/pom.xml
@@ -30,28 +30,27 @@
         <dependency>
             <groupId>com.geedgenetworks</groupId>
             <artifactId>groot-bootstrap</artifactId>
-            <version>${revision}</version>
+            <version>${project.version}</version>
         </dependency>
-
         <dependency>
             <groupId>com.geedgenetworks</groupId>
             <artifactId>connector-kafka</artifactId>
-            <version>${revision}</version>
+            <version>${project.version}</version>
             <scope>${scope}</scope>
         </dependency>
         <dependency>
             <groupId>com.geedgenetworks</groupId>
             <artifactId>connector-clickhouse</artifactId>
-            <version>${revision}</version>
+            <version>${project.version}</version>
             <scope>${scope}</scope>
         </dependency>
         <dependency>
             <groupId>com.geedgenetworks</groupId>
             <artifactId>connector-ipfix-collector</artifactId>
-            <version>${revision}</version>
+            <version>${project.version}</version>
             <scope>${scope}</scope>
         </dependency>
diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml
index 5882a56..76f533a 100644
--- a/groot-tests/pom.xml
+++ b/groot-tests/pom.xml
@@ -15,6 +15,7 @@
     <modules>
         <module>test-common</module>
         <module>test-e2e-base</module>
+        <module>test-e2e-kafka</module>
     </modules>

     <properties>
@@ -22,10 +23,30 @@
         <maven.deploy.skip>true</maven.deploy.skip>
         <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
         <rest-assured.version>4.3.1</rest-assured.version>
+        <snappy-java.version>1.1.8.3</snappy-java.version>
     </properties>

     <dependencies>
         <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>groot-bootstrap</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
         </dependency>
@@ -53,6 +74,7 @@
             <version>${rest-assured.version}</version>
             <scope>test</scope>
         </dependency>
+
     </dependencies>

     <build>
diff --git a/groot-tests/test-common/pom.xml b/groot-tests/test-common/pom.xml
index c6cb7bf..c086f41 100644
--- a/groot-tests/test-common/pom.xml
+++ b/groot-tests/test-common/pom.xml
@@ -17,16 +17,7 @@
     </properties>

     <dependencies>
-        <dependency>
-            <groupId>com.typesafe</groupId>
-            <artifactId>config</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.geedgenetworks</groupId>
-            <artifactId>groot-bootstrap</artifactId>
-            <version>${revision}</version>
-        </dependency>
+
     </dependencies>
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index aed38b9..29154e5 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -55,6 +55,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
                 new LogMessageWaitStrategy()
                         .withRegEx(".*Starting the resource manager.*")
                         .withStartupTimeout(Duration.ofMinutes(2)));
+
+        // Copy the groot-stream bootstrap jar and its supporting files into the container
         copyGrootStreamStarterToContainer(jobManager);
         copyGrootStreamStarterLoggingToContainer(jobManager);
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
index 3f8435d..58eee29 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -71,7 +71,7 @@ public final class ContainerUtil {
                 Paths.get(GrootStreamHomeInContainer, "bootstrap", startJarName).toString());

-        // copy lib
+        // copy libs
         String hbaseClientJar = "hbase-client-shaded-" + getProjectVersion() + ".jar";
         Path hbaseClientJarPath =
                 Paths.get(PROJECT_ROOT_PATH, "groot-shaded/hbase-client-shaded", "target", hbaseClientJar);
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java
index b84d169..1e94cdc 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java
@@ -5,7 +5,6 @@ import com.alibaba.fastjson2.TypeReference;
 import com.alibaba.nacos.client.naming.utils.CollectionUtils;
 import com.geedgenetworks.test.common.TestSuiteBase;
 import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
-import com.geedgenetworks.test.common.container.TestContainer;
 import com.geedgenetworks.test.common.container.TestContainerId;
 import com.geedgenetworks.test.common.junit.DisabledOnContainer;
 import lombok.extern.slf4j.Slf4j;
@@ -25,14 +24,14 @@ import static org.awaitility.Awaitility.await;
         value = {TestContainerId.FLINK_1_17},
         type = {},
         disabledReason = "only flink adjusts the parameter configuration rules")
-public class InlineToPrint extends TestSuiteBase {
+public class InlineToPrintTest extends TestSuiteBase {

     @TestTemplate
     public void testInlineToPrint(AbstractTestFlinkContainer container)
             throws IOException, InterruptedException {
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
-                        return container.executeJob("/inline_to_print.yaml");
+                        return container.executeJob("/kafka_to_print.yaml");
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
diff --git a/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
new file mode 100644
index 0000000..b1e4f35
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
@@ -0,0 +1,40 @@
+sources:
+  kafka_source:
+    type: kafka
+    schema:
+      fields: # [array of object] Schema field projection; supports reading only the specified fields.
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+    properties: # [object] Kafka source properties
+      topic: SESSION-RECORD
+      kafka.bootstrap.servers: 192.168.44.11:9092
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.group.id: GROOT-STREAM-EXAMPLE-KAFKA-TO-PRINT
+      kafka.auto.offset.reset: latest
+      format: json
+
+sinks: # [object] Define connector sink
+  print_sink:
+    type: print
+    properties:
+      mode: log_info
+      format: json
+
+application: # [object] Define job configuration
+  env:
+    name: example-kafka-to-print
+    parallelism: 1
+    pipeline:
+      object-reuse: true
+    execution:
+      restart:
+        strategy: none
+  topology:
+    - name: kafka_source
+      downstream: [print_sink]
+    - name: print_sink
+      downstream: []
\ No newline at end of file
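A hedged sketch of what the two-field `schema.fields` projection above boils down to, reusing the project-internal helpers (`Types.parseStructType`, `JsonSerializer`) with the signatures they are given in `KafkaConnectorTest` later in this diff; the sample values are invented.

```java
import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.core.types.Types;
import com.geedgenetworks.formats.json.JsonSerializer;

import java.util.Map;

public class SchemaProjectionSketch {
    public static void main(String[] args) {
        // Same shape as schema.fields above: only client_ip and server_ip are kept.
        StructType projected = Types.parseStructType("client_ip: string, server_ip: string");
        JsonSerializer serializer = new JsonSerializer(projected);
        Map<String, Object> row = Map.of(
                "client_ip", "192.168.40.12",
                "server_ip", "8.8.8.8");
        byte[] json = serializer.serialize(row); // serialize(Map) as used in generateTestData below
        System.out.println(new String(json));
    }
}
```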
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml
new file mode 100644
index 0000000..ab1ba72
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.geedgenetworks</groupId>
+        <artifactId>groot-tests</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>test-e2e-kafka</artifactId>
+    <name>Groot : Tests : E2E : Kafka</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>test-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy-java.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
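The `snappy-java` test dependency is here because the connector configs in this merge set `kafka.compression.type: snappy`, so the test producers and consumers need the snappy codec on the classpath to encode and decode batches. A sketch with the plain kafka-clients API (the broker address is illustrative):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class SnappyProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // Same setting the connector template adds as kafka.compression.type: snappy
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(
                    "test_topic", "{\"client_ip\":\"192.168.40.12\"}".getBytes(StandardCharsets.UTF_8)));
        } // close() flushes pending records
    }
}
```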
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
new file mode 100644
index 0000000..e3af025
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
@@ -0,0 +1,228 @@
+package com.geedgenetworks.test.e2e.kafka;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+import com.alibaba.nacos.client.naming.utils.CollectionUtils;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import com.geedgenetworks.formats.json.JsonSerializer;
+import com.geedgenetworks.test.common.TestResource;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.base.CharMatcher;
+import org.testcontainers.shaded.com.google.common.base.Strings;
+import org.testcontainers.shaded.com.google.common.primitives.Chars;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {TestContainerId.FLINK_1_17},
+        disabledReason = "Override TestSuiteBase @DisabledOnContainer")
+public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
+
+    private KafkaContainer kafkaContainer;
+
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
+    private static final String KAFKA_HOST = "kafkaCluster";
+    private KafkaProducer<byte[], byte[]> producer;
+
+    private static final String DEFAULT_TEST_TOPIC_SOURCE = "test_topic_source";
+
+    @Override
+    @BeforeAll
+    public void startUp() {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+                .withNetwork(NETWORK)
+                .withNetworkAliases(KAFKA_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started successfully");
+
+        Awaitility.given()
+                .ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initKafkaProducer);
+
+        log.info("Write 100 records to topic test_topic_source");
+        generateTestData(DEFAULT_TEST_TOPIC_SOURCE, 0, 100);
+    }
+
+    @TestTemplate
+    public void testSourceKafkaJsonToConsole(TestContainer container) {
+        generateTestData("test_topic_json", 0, 10);
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return container.executeJob("/kafka_source_to_console.yaml");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        await().atMost(300000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            String logs = container.getServerLogs();
+                            Assertions.assertEquals(10, StringUtils.countMatches(logs, "PrintSinkFunction"));
+                        });
+    }
+
+    private void generateTestData(String topic, int start, int end) {
+        StructType dataType = Types.parseStructType("id: int, client_ip: string, server_ip: string, flag: string");
+        JsonSerializer serializer = new JsonSerializer(dataType);
+        for (int i = start; i < end; i++) {
+            Map<String, Object> row = Map.of(
+                    "id", i,
+                    "client_ip", "192.168.40.12",
+                    "server_ip", "8.8.8.8",
+                    "flag", Boolean.FALSE.booleanValue());
+            ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>(topic, serializer.serialize(row));
+            producer.send(record);
+        }
+    }
+
+    @TestTemplate
+    public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafka_sink_inline_to_kafka.yaml");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        List<String> data = getKafkaConsumerListData("test_topic");
+        Assertions.assertEquals(10, data.size()); // Check that all 10 records were consumed
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+
+    private void initKafkaProducer() {
+        Properties properties = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(properties);
+    }
+
+    private Properties kafkaConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
+        props.put(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return props;
+    }
+
+    private Properties kafkaByteConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
+        props.put(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+        props.setProperty(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        props.setProperty(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        return props;
+    }
+
+    private Map<String, String> getKafkaConsumerData(String topicName) {
+        Map<String, String> data = new HashMap<>();
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, String> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        data.put(record.key(), record.value());
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        return data;
+    }
+
+    private List<String> getKafkaConsumerListData(String topicName) {
+        List<String> data = new ArrayList<>();
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, String> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        data.add(record.value());
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        return data;
+    }
+}
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml
new file mode 100644
index 0000000..dbfbc1e
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml
@@ -0,0 +1,64 @@
+sources: # [object] Define connector source
+  inline_source:
+    type: inline
+    schema:
+      fields: # [array of object] Schema field projection; supports reading only the specified fields.
+        - name: log_id
+          type: bigint
+        - name: recv_time
+          type: bigint
+        - name: server_fqdn
+          type: string
+        - name: server_domain
+          type: string
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+        - name: server_asn
+          type: string
+        - name: decoded_as
+          type: string
+        - name: device_group
+          type: string
+        - name: device_tag
+          type: string
+    properties:
+      #
+      # [string] Event Data; it will be parsed to Map<String, Object> by the specified format.
+      #
+      data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+      format: json
+      interval.per.row: 1s
+      repeat.count: 10
+      json.ignore.parse.errors: false
+
+sinks:
+  connector_kafka:
+    type: kafka
+    properties:
+      topic: test_topic
+      kafka.bootstrap.servers: kafkaCluster:9092
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      format: json
+      log.failures.only: true
+
+application: # [object] Define job configuration
+  env:
+    name: example-inline-to-kafka
+    parallelism: 1
+    shade.identifier: default
+    pipeline:
+      object-reuse: true
+  topology:
+    - name: inline_source
+      downstream: [ connector_kafka ]
+    - name: connector_kafka
+      downstream: []
\ No newline at end of file
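An assumption worth flagging, since this diff does not show the connector internals: the `kafka.*` keys above (`kafka.retries`, `kafka.linger.ms`, `kafka.compression.type`, ...) are presumably forwarded to the Kafka client with the prefix stripped, while keys such as `format` and `log.failures.only` stay with the connector. A minimal sketch of that translation:

```java
import java.util.Map;
import java.util.Properties;

public class KafkaPrefixSketch {
    // Hypothetical helper, not from this repo: keep only kafka.* keys, strip the prefix.
    static Properties toKafkaProperties(Map<String, Object> connectorProps) {
        Properties kafkaProps = new Properties();
        for (Map.Entry<String, Object> e : connectorProps.entrySet()) {
            if (e.getKey().startsWith("kafka.")) {
                // kafka.compression.type -> compression.type, etc.
                kafkaProps.setProperty(
                        e.getKey().substring("kafka.".length()),
                        String.valueOf(e.getValue()));
            }
        }
        return kafkaProps;
    }

    public static void main(String[] args) {
        Map<String, Object> connectorProps = Map.of(
                "kafka.compression.type", "snappy",
                "kafka.batch.size", 262144,
                "format", "json"); // non-kafka.* keys stay with the connector
        System.out.println(toKafkaProperties(connectorProps));
    }
}
```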
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml
new file mode 100644
index 0000000..2afee47
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml
@@ -0,0 +1,40 @@
+sources:
+  kafka_source:
+    type: kafka
+    schema:
+      fields: # [array of object] Schema field projection; supports reading only the specified fields.
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+    properties: # [object] Kafka source properties
+      topic: test_topic_json
+      kafka.bootstrap.servers: kafkaCluster:9092
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.group.id: test_topic_json_group
+      kafka.auto.offset.reset: earliest
+      format: json
+
+sinks: # [object] Define connector sink
+  print_sink:
+    type: print
+    properties:
+      mode: log_warn
+      format: json
+
+application: # [object] Define job configuration
+  env:
+    name: example-kafka-to-print
+    parallelism: 1
+    pipeline:
+      object-reuse: true
+    execution:
+      restart:
+        strategy: none
+  topology:
+    - name: kafka_source
+      downstream: [print_sink]
+    - name: print_sink
+      downstream: []
\ No newline at end of file
