From 95bcb7db323b12d7e7f864dff615d73622d7e688 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Tue, 2 Apr 2024 17:36:46 +0800 Subject: [Feature][Tests] add Kafka Container for unit test. --- README.md | 2 +- config/grootstream_job_example.yaml | 3 + config/template/grootstream_job_template.yaml | 1 + docs/env-config.md | 18 +- groot-bootstrap/src/main/bin/stop.sh | 5 +- .../bootstrap/main/GrootStreamServer.java | 2 +- .../bootstrap/utils/EnvironmentUtil.java | 12 +- .../src/main/resources/log4j.properties | 22 --- .../com.geedgenetworks.core.factories.Factory | 2 +- groot-examples/end-to-end-example/pom.xml | 5 +- .../geedgenetworks/example/GrootStreamExample.java | 2 +- .../main/resources/examples/kafka_to_print.yaml | 2 + groot-examples/pom.xml | 9 +- groot-tests/pom.xml | 20 ++ groot-tests/test-common/pom.xml | 11 +- .../container/AbstractTestFlinkContainer.java | 2 + .../test/common/container/ContainerUtil.java | 2 +- .../test/e2e/base/InlineToPrint.java | 91 --------- .../test/e2e/base/InlineToPrintTest.java | 90 +++++++++ .../src/test/resources/kafka_to_print.yaml | 40 ++++ groot-tests/test-e2e-kafka/pom.xml | 45 +++++ .../test/e2e/kafka/KafkaConnectorTest.java | 204 +++++++++++++++++++++ 22 files changed, 439 insertions(+), 151 deletions(-) delete mode 100644 groot-bootstrap/src/main/resources/log4j.properties delete mode 100644 groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java create mode 100644 groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java create mode 100644 groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml create mode 100644 groot-tests/test-e2e-kafka/pom.xml create mode 100644 groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java diff --git a/README.md b/README.md index 6f3109e..88ab203 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,7 @@ cd "groot-stream-${version}" - Run the following command to start the groot-stream server for Yarn Per-job Mode: ```shell cd "groot-stream-${version}" -./bin/start.sh -c ./config/grootstream_job_example.yaml --target yarn-per-job -n inline-to-print-job -d +./bin/start.sh -c ./config/grootstream_job_example.yaml --target yarn-per-job -Dyarn.application.name="inline-to-print-job" -n inline-to-print-job -d ``` ### Configuring diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index daf6e32..52b5001 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -33,6 +33,9 @@ application: parallelism: 3 pipeline: object-reuse: true + execution: + restart: + strategy: none topology: - name: inline_source downstream: [filter_operator] diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml index d700777..af73704 100644 --- a/config/template/grootstream_job_template.yaml +++ b/config/template/grootstream_job_template.yaml @@ -20,6 +20,7 @@ sources: # [object] Define connector source kafka.max.partition.fetch.bytes: 31457280 kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001 # [string] Kafka Group ID for Consumer kafka.auto.offset.reset: latest # [string] Kafka Auto Offset Reset, default is latest + kafka.compression.type: snappy # [string] Kafka Compression Type, default is none format: json # [string] Data Format for Source. eg. json, protobuf, etc. json.ignore.parse.errors: false # [boolean] Flag to ignore parse errors, default will record the parse errors. 
If set true, it will ignore the parse errors. diff --git a/docs/env-config.md b/docs/env-config.md index 3e91f3e..e29acb0 100644 --- a/docs/env-config.md +++ b/docs/env-config.md @@ -44,12 +44,18 @@ Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are s You can directly use the flink parameter by prefixing `flink.`, such as `flink.execution.buffer-timeout`, `flink.object-reuse`, etc. More details can be found in the official [flink documentation](https://flink.apache.org/). Of course, you can use groot stream parameter, here are some parameter names corresponding to the names in Flink. -| Groot Stream | Flink | -|--------------------------|--------------------------------| -| execution.buffer-timeout | flink.execution.buffer-timeout | -| pipeline.object-reuse | flink.object-reuse | -| pipeline.max-parallelism | flink.pipeline.max-parallelism | -| ... | ... | +| Groot Stream | Flink | +|----------------------------------------|---------------------------------------------------------------| +| execution.buffer-timeout | flink.execution.buffer-timeout | +| pipeline.object-reuse | flink.object-reuse | +| pipeline.max-parallelism | flink.pipeline.max-parallelism | +| execution.restart.strategy | flink.restart-strategy | +| execution.restart.attempts | flink.restart-strategy.fixed-delay.attempts | +| execution.restart.delayBetweenAttempts | flink.restart-strategy.fixed-delay.delay | +| execution.restart.failure-rate | flink.restart-strategy.failure-rate.max-failures-per-interval | +| execution.restart.failureInterval | flink.restart-strategy.failure-rate.failure-rate-interval | +| execution.restart.delayInterval | flink.restart-strategy.failure-rate.delay | +| ... | ... | diff --git a/groot-bootstrap/src/main/bin/stop.sh b/groot-bootstrap/src/main/bin/stop.sh index 9fd0469..91e24b9 100644 --- a/groot-bootstrap/src/main/bin/stop.sh +++ b/groot-bootstrap/src/main/bin/stop.sh @@ -16,7 +16,7 @@ stop_jobs() { done ;; yarn-per-job) - # Command to stop YARN applications for the specified job name + # Command to stop YARN applications for the specified Yarn cluster app name yarn application -list -appStates RUNNING | grep "$job_name" | awk '{print $1}' | while read -r appId do yarn application -kill "$appId" @@ -65,9 +65,6 @@ fi deployment_mode=$1 # standalone, yarn-per-job, or yarn-session job_name=$2 # The Flink job name to stop -# Trim whitespace from job_name -job_name=$(echo "$job_name" | tr -d '[:space:]') - # Checking for empty input arguments if [ -z "$deployment_mode" ] || [ -z "$job_name" ]; then display_usage diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java index 38d91c5..93cb972 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java @@ -43,7 +43,7 @@ public class GrootStreamServer { private static void outputFatalError(Throwable throwable) { log.error("\\n\\n===============================================================================\\n\\n"); String errorMsg = throwable.getMessage(); - log.error("Fatal Error ,Reason is :{} \n", errorMsg); + log.error("Fatal Error: {} \n", errorMsg); log.error("Exception StackTrace :{}", ExceptionUtils.getStackTrace(throwable)); log.error("\\n\\n===============================================================================\\n\\n"); } diff --git 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
index 79cd8bf..13db3d4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
@@ -52,7 +52,7 @@ public final class EnvironmentUtil {
             if (envConfig.hasPath(ExecutionConfigKeyName.RESTART_STRATEGY)) {
                 String restartStrategy = envConfig.getString(ExecutionConfigKeyName.RESTART_STRATEGY);
                 switch (restartStrategy.toLowerCase()) {
-                    case "no":
+                    case "none":
                         executionConfig.setRestartStrategy(RestartStrategies.noRestart());
                         break;
                     case "fixed-delay":
@@ -68,18 +68,18 @@ public final class EnvironmentUtil {
                         long delayInterval = envConfig.getLong(ExecutionConfigKeyName.RESTART_DELAY_INTERVAL);
                         executionConfig.setRestartStrategy(
                                 RestartStrategies.failureRateRestart(
-                                        rate,
-                                        Time.of(failureInterval, TimeUnit.MILLISECONDS),
-                                        Time.of(delayInterval, TimeUnit.MILLISECONDS)));
+                                        rate, // max failures per interval
+                                        Time.of(failureInterval, TimeUnit.MILLISECONDS), // interval over which the failure rate is measured
+                                        Time.of(delayInterval, TimeUnit.MILLISECONDS))); // delay between restart attempts
                         break;
                     default:
                         log.warn(
-                                "set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate",
+                                "Failed to set restart strategy: unknown value [{}]; supported values are none, fixed-delay, failure-rate",
                                 restartStrategy);
                 }
             }
         } catch (Exception e) {
-            log.warn("set restart.strategy in config '{}' exception", envConfig, e);
+            log.warn("Failed to set restart strategy from config '{}'", envConfig, e);
         }
     }
diff --git a/groot-bootstrap/src/main/resources/log4j.properties b/groot-bootstrap/src/main/resources/log4j.properties
deleted file mode 100644
index db5d9e5..0000000
--- a/groot-bootstrap/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# -# Set everything to be logged to the console -log4j.rootCategory=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory index 21e390d..6d6a1bb 100644 --- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory @@ -1,2 +1,2 @@ com.geedgenetworks.core.connector.inline.InlineTableFactory -com.geedgenetworks.core.connector.print.PrintTableFactory \ No newline at end of file +com.geedgenetworks.core.connector.print.PrintTableFactory diff --git a/groot-examples/end-to-end-example/pom.xml b/groot-examples/end-to-end-example/pom.xml index 0839e8e..34709fd 100644 --- a/groot-examples/end-to-end-example/pom.xml +++ b/groot-examples/end-to-end-example/pom.xml @@ -13,8 +13,9 @@ Groot : Examples : End-to-end - true - true + false + false + \ No newline at end of file diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java index aa2cb76..1f236d7 100644 --- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java +++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java @@ -13,7 +13,7 @@ import java.nio.file.Paths; public class GrootStreamExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { - String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml"; + String configPath = args.length > 0 ? 
args[0] : "/examples/kafka_to_print.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml index e883bce..bf8ebf2 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml @@ -30,6 +30,8 @@ application: # [object] Define job configuration parallelism: 1 pipeline: object-reuse: true + flink: + restart-strategy: none topology: - name: kafka_source downstream: [print_sink] diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml index fc08086..1fb6212 100644 --- a/groot-examples/pom.xml +++ b/groot-examples/pom.xml @@ -30,28 +30,27 @@ com.geedgenetworks groot-bootstrap - ${revision} + ${project.version} - com.geedgenetworks connector-kafka - ${revision} + ${project.version} ${scope} com.geedgenetworks connector-clickhouse - ${revision} + ${project.version} ${scope} com.geedgenetworks connector-ipfix-collector - ${revision} + ${project.version} ${scope} diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml index 5882a56..041cd9d 100644 --- a/groot-tests/pom.xml +++ b/groot-tests/pom.xml @@ -15,6 +15,7 @@ test-common test-e2e-base + test-e2e-kafka @@ -25,6 +26,25 @@ + + com.geedgenetworks + groot-bootstrap + ${project.version} + test + + + + com.typesafe + config + test + + + + com.geedgenetworks + format-json + ${project.version} + + org.testcontainers testcontainers diff --git a/groot-tests/test-common/pom.xml b/groot-tests/test-common/pom.xml index c6cb7bf..c086f41 100644 --- a/groot-tests/test-common/pom.xml +++ b/groot-tests/test-common/pom.xml @@ -17,16 +17,7 @@ - - com.typesafe - config - - - - com.geedgenetworks - groot-bootstrap - ${revision} - + diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java index aed38b9..29154e5 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java @@ -55,6 +55,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer { new LogMessageWaitStrategy() .withRegEx(".*Starting the resource manager.*") .withStartupTimeout(Duration.ofMinutes(2))); + + // Copy groot-stream bootstrap and some other files to the container copyGrootStreamStarterToContainer(jobManager); copyGrootStreamStarterLoggingToContainer(jobManager); diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java index 3f8435d..58eee29 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java @@ -71,7 +71,7 @@ public final class ContainerUtil { Paths.get(GrootStreamHomeInContainer, "bootstrap", startJarName).toString()); - // copy lib + // copy libs String hbaseClientJar = "hbase-client-shaded-" + 
getProjectVersion() + ".jar"; Path hbaseClientJarPath = Paths.get(PROJECT_ROOT_PATH, "groot-shaded/hbase-client-shaded", "target", hbaseClientJar); diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java deleted file mode 100644 index b84d169..0000000 --- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java +++ /dev/null @@ -1,91 +0,0 @@ -package com.geedgenetworks.test.e2e.base; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.TypeReference; -import com.alibaba.nacos.client.naming.utils.CollectionUtils; -import com.geedgenetworks.test.common.TestSuiteBase; -import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer; -import com.geedgenetworks.test.common.container.TestContainer; -import com.geedgenetworks.test.common.container.TestContainerId; -import com.geedgenetworks.test.common.junit.DisabledOnContainer; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.TestTemplate; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static org.awaitility.Awaitility.await; - -@Slf4j -@DisabledOnContainer( - value = {TestContainerId.FLINK_1_17}, - type = {}, - disabledReason = "only flink adjusts the parameter configuration rules") -public class InlineToPrint extends TestSuiteBase { - - @TestTemplate - public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException { - CompletableFuture.supplyAsync( - () -> { - try { - return container.executeJob("/inline_to_print.yaml"); - } catch (Exception e) { - log.error("Commit task exception :" + e.getMessage()); - throw new RuntimeException(e); - } - }); - - AtomicReference taskMangerID = new AtomicReference<>(); - await().atMost(300000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> { - Map taskMangerInfo = JSON.parseObject(container.executeJobManagerInnerCommand( - "curl http://localhost:8081/taskmanagers"), new TypeReference>() { - }); - List> taskManagers = - (List>) taskMangerInfo.get("taskmanagers"); - if (!CollectionUtils.isEmpty(taskManagers)) { - taskMangerID.set(taskManagers.get(0).get("id").toString()); - } - Assertions.assertNotNull(taskMangerID.get()); - }); - - AtomicReference jobId = new AtomicReference<>(); - await().atMost(300000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> { - Map jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand( - "curl http://localhost:8081/jobs/overview"), new TypeReference>() { - }); - List> jobs = - (List>) jobInfo.get("jobs"); - if (!CollectionUtils.isEmpty(jobs)) { - jobId.set(jobs.get(0).get("jid").toString()); - } - Assertions.assertNotNull(jobId.get()); - }); - //Obtain job metrics - AtomicReference>> jobNumRestartsReference = new AtomicReference<>(); - await().atMost(60000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> { - Thread.sleep(5000); - String result = container.executeJobManagerInnerCommand( - String.format( - "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get())); - List> jobNumRestartsInfo = JSON.parseObject(result, new TypeReference>>() { - }); - if (!CollectionUtils.isEmpty(jobNumRestartsInfo)) { - jobNumRestartsReference.set(jobNumRestartsInfo); - } - - 
Assertions.assertNotNull(jobNumRestartsReference.get());
-
-                        });
-    }
-
-}
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java
new file mode 100644
index 0000000..1e94cdc
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java
@@ -0,0 +1,90 @@
+package com.geedgenetworks.test.e2e.base;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+import com.alibaba.nacos.client.naming.utils.CollectionUtils;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {TestContainerId.FLINK_1_17},
+        type = {},
+        disabledReason = "Flink 1.17 applies different parameter configuration rules")
+public class InlineToPrintTest extends TestSuiteBase {
+
+    @TestTemplate
+    public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return container.executeJob("/kafka_to_print.yaml");
+                    } catch (Exception e) {
+                        log.error("Job submission failed: {}", e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        AtomicReference<String> taskManagerId = new AtomicReference<>();
+        await().atMost(300000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Map<String, Object> taskManagerInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
+                                    "curl http://localhost:8081/taskmanagers"), new TypeReference<Map<String, Object>>() {
+                            });
+                            List<Map<String, Object>> taskManagers =
+                                    (List<Map<String, Object>>) taskManagerInfo.get("taskmanagers");
+                            if (!CollectionUtils.isEmpty(taskManagers)) {
+                                taskManagerId.set(taskManagers.get(0).get("id").toString());
+                            }
+                            Assertions.assertNotNull(taskManagerId.get());
+                        });
+
+        AtomicReference<String> jobId = new AtomicReference<>();
+        await().atMost(300000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
+                                    "curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() {
+                            });
+                            List<Map<String, Object>> jobs =
+                                    (List<Map<String, Object>>) jobInfo.get("jobs");
+                            if (!CollectionUtils.isEmpty(jobs)) {
+                                jobId.set(jobs.get(0).get("jid").toString());
+                            }
+                            Assertions.assertNotNull(jobId.get());
+                        });
+
+        // Obtain job metrics
+        AtomicReference<List<Map<String, String>>> jobNumRestartsReference = new AtomicReference<>();
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Thread.sleep(5000);
+                            String result = container.executeJobManagerInnerCommand(
+                                    String.format(
+                                            "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get()));
+                            List<Map<String, String>> jobNumRestartsInfo = JSON.parseObject(result, new TypeReference<List<Map<String, String>>>() {
+                            });
+                            if (!CollectionUtils.isEmpty(jobNumRestartsInfo)) {
+                                jobNumRestartsReference.set(jobNumRestartsInfo);
+                            }
+
+                            Assertions.assertNotNull(jobNumRestartsReference.get());
+
+                        });
+    }
+
+}
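A caveat on the test above: the last await block only asserts that a `numRestarts` metrics payload arrives; the value itself is never checked. If the goal is to show the job ran without restarting, a stricter final assertion could look like this sketch (assuming the Flink REST response shape `[{"id":"numRestarts","value":"..."}]` that the `TypeReference` above already implies):

```java
// Sketch: tighten the final assertion so the test proves the job never restarted.
// Reuses jobNumRestartsReference from the await block above.
List<Map<String, String>> metrics = jobNumRestartsReference.get();
Assertions.assertEquals("numRestarts", metrics.get(0).get("id"));
// With execution.restart.strategy: none, a healthy run reports zero restarts.
Assertions.assertEquals("0", metrics.get(0).get("value"));
```
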
diff --git a/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
new file mode 100644
index 0000000..b1e4f35
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
@@ -0,0 +1,40 @@
+sources:
+  kafka_source:
+    type: kafka
+    schema:
+      fields: # [array of object] Schema field projection, support read data only from specified fields.
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+    properties: # [object] Kafka source properties
+      topic: SESSION-RECORD
+      kafka.bootstrap.servers: 192.168.44.11:9092
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.group.id: GROOT-STREAM-EXAMPLE-KAFKA-TO-PRINT
+      kafka.auto.offset.reset: latest
+    format: json
+
+sinks: # [object] Define connector sink
+  print_sink:
+    type: print
+    properties:
+      mode: log_info
+      format: json
+
+application: # [object] Define job configuration
+  env:
+    name: example-kafka-to-print
+    parallelism: 1
+    pipeline:
+      object-reuse: true
+    execution:
+      restart:
+        strategy: none
+  topology:
+    - name: kafka_source
+      downstream: [print_sink]
+    - name: print_sink
+      downstream: []
\ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml
new file mode 100644
index 0000000..fbbe0e7
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.geedgenetworks</groupId>
+        <artifactId>groot-tests</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>test-e2e-kafka</artifactId>
+    <name>Groot : Tests : E2E : Kafka</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>test-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+
+
+
+
+</project>
\ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
new file mode 100644
index 0000000..9cd09fc
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
@@ -0,0 +1,204 @@
+package com.geedgenetworks.test.e2e.kafka;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import com.geedgenetworks.formats.json.JsonSerializer;
+import com.geedgenetworks.test.common.TestResource;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.KafkaContainer;
+import 
org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@Slf4j +@DisabledOnContainer( + value = {TestContainerId.FLINK_1_17}, + disabledReason = "Override TestSuiteBase @DisabledOnContainer") +public class KafkaConnectorTest extends TestSuiteBase implements TestResource { + + private KafkaContainer kafkaContainer; + + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0"; + private static final String KAFKA_HOST = "kafkaCluster"; + private KafkaProducer producer; + + + + @Override + @BeforeAll + public void startUp() throws Exception { + kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); + Startables.deepStart(Stream.of(kafkaContainer)).join(); + log.info("Kafka container started successfully"); + + Awaitility.given() + .ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initKafkaProducer); + + log.info("Write 100 records to topic test_topic_source"); + + StructType dataType = Types.parseStructType("client_ip: string, server_ip: String"); + JsonSerializer serializer = new JsonSerializer(dataType); + generateTestData(serializer::serialize, 0, 100); + + + } + + private void generateTestData(ProducerRecordConverter converter, int start, int end) { + + for (int i = start; i < end; i++) { + Map row = Map + .of("client_ip", "192.168.40.12", "server_ip", "8.8.8.8"); + ProducerRecord record = + new ProducerRecord<>("TEST-TOPIC-SOURCE", converter.convert(row)); + producer.send(record); + } + + + + } + + + @TestTemplate + public void testSourceKafkaToConsole(TestContainer container) throws IOException, InterruptedException { + + List data = getKafkaConsumerListData("TEST-TOPIC-SOURCE"); + + for (String record : data) { + log.info("Record: {}", record);} + + Assertions.assertEquals(100, data.size()); + } + + interface ProducerRecordConverter { + byte[] convert(Map row); + } + + + + @AfterAll + @Override + public void tearDown() throws Exception { + if (producer != null) { + producer.close(); + } + if (kafkaContainer != null) { + kafkaContainer.close(); + } + + } + + private void initKafkaProducer() { + Properties properties = new Properties(); + String bootstrapServers = kafkaContainer.getBootstrapServers(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producer = new KafkaProducer<>(properties); + } + + private Properties kafkaConsumerConfig() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group"); + props.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + OffsetResetStrategy.EARLIEST.toString().toLowerCase()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + 
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return props; + } + + private Properties kafkaByteConsumerConfig() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group"); + props.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + OffsetResetStrategy.EARLIEST.toString().toLowerCase()); + props.setProperty( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + ByteArrayDeserializer.class.getName()); + props.setProperty( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + ByteArrayDeserializer.class.getName()); + return props; + } + + private Map getKafkaConsumerData(String topicName) { + Map data = new HashMap<>(); + try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { + consumer.subscribe(Arrays.asList(topicName)); + Map offsets = + consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); + Long endOffset = offsets.entrySet().iterator().next().getValue(); + Long lastProcessedOffset = -1L; + + do { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + if (lastProcessedOffset < record.offset()) { + data.put(record.key(), record.value()); + } + lastProcessedOffset = record.offset(); + } + } while (lastProcessedOffset < endOffset - 1); + } + return data; + } + + private List getKafkaConsumerListData(String topicName) { + List data = new ArrayList<>(); + try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { + consumer.subscribe(Arrays.asList(topicName)); + Map offsets = + consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); + Long endOffset = offsets.entrySet().iterator().next().getValue(); + Long lastProcessedOffset = -1L; + + do { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + if (lastProcessedOffset < record.offset()) { + data.add(record.value()); + } + lastProcessedOffset = record.offset(); + } + } while (lastProcessedOffset < endOffset - 1); + } + return data; + } + + +} -- cgit v1.2.3 From 58460028cf73336292bbf9a6b05fc7612e3f891b Mon Sep 17 00:00:00 2001 From: doufenghu Date: Sat, 6 Apr 2024 01:11:57 +0800 Subject: [Feature][Tests] Improve kafka connector test by flink kafka-to-console and inline-to-kafka config. 
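The inline-to-kafka sink config below turns on `kafka.compression.type: snappy`, which is why `snappy-java` joins the test classpath: kafka-clients declares the codec as an optional dependency and only loads it when a batch is actually compressed. A minimal sketch of what the connector's `kafka.*` passthrough amounts to on the producer side (a hypothetical standalone snippet, not part of this patch):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class SnappyProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaCluster:9092"); // value from the yaml below
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // Without org.xerial.snappy on the classpath, the first send() with a
        // compressed batch fails at runtime, not at configuration time.
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test_topic", "{\"id\":1}".getBytes()));
        }
    }
}
```

This mirrors what the yaml keys do; the assumption, based on the property names rather than the connector source, is that the `kafka.` prefix is stripped and the remainder handed to the producer config.
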
--- groot-tests/pom.xml | 2 + groot-tests/test-e2e-kafka/pom.xml | 7 ++ .../test/e2e/kafka/KafkaConnectorTest.java | 74 ++++++++++++++-------- .../test/resources/kafka_sink_inline_to_kafka.yaml | 64 +++++++++++++++++++ .../test/resources/kafka_source_to_console.yaml | 40 ++++++++++++ 5 files changed, 162 insertions(+), 25 deletions(-) create mode 100644 groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml create mode 100644 groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml index 041cd9d..76f533a 100644 --- a/groot-tests/pom.xml +++ b/groot-tests/pom.xml @@ -23,6 +23,7 @@ true 2.4 4.3.1 + 1.1.8.3 @@ -73,6 +74,7 @@ ${rest-assured.version} test + diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml index fbbe0e7..ab1ba72 100644 --- a/groot-tests/test-e2e-kafka/pom.xml +++ b/groot-tests/test-e2e-kafka/pom.xml @@ -36,6 +36,13 @@ test + + org.xerial.snappy + snappy-java + ${snappy-java.version} + test + + diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java index 9cd09fc..e3af025 100644 --- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java +++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java @@ -1,5 +1,8 @@ package com.geedgenetworks.test.e2e.kafka; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; +import com.alibaba.nacos.client.naming.utils.CollectionUtils; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.types.StructType; import com.geedgenetworks.core.types.Types; @@ -10,11 +13,13 @@ import com.geedgenetworks.test.common.container.TestContainer; import com.geedgenetworks.test.common.container.TestContainerId; import com.geedgenetworks.test.common.junit.DisabledOnContainer; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -23,18 +28,26 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.google.common.base.CharMatcher; +import org.testcontainers.shaded.com.google.common.base.Strings; +import org.testcontainers.shaded.com.google.common.primitives.Chars; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; import java.io.IOException; import java.time.Duration; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
+import static org.awaitility.Awaitility.await;
+
 @Slf4j
 @DisabledOnContainer(
         value = {TestContainerId.FLINK_1_17},
@@ -47,11 +60,11 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
     private static final String KAFKA_HOST = "kafkaCluster";
     private KafkaProducer<byte[], byte[]> producer;
 
-
+    private static final String DEFAULT_TEST_TOPIC_SOURCE = "test_topic_source";
 
     @Override
     @BeforeAll
-    public void startUp() throws Exception {
+    public void startUp() {
         kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
                 .withNetwork(NETWORK)
                 .withNetworkAliases(KAFKA_HOST)
@@ -67,46 +80,57 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
                 .untilAsserted(this::initKafkaProducer);
 
         log.info("Write 100 records to topic test_topic_source");
+        generateTestData(DEFAULT_TEST_TOPIC_SOURCE, 0, 100);
+    }
 
-        StructType dataType = Types.parseStructType("client_ip: string, server_ip: String");
-        JsonSerializer serializer = new JsonSerializer(dataType);
-        generateTestData(serializer::serialize, 0, 100);
+    @TestTemplate
+    public void testSourceKafkaJsonToConsole(TestContainer container) {
+        generateTestData("test_topic_json", 0, 10);
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return container.executeJob("/kafka_source_to_console.yaml");
+                    } catch (Exception e) {
+                        log.error("Job submission failed: {}", e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
 
+        await().atMost(300000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            String logs = container.getServerLogs();
+                            Assertions.assertEquals(10, StringUtils.countMatches(logs, "PrintSinkFunction"));
+                        });
     }
 
-    private void generateTestData(ProducerRecordConverter converter, int start, int end) {
-
+    private void generateTestData(String topic, int start, int end) {
+        StructType dataType = Types.parseStructType("id: int, client_ip: string, server_ip: string, flag: string");
+        JsonSerializer serializer = new JsonSerializer(dataType);
         for (int i = start; i < end; i++) {
             Map<String, Object> row = Map
-                    .of("client_ip", "192.168.40.12", "server_ip", "8.8.8.8");
+                    .of("id", i,
+                        "client_ip", "192.168.40.12",
+                        "server_ip", "8.8.8.8",
+                        "flag", Boolean.FALSE.toString());
             ProducerRecord<byte[], byte[]> record =
-                    new ProducerRecord<>("TEST-TOPIC-SOURCE", converter.convert(row));
+                    new ProducerRecord<>(topic, serializer.serialize(row));
             producer.send(record);
         }
-
-
     }
 
     @TestTemplate
-    public void testSourceKafkaToConsole(TestContainer container) throws IOException, InterruptedException {
-
-        List<String> data = getKafkaConsumerListData("TEST-TOPIC-SOURCE");
-
-        for (String record : data) {
-            log.info("Record: {}", record);}
-
-        Assertions.assertEquals(100, data.size());
+    public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafka_sink_inline_to_kafka.yaml");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        List<String> data = getKafkaConsumerListData("test_topic");
+        Assertions.assertEquals(10, data.size()); // Check that all 10 records are consumed
     }
 
-    interface ProducerRecordConverter {
-        byte[] convert(Map<String, Object> row);
-    }
-
-
-
     @AfterAll
     @Override
     public void tearDown() throws Exception {
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml
new file mode 100644
index 0000000..dbfbc1e
--- /dev/null
+++ 
b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml @@ -0,0 +1,64 @@ +sources: # [object] Define connector source + inline_source: + type: inline + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string + properties: + # + # [string] Event Data, it will be parsed to Map by the specified format. + # + data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' + format: json + interval.per.row: 1s + repeat.count: 10 + json.ignore.parse.errors: false + + +sinks: + connector_kafka: + type: kafka + properties: + topic: test_topic + kafka.bootstrap.servers: kafkaCluster:9092 + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + format: json + log.failures.only: true + +application: # [object] Define job configuration + env: + name: example-inline-to-kafka + parallelism: 1 + shade.identifier: default + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [ connector_kafka ] + - name: connector_kafka + downstream: [] \ No newline at end of file diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml new file mode 100644 index 0000000..2afee47 --- /dev/null +++ 
b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml
@@ -0,0 +1,40 @@
+sources:
+  kafka_source:
+    type: kafka
+    schema:
+      fields: # [array of object] Schema field projection, support read data only from specified fields.
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+    properties: # [object] Kafka source properties
+      topic: test_topic_json
+      kafka.bootstrap.servers: kafkaCluster:9092
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.group.id: test_topic_json_group
+      kafka.auto.offset.reset: earliest
+    format: json
+
+sinks: # [object] Define connector sink
+  print_sink:
+    type: print
+    properties:
+      mode: log_warn
+      format: json
+
+application: # [object] Define job configuration
+  env:
+    name: example-kafka-to-print
+    parallelism: 1
+    pipeline:
+      object-reuse: true
+    execution:
+      restart:
+        strategy: none
+  topology:
+    - name: kafka_source
+      downstream: [print_sink]
+    - name: print_sink
+      downstream: []
\ No newline at end of file
-- 
cgit v1.2.3
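For readers cross-checking these configs against the test code: the JSON records consumed from `test_topic_json` are produced with the project's own format classes. A minimal usage sketch of that API, assuming `Types.parseStructType` and `JsonSerializer` behave as they do in `generateTestData` above:

```java
import java.util.Map;

import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.core.types.Types;
import com.geedgenetworks.formats.json.JsonSerializer;

public class JsonSerializerSketch {
    public static void main(String[] args) {
        // Field list mirrors the schema projection in kafka_source_to_console.yaml.
        StructType type = Types.parseStructType("client_ip: string, server_ip: string");
        JsonSerializer serializer = new JsonSerializer(type);
        byte[] payload = serializer.serialize(Map.of(
                "client_ip", "192.168.40.12",
                "server_ip", "8.8.8.8"));
        // payload holds the UTF-8 JSON bytes that `format: json` parses on the source side.
        System.out.println(new String(payload));
    }
}
```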