| author | doufenghu <[email protected]> | 2024-04-02 17:36:46 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-04-02 17:36:46 +0800 |
| commit | 95bcb7db323b12d7e7f864dff615d73622d7e688 (patch) | |
| tree | 919b582cbcefb9304004444fda1d350a3041a2f5 | |
| parent | 80e93523eb12f986acac73a81f2614516c718a5e (diff) | |
[Feature][Tests] Add KafkaContainer for unit tests.
21 files changed, 351 insertions, 63 deletions
@@ -106,7 +106,7 @@ cd "groot-stream-${version}"
 - Run the following command to start the groot-stream server for Yarn Per-job Mode:
 ```shell
 cd "groot-stream-${version}"
-./bin/start.sh -c ./config/grootstream_job_example.yaml --target yarn-per-job -n inline-to-print-job -d
+./bin/start.sh -c ./config/grootstream_job_example.yaml --target yarn-per-job -Dyarn.application.name="inline-to-print-job" -n inline-to-print-job -d
 ```
 
 ### Configuring
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index daf6e32..52b5001 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -33,6 +33,9 @@ application:
     parallelism: 3
     pipeline:
       object-reuse: true
+    execution:
+      restart:
+        strategy: none
   topology:
     - name: inline_source
       downstream: [filter_operator]
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index d700777..af73704 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -20,6 +20,7 @@ sources: # [object] Define connector source
       kafka.max.partition.fetch.bytes: 31457280
       kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001 # [string] Kafka Group ID for Consumer
       kafka.auto.offset.reset: latest # [string] Kafka Auto Offset Reset, default is latest
+      kafka.compression.type: snappy # [string] Kafka Compression Type, default is none
     format: json # [string] Data Format for Source. eg. json, protobuf, etc.
     json.ignore.parse.errors: false # [boolean] Flag to ignore parse errors, default will record the parse errors. If set true, it will ignore the parse errors.
diff --git a/docs/env-config.md b/docs/env-config.md
index 3e91f3e..e29acb0 100644
--- a/docs/env-config.md
+++ b/docs/env-config.md
@@ -44,12 +44,18 @@ Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are s
 
 You can directly use the flink parameter by prefixing `flink.`, such as `flink.execution.buffer-timeout`, `flink.object-reuse`, etc. More details can be found in the official [flink documentation](https://flink.apache.org/). Of course, you can use groot stream parameter, here are some parameter names corresponding to the names in Flink.
 
-| Groot Stream             | Flink                          |
-|--------------------------|--------------------------------|
-| execution.buffer-timeout | flink.execution.buffer-timeout |
-| pipeline.object-reuse    | flink.object-reuse             |
-| pipeline.max-parallelism | flink.pipeline.max-parallelism |
-| ...                      | ...                            |
+| Groot Stream                           | Flink                                                          |
+|----------------------------------------|----------------------------------------------------------------|
+| execution.buffer-timeout               | flink.execution.buffer-timeout                                 |
+| pipeline.object-reuse                  | flink.object-reuse                                             |
+| pipeline.max-parallelism               | flink.pipeline.max-parallelism                                 |
+| execution.restart.strategy             | flink.restart-strategy                                         |
+| execution.restart.attempts             | flink.restart-strategy.fixed-delay.attempts                    |
+| execution.restart.delayBetweenAttempts | flink.restart-strategy.fixed-delay.delay                       |
+| execution.restart.failure-rate         | flink.restart-strategy.failure-rate.max-failures-per-interval  |
+| execution.restart.failureInterval      | flink.restart-strategy.failure-rate.failure-rate-interval      |
+| execution.restart.delayInterval        | flink.restart-strategy.failure-rate.delay                      |
+| ...                                    | ...                                                            |
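For reference, the mapping documented above means a restart strategy can be configured either with groot-stream keys or with the `flink.` prefix. A minimal sketch, assuming the `application.env` nesting used in `grootstream_job_example.yaml`; the `fixed-delay` values here are illustrative, not from the commit:

```yaml
application:
  env:
    execution:
      restart:
        strategy: fixed-delay        # -> flink.restart-strategy
        attempts: 3                  # -> flink.restart-strategy.fixed-delay.attempts
        delayBetweenAttempts: 10000  # -> flink.restart-strategy.fixed-delay.delay (milliseconds)
```

The same effect can be achieved by setting the Flink name directly, e.g. `flink.restart-strategy: fixed-delay`.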
diff --git a/groot-bootstrap/src/main/bin/stop.sh b/groot-bootstrap/src/main/bin/stop.sh
index 9fd0469..91e24b9 100644
--- a/groot-bootstrap/src/main/bin/stop.sh
+++ b/groot-bootstrap/src/main/bin/stop.sh
@@ -16,7 +16,7 @@ stop_jobs() {
       done
       ;;
     yarn-per-job)
-      # Command to stop YARN applications for the specified job name
+      # Command to stop YARN applications for the specified Yarn cluster app name
       yarn application -list -appStates RUNNING | grep "$job_name" | awk '{print $1}' | while read -r appId
       do
         yarn application -kill "$appId"
@@ -65,9 +65,6 @@ fi
 deployment_mode=$1  # standalone, yarn-per-job, or yarn-session
 job_name=$2         # The Flink job name to stop
 
-# Trim whitespace from job_name
-job_name=$(echo "$job_name" | tr -d '[:space:]')
-
 # Checking for empty input arguments
 if [ -z "$deployment_mode" ] || [ -z "$job_name" ]; then
   display_usage
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java
index 38d91c5..93cb972 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamServer.java
@@ -43,7 +43,7 @@ public class GrootStreamServer {
     private static void outputFatalError(Throwable throwable) {
         log.error("\\n\\n===============================================================================\\n\\n");
         String errorMsg = throwable.getMessage();
-        log.error("Fatal Error ,Reason is :{} \n", errorMsg);
+        log.error("Fatal Error: {} \n", errorMsg);
         log.error("Exception StackTrace :{}", ExceptionUtils.getStackTrace(throwable));
         log.error("\\n\\n===============================================================================\\n\\n");
     }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
index 79cd8bf..13db3d4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
@@ -52,7 +52,7 @@ public final class EnvironmentUtil {
         if (envConfig.hasPath(ExecutionConfigKeyName.RESTART_STRATEGY)) {
             String restartStrategy = envConfig.getString(ExecutionConfigKeyName.RESTART_STRATEGY);
             switch (restartStrategy.toLowerCase()) {
-                case "no":
+                case "none":
                     executionConfig.setRestartStrategy(RestartStrategies.noRestart());
                     break;
                 case "fixed-delay":
@@ -68,18 +68,18 @@ public final class EnvironmentUtil {
                     long delayInterval = envConfig.getLong(ExecutionConfigKeyName.RESTART_DELAY_INTERVAL);
                     executionConfig.setRestartStrategy(
                             RestartStrategies.failureRateRestart(
-                                    rate,
-                                    Time.of(failureInterval, TimeUnit.MILLISECONDS),
-                                    Time.of(delayInterval, TimeUnit.MILLISECONDS)));
+                                    rate, // max failures per interval
+                                    Time.of(failureInterval, TimeUnit.MILLISECONDS), //time interval for measuring failure rate
+                                    Time.of(delayInterval, TimeUnit.MILLISECONDS))); // delay
                     break;
                 default:
                     log.warn(
-                            "set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate",
+                            "Set restart strategy failed, unknown restart strategy [{}],only support no,fixed-delay,failure-rate",
                             restartStrategy);
             }
         }
     } catch (Exception e) {
-        log.warn("set restart.strategy in config '{}' exception", envConfig, e);
+        log.warn("Set restart strategy in config '{}' exception", envConfig, e);
     }
 }
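For orientation, the three branches handled by `EnvironmentUtil` above map onto Flink's `RestartStrategies` API roughly as follows. This is a sketch against the Flink 1.x API with illustrative values, not code from the commit; in real use only one `setRestartStrategy` call applies (the last one wins):

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class RestartStrategySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // strategy: none -> never restart the job on failure
        env.setRestartStrategy(RestartStrategies.noRestart());

        // strategy: fixed-delay -> up to 3 attempts, 10 s apart (illustrative values)
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, Time.of(10, TimeUnit.SECONDS)));

        // strategy: failure-rate -> at most 3 failures per 5-minute measuring interval,
        // with a 10 s delay between restarts (illustrative values)
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,
                Time.of(5, TimeUnit.MINUTES),
                Time.of(10, TimeUnit.SECONDS)));
    }
}
```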
diff --git a/groot-bootstrap/src/main/resources/log4j.properties b/groot-bootstrap/src/main/resources/log4j.properties
deleted file mode 100644
index db5d9e5..0000000
--- a/groot-bootstrap/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
index 21e390d..6d6a1bb 100644
--- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -1,2 +1,2 @@
 com.geedgenetworks.core.connector.inline.InlineTableFactory
-com.geedgenetworks.core.connector.print.PrintTableFactory
\ No newline at end of file
+com.geedgenetworks.core.connector.print.PrintTableFactory
diff --git a/groot-examples/end-to-end-example/pom.xml b/groot-examples/end-to-end-example/pom.xml
index 0839e8e..34709fd 100644
--- a/groot-examples/end-to-end-example/pom.xml
+++ b/groot-examples/end-to-end-example/pom.xml
@@ -13,8 +13,9 @@
     <name>Groot : Examples : End-to-end</name>
 
     <properties>
-        <maven.install.skip>true</maven.install.skip>
-        <maven.deploy.skip>true</maven.deploy.skip>
+        <maven.install.skip>false</maven.install.skip>
+        <maven.deploy.skip>false</maven.deploy.skip>
     </properties>
+
 </project>
\ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index aa2cb76..1f236d7 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -13,7 +13,7 @@ import java.nio.file.Paths;
 public class GrootStreamExample {

     public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
-        String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
+        String configPath = args.length > 0 ? args[0] : "/examples/kafka_to_print.yaml";
         String configFile = getTestConfigFile(configPath);
         ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
         executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
index e883bce..bf8ebf2 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
@@ -30,6 +30,8 @@ application: # [object] Define job configuration
     parallelism: 1
     pipeline:
       object-reuse: true
+    flink:
+      restart-strategy: none
   topology:
     - name: kafka_source
       downstream: [print_sink]
diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml
index fc08086..1fb6212 100644
--- a/groot-examples/pom.xml
+++ b/groot-examples/pom.xml
@@ -30,28 +30,27 @@
         <dependency>
             <groupId>com.geedgenetworks</groupId>
             <artifactId>groot-bootstrap</artifactId>
-            <version>${revision}</version>
+            <version>${project.version}</version>
         </dependency>
-
         <dependency>
             <groupId>com.geedgenetworks</groupId>
             <artifactId>connector-kafka</artifactId>
-            <version>${revision}</version>
+            <version>${project.version}</version>
             <scope>${scope}</scope>
         </dependency>

         <dependency>
             <groupId>com.geedgenetworks</groupId>
             <artifactId>connector-clickhouse</artifactId>
-            <version>${revision}</version>
+            <version>${project.version}</version>
             <scope>${scope}</scope>
         </dependency>

         <dependency>
             <groupId>com.geedgenetworks</groupId>
             <artifactId>connector-ipfix-collector</artifactId>
-            <version>${revision}</version>
+            <version>${project.version}</version>
             <scope>${scope}</scope>
         </dependency>
diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml
index 5882a56..041cd9d 100644
--- a/groot-tests/pom.xml
+++ b/groot-tests/pom.xml
@@ -15,6 +15,7 @@
     <modules>
         <module>test-common</module>
         <module>test-e2e-base</module>
+        <module>test-e2e-kafka</module>
     </modules>

     <properties>
@@ -26,6 +27,25 @@

     <dependencies>
         <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>groot-bootstrap</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
         </dependency>
diff --git a/groot-tests/test-common/pom.xml b/groot-tests/test-common/pom.xml
index c6cb7bf..c086f41 100644
--- a/groot-tests/test-common/pom.xml
+++ b/groot-tests/test-common/pom.xml
@@ -17,16 +17,7 @@
     </properties>

     <dependencies>
-        <dependency>
-            <groupId>com.typesafe</groupId>
-            <artifactId>config</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.geedgenetworks</groupId>
-            <artifactId>groot-bootstrap</artifactId>
-            <version>${revision}</version>
-        </dependency>
+

     </dependencies>
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index aed38b9..29154e5 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -55,6 +55,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
                         new LogMessageWaitStrategy()
                                 .withRegEx(".*Starting the resource manager.*")
                                 .withStartupTimeout(Duration.ofMinutes(2)));
+
+        // Copy groot-stream bootstrap and some other files to the container
         copyGrootStreamStarterToContainer(jobManager);
         copyGrootStreamStarterLoggingToContainer(jobManager);
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
index 3f8435d..58eee29 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -71,7 +71,7 @@ public final class ContainerUtil {
                 Paths.get(GrootStreamHomeInContainer, "bootstrap", startJarName).toString());

-        // copy lib
+        // copy libs
         String hbaseClientJar = "hbase-client-shaded-" + getProjectVersion() + ".jar";
         Path hbaseClientJarPath =
                 Paths.get(PROJECT_ROOT_PATH, "groot-shaded/hbase-client-shaded", "target", hbaseClientJar);
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java
index b84d169..1e94cdc 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintTest.java
@@ -5,7 +5,6 @@ import com.alibaba.fastjson2.TypeReference;
 import com.alibaba.nacos.client.naming.utils.CollectionUtils;
 import com.geedgenetworks.test.common.TestSuiteBase;
 import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
-import com.geedgenetworks.test.common.container.TestContainer;
 import com.geedgenetworks.test.common.container.TestContainerId;
 import com.geedgenetworks.test.common.junit.DisabledOnContainer;
 import lombok.extern.slf4j.Slf4j;
@@ -25,14 +24,14 @@ import static org.awaitility.Awaitility.await;
         value = {TestContainerId.FLINK_1_17},
         type = {},
         disabledReason = "only flink adjusts the parameter configuration rules")
-public class InlineToPrint extends TestSuiteBase {
+public class InlineToPrintTest extends TestSuiteBase {

     @TestTemplate
     public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
-                        return container.executeJob("/inline_to_print.yaml");
+                        return container.executeJob("/kafka_to_print.yaml");
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
diff --git a/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
new file mode 100644
index 0000000..b1e4f35
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
@@ -0,0 +1,40 @@
+sources:
+  kafka_source:
+    type : kafka
+    schema:
+      fields: # [array of object] Schema field projection, support read data only from specified fields.
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+    properties: # [object] Kafka source properties
+      topic: SESSION-RECORD
+      kafka.bootstrap.servers: 192.168.44.11:9092
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.group.id: GROOT-STREAM-EXAMPLE-KAFKA-TO-PRINT
+      kafka.auto.offset.reset: latest
+    format: json
+
+sinks: # [object] Define connector sink
+  print_sink:
+    type: print
+    properties:
+      mode: log_info
+    format: json
+
+application: # [object] Define job configuration
+  env:
+    name: example-kafka-to-print
+    parallelism: 1
+    pipeline:
+      object-reuse: true
+    execution:
+      restart:
+        strategy: no
+  topology:
+    - name: kafka_source
+      downstream: [print_sink]
+    - name: print_sink
+      downstream: []
\ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml
new file mode 100644
index 0000000..fbbe0e7
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.geedgenetworks</groupId>
+        <artifactId>groot-tests</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>test-e2e-kafka</artifactId>
+    <name>Groot : Tests : E2E : Kafka</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>test-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
new file mode 100644
index 0000000..9cd09fc
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
@@ -0,0 +1,204 @@
+package com.geedgenetworks.test.e2e.kafka;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import com.geedgenetworks.formats.json.JsonSerializer;
+import com.geedgenetworks.test.common.TestResource;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {TestContainerId.FLINK_1_17},
+        disabledReason = "Override TestSuiteBase @DisabledOnContainer")
+public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
+
+    private KafkaContainer kafkaContainer;
+
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
+    private static final String KAFKA_HOST = "kafkaCluster";
+    private KafkaProducer<byte[], byte[]> producer;
+
+    @Override
+    @BeforeAll
+    public void startUp() throws Exception {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+                .withNetwork(NETWORK)
+                .withNetworkAliases(KAFKA_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started successfully");
+
+        Awaitility.given()
+                .ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initKafkaProducer);
+
+        log.info("Write 100 records to topic test_topic_source");
+
+        StructType dataType = Types.parseStructType("client_ip: string, server_ip: String");
+        JsonSerializer serializer = new JsonSerializer(dataType);
+        generateTestData(serializer::serialize, 0, 100);
+    }
+
+    private void generateTestData(ProducerRecordConverter converter, int start, int end) {
+        for (int i = start; i < end; i++) {
+            Map<String, Object> row = Map
+                    .of("client_ip", "192.168.40.12", "server_ip", "8.8.8.8");
+            ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>("TEST-TOPIC-SOURCE", converter.convert(row));
+            producer.send(record);
+        }
+    }
+
+    @TestTemplate
+    public void testSourceKafkaToConsole(TestContainer container) throws IOException, InterruptedException {
+        List<String> data = getKafkaConsumerListData("TEST-TOPIC-SOURCE");
+
+        for (String record : data) {
+            log.info("Record: {}", record);
+        }
+
+        Assertions.assertEquals(100, data.size());
+    }
+
+    interface ProducerRecordConverter {
+        byte[] convert(Map<String, Object> row);
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+
+    private void initKafkaProducer() {
+        Properties properties = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(properties);
+    }
+
+    private Properties kafkaConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
+        props.put(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return props;
+    }
+
+    private Properties kafkaByteConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
+        props.put(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+        props.setProperty(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        props.setProperty(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        return props;
+    }
+
+    private Map<String, String> getKafkaConsumerData(String topicName) {
+        Map<String, String> data = new HashMap<>();
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, String> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        data.put(record.key(), record.value());
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        return data;
+    }
+
+    private List<String> getKafkaConsumerListData(String topicName) {
+        List<String> data = new ArrayList<>();
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, String> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        data.add(record.value());
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        return data;
+    }
+}
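Stripped of the groot-stream test harness, the Testcontainers produce/consume round trip that `KafkaConnectorTest` performs reduces to roughly the following standalone sketch; the image tag matches the test above, while the topic name and payload are illustrative:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class KafkaRoundTripSketch {
    public static void main(String[] args) {
        // Start a throwaway broker; getBootstrapServers() is only valid after start().
        try (KafkaContainer kafka = new KafkaContainer(
                DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))) {
            kafka.start();

            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                producer.send(new ProducerRecord<>("demo-topic",
                        "{\"client_ip\":\"192.168.40.12\",\"server_ip\":\"8.8.8.8\"}"));
                producer.flush();
            }

            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(List.of("demo-topic"));
                // A single generous poll is enough for a demo; the test above instead
                // polls in a loop until it reaches the partition's end offset.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }
}
```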
