summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-03-06 20:26:08 +0800
committerdoufenghu <[email protected]>2024-03-06 20:26:08 +0800
commit80e5a4756ca97a52709922956b586fda11377bc5 (patch)
tree8c4cdd1dc156ca082fe1c9c9450014d496e6874f
parentf3ecbcc30d01c70bb96634c36c4d1ca7f1e0e869 (diff)
[Feature][Test] Add TestContainner lib for end-to-end testing. Add test-common and test-e2e-base submodules. Support Inline to Print test in Flink13 Container.
-rw-r--r--groot-bootstrap/pom.xml23
-rw-r--r--groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory2
-rw-r--r--groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory2
-rw-r--r--groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory2
-rw-r--r--groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java3
-rw-r--r--groot-tests/pom.xml57
-rw-r--r--groot-tests/test-common/pom.xml54
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/AbstractFlinkContainer.java37
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestResource.java27
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestSuiteBase.java29
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java174
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java144
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerExtendedFactory.java11
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java291
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/EngineType.java16
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java47
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java33
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java19
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainersFactory.java7
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestHelper.java23
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/AnnotationUtil.java37
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/ContainerTestingExtension.java84
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/DisabledOnContainer.java22
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestCaseInvocationContextProvider.java114
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainerExtension.java12
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainers.java11
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestLoggerExtension.java60
-rw-r--r--groot-tests/test-common/src/test/resources/log4j2.properties42
-rw-r--r--groot-tests/test-e2e-base/pom.xml49
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/Flink13Container.java34
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java16
-rw-r--r--groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml44
-rw-r--r--plugin-mapping.properties5
-rw-r--r--pom.xml61
34 files changed, 1578 insertions, 14 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index 6f6b5f6..62c2dc8 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -156,6 +156,29 @@
<build>
<finalName>${project.artifactId}</finalName>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-starter-logging-package-for-e2e</id>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <excludeTransitive>false</excludeTransitive>
+ <includeGroupIds>org.slf4j,org.apache.logging.log4j</includeGroupIds>
+ <includeArtifactIds>slf4j-api,jcl-over-slf4j,log4j-slf4j-impl,log4j-api,log4j-core</includeArtifactIds>
+ <outputDirectory>${project.build.directory}/logging-e2e</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
</project>
diff --git a/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
index 7950033..9f8187a 100644
--- a/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -1 +1 @@
-com.geedgenetworks.connectors.clickhouse.ClickHouseTableFactory
+com.geedgenetworks.connectors.clickhouse.ClickHouseTableFactory \ No newline at end of file
diff --git a/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
index 2400d05..bcf4133 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -1 +1 @@
-com.geedgenetworks.connectors.ipfix.collector.IPFixTableFactory
+com.geedgenetworks.connectors.ipfix.collector.IPFixTableFactory \ No newline at end of file
diff --git a/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
index 95a23d8..531df31 100644
--- a/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -1 +1 @@
-com.geedgenetworks.connectors.kafka.KafkaTableFactory
+com.geedgenetworks.connectors.kafka.KafkaTableFactory \ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 825f79a..aa2cb76 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -13,11 +13,12 @@ import java.nio.file.Paths;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/kafka_to_print.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
executeCommandArgs.setCheckConfig(false);
+ executeCommandArgs.setVersion(false);
executeCommandArgs.setDeployMode(DeployMode.RUN);
executeCommandArgs.setTargetType(TargetType.LOCAL);
GrootStreamServer.run(executeCommandArgs.buildCommand());
diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml
index ec2bcd9..5882a56 100644
--- a/groot-tests/pom.xml
+++ b/groot-tests/pom.xml
@@ -10,10 +10,65 @@
</parent>
<artifactId>groot-tests</artifactId>
- <name>Groot : Tests </name>
+ <packaging>pom</packaging>
+ <name>Groot : Tests : </name>
+ <modules>
+ <module>test-common</module>
+ <module>test-e2e-base</module>
+ </modules>
+
<properties>
<maven.install.skip>true</maven.install.skip>
<maven.deploy.skip>true</maven.deploy.skip>
+ <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
+ <rest-assured.version>4.3.1</rest-assured.version>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ </dependency>
+ <!-- Testcontainers 1.x is tightly coupled with the JUnit 4.x rule API-->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit4.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.rest-assured</groupId>
+ <artifactId>rest-assured</artifactId>
+ <version>${rest-assured.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.rest-assured</groupId>
+ <artifactId>json-path</artifactId>
+ <version>${rest-assured.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <skip>${test.dependency.skip}</skip>
+ <appendOutput>true</appendOutput>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+
+
+
</project> \ No newline at end of file
diff --git a/groot-tests/test-common/pom.xml b/groot-tests/test-common/pom.xml
new file mode 100644
index 0000000..c6cb7bf
--- /dev/null
+++ b/groot-tests/test-common/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-tests</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>test-common</artifactId>
+ <name>Groot : Tests : Common</name>
+
+ <properties>
+
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-bootstrap</artifactId>
+ <version>${revision}</version>
+ </dependency>
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>${maven-jar-plugin.version}</version>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project> \ No newline at end of file
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/AbstractFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/AbstractFlinkContainer.java
new file mode 100644
index 0000000..3444e0d
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/AbstractFlinkContainer.java
@@ -0,0 +1,37 @@
+package com.geedgenetworks.test.common;
+
+
+import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Slf4j
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractFlinkContainer extends AbstractTestFlinkContainer {
+ @Override
+ @BeforeAll
+ public void startUp() throws Exception {
+ super.startUp();
+ log.info("The TestContainer[{}] is running.", identifier());
+ }
+
+ @Override
+ @AfterAll
+ public void tearDown() throws Exception {
+ super.tearDown();
+ log.info("The TestContainer[{}] is closed.", identifier());
+ }
+
+
+ public Container.ExecResult executeGrootStreamFlinkJob(String confFile)
+ throws IOException, InterruptedException {
+ return executeJob(confFile);
+ }
+
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestResource.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestResource.java
new file mode 100644
index 0000000..f84307e
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestResource.java
@@ -0,0 +1,27 @@
+package com.geedgenetworks.test.common;
+
+/**
+ * Basic abstractions for all resources used in connector testing framework.
+ *
+ * <p>Lifecycle of test resources will be managed by the framework.
+ */
+public interface TestResource {
+
+ /**
+ * Start up the test resource.
+ *
+ * <p>The implementation of this method should be idempotent.
+ *
+ * @throws Exception if anything wrong when starting the resource
+ */
+ void startUp() throws Exception;
+
+ /**
+ * Tear down the test resource.
+ *
+ * <p>The test resource should be able to tear down even without a startup (could be a no-op).
+ *
+ * @throws Exception if anything wrong when tearing the resource down
+ */
+ void tearDown() throws Exception;
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestSuiteBase.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestSuiteBase.java
new file mode 100644
index 0000000..83f4a7c
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestSuiteBase.java
@@ -0,0 +1,29 @@
+package com.geedgenetworks.test.common;
+
+import com.geedgenetworks.test.common.container.ContainerUtil;
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainersFactory;
+import com.geedgenetworks.test.common.junit.ContainerTestingExtension;
+import com.geedgenetworks.test.common.junit.TestCaseInvocationContextProvider;
+import com.geedgenetworks.test.common.junit.TestContainers;
+import com.geedgenetworks.test.common.junit.TestLoggerExtension;
+import com.github.dockerjava.api.DockerClient;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.Network;
+
+@ExtendWith({
+ ContainerTestingExtension.class,
+ TestLoggerExtension.class,
+ TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class TestSuiteBase {
+ protected static final Network NETWORK = TestContainer.NETWORK;
+ @TestContainers
+ private TestContainersFactory containersFactory = ContainerUtil::discoverTestContainers;
+ protected DockerClient dockerClient = DockerClientFactory.lazyClient();
+
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
new file mode 100644
index 0000000..54719bb
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
@@ -0,0 +1,174 @@
+package com.geedgenetworks.test.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import static com.geedgenetworks.test.common.container.ContainerUtil.PROJECT_ROOT_PATH;
+public abstract class AbstractTestContainer implements TestContainer {
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestContainer.class);
+
+ public static final String GROOTSTREAM_HOME = "/tmp/grootstream/";
+
+ protected final String startModuleName;
+
+ protected final String startModuleFullPath;
+
+ public AbstractTestContainer() {
+ this.startModuleName = getStartModuleName();
+ this.startModuleFullPath =
+ PROJECT_ROOT_PATH
+ + File.separator
+ + this.startModuleName;
+ ContainerUtil.checkPathExist(startModuleFullPath);
+ }
+
+ protected abstract String getDockerImage();
+
+ protected abstract String getStartModuleName();
+
+ protected abstract String getStartShellName();
+
+ protected abstract String getConnectorModulePath();
+
+ protected abstract String getConnectorType();
+
+ protected abstract String getSavePointCommand();
+
+ protected abstract String getRestoreCommand();
+
+ protected abstract String getConnectorNamePrefix();
+
+ protected abstract List<String> getExtraStartShellCommands();
+
+ protected void executeExtraCommands(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ // do nothing
+ }
+
+ protected void copyGrootStreamStarterToContainer(GenericContainer<?> container) {
+ ContainerUtil.copyGrootStreamStarterToContainer(
+ container, this.startModuleName, this.startModuleFullPath, GROOTSTREAM_HOME);
+ }
+
+ protected void copyGrootStreamStarterLoggingToContainer(GenericContainer<?> container) {
+ ContainerUtil.copyGrootStreamStarterLoggingToContainer(
+ container, this.startModuleFullPath, GROOTSTREAM_HOME);
+ }
+
+ protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile)
+ throws IOException, InterruptedException {
+ final String confInContainerPath = ContainerUtil.copyConfigFileToContainer(container, confFile);
+ // copy connectors
+ ContainerUtil.copyConnectorJarToContainer(
+ container,
+ confFile,
+ getConnectorModulePath(),
+ getConnectorNamePrefix(),
+ getConnectorType(),
+ GROOTSTREAM_HOME);
+ final List<String> command = new ArrayList<>();
+ String binPath = Paths.get(GROOTSTREAM_HOME, "bin", getStartShellName()).toString();
+ // base command
+ command.add(ContainerUtil.adaptPathForWin(binPath));
+ command.add("--config");
+ command.add(ContainerUtil.adaptPathForWin(confInContainerPath));
+ /* command.add("--target");
+ command.add("remote");*/
+ command.addAll(getExtraStartShellCommands());
+ return executeCommand(container, command);
+ }
+
+ protected Container.ExecResult savepointJob(GenericContainer<?> container, String jobId)
+ throws IOException, InterruptedException {
+ final List<String> command = new ArrayList<>();
+ String binPath = Paths.get(GROOTSTREAM_HOME, "bin", getStartShellName()).toString();
+ // base command
+ command.add(ContainerUtil.adaptPathForWin(binPath));
+ command.add(getSavePointCommand());
+ command.add(jobId);
+ command.addAll(getExtraStartShellCommands());
+ return executeCommand(container, command);
+ }
+
+ protected Container.ExecResult restoreJob(
+ GenericContainer<?> container, String confFile, String jobId)
+ throws IOException, InterruptedException {
+ final String confInContainerPath = ContainerUtil.copyConfigFileToContainer(container, confFile);
+ // copy connectors
+ ContainerUtil.copyConnectorJarToContainer(
+ container,
+ confFile,
+ getConnectorModulePath(),
+ getConnectorNamePrefix(),
+ getConnectorType(),
+ GROOTSTREAM_HOME);
+ final List<String> command = new ArrayList<>();
+ String binPath = Paths.get(GROOTSTREAM_HOME, "bin", getStartShellName()).toString();
+ // base command
+ command.add(ContainerUtil.adaptPathForWin(binPath));
+ command.add("--config");
+ command.add(ContainerUtil.adaptPathForWin(confInContainerPath));
+ command.add(getRestoreCommand());
+ command.add(jobId);
+ command.addAll(getExtraStartShellCommands());
+ return executeCommand(container, command);
+ }
+
+ protected Container.ExecResult executeCommand(
+ GenericContainer<?> container, List<String> command)
+ throws IOException, InterruptedException {
+ String commandStr = String.join(" ", command);
+ LOG.info(
+ "Execute command in container[{}] "
+ + "\n==================== Shell Command start ====================\n"
+ + "{}"
+ + "\n==================== Shell Command end ====================",
+ container.getDockerImageName(),
+ commandStr);
+ Container.ExecResult execResult = container.execInContainer("bash", "-c", commandStr);
+
+ if (execResult.getStdout() != null && !execResult.getStdout().isEmpty()) {
+ LOG.info(
+ "Container[{}] command {} STDOUT:"
+ + "\n==================== STDOUT start ====================\n"
+ + "{}"
+ + "\n==================== STDOUT end ====================",
+ container.getDockerImageName(),
+ commandStr,
+ execResult.getStdout());
+ }
+ if (execResult.getStderr() != null && !execResult.getStderr().isEmpty()) {
+ LOG.error(
+ "Container[{}] command {} STDERR:"
+ + "\n==================== STDERR start ====================\n"
+ + "{}"
+ + "\n==================== STDERR end ====================",
+ container.getDockerImageName(),
+ commandStr,
+ execResult.getStderr());
+ }
+
+ if (execResult.getExitCode() != 0) {
+ LOG.info(
+ "Container[{}] command {} Server Log:"
+ + "\n==================== Server Log start ====================\n"
+ + "{}"
+ + "\n==================== Server Log end ====================",
+ container.getDockerImageName(),
+ commandStr,
+ container.getLogs());
+ }
+
+ return execResult;
+ }
+
+
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
new file mode 100644
index 0000000..aed38b9
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -0,0 +1,144 @@
+package com.geedgenetworks.test.common.container;
+
+import com.google.common.collect.Lists;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+@NoArgsConstructor
+@Slf4j
+public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
+ protected static final List<String> DEFAULT_FLINK_PROPERTIES =
+ Arrays.asList(
+ "jobmanager.rpc.address: jobmanager",
+ "taskmanager.numberOfTaskSlots: 10",
+ "parallelism.default: 4",
+ "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false");
+
+ protected static final String DEFAULT_DOCKER_IMAGE = "flink:1.13.1-scala_2.11-java11";
+
+ protected GenericContainer<?> jobManager;
+ protected GenericContainer<?> taskManager;
+
+ @Override
+ protected String getDockerImage() {
+ return DEFAULT_DOCKER_IMAGE;
+ }
+
+ @Override
+ public void startUp() throws Exception {
+ final String dockerImage = getDockerImage();
+ final String properties = String.join("\n", getFlinkProperties());
+ jobManager =
+ new GenericContainer<>(dockerImage)
+ .withCommand("jobmanager")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jobmanager")
+ .withExposedPorts()
+ .withEnv("FLINK_PROPERTIES", properties)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(dockerImage + ":jobmanager")))
+ .waitingFor(
+ new LogMessageWaitStrategy()
+ .withRegEx(".*Starting the resource manager.*")
+ .withStartupTimeout(Duration.ofMinutes(2)));
+ copyGrootStreamStarterToContainer(jobManager);
+ copyGrootStreamStarterLoggingToContainer(jobManager);
+
+ jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8081, 8084)));
+
+ taskManager =
+ new GenericContainer<>(dockerImage)
+ .withCommand("taskmanager")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("taskmanager")
+ .withEnv("FLINK_PROPERTIES", properties)
+ .dependsOn(jobManager)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(
+ dockerImage + ":taskmanager")))
+ .waitingFor(
+ new LogMessageWaitStrategy()
+ .withRegEx(
+ ".*Successful registration at resource manager.*")
+ .withStartupTimeout(Duration.ofMinutes(2)));
+
+ Startables.deepStart(Stream.of(jobManager)).join();
+ Startables.deepStart(Stream.of(taskManager)).join();
+ // execute extra commands
+ executeExtraCommands(jobManager);
+ }
+
+ protected List<String> getFlinkProperties() {
+ return DEFAULT_FLINK_PROPERTIES;
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ if (taskManager != null) {
+ taskManager.stop();
+ }
+ if (jobManager != null) {
+ jobManager.stop();
+ }
+ }
+
+ @Override
+ protected String getSavePointCommand() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ protected String getRestoreCommand() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ protected List<String> getExtraStartShellCommands() {
+ return Collections.emptyList();
+ }
+
+
+ public void executeExtraCommands(ContainerExtendedFactory extendedFactory)
+ throws IOException, InterruptedException {
+ extendedFactory.extend(jobManager);
+ extendedFactory.extend(taskManager);
+ }
+
+
+ @Override
+ public Container.ExecResult executeJob(String confFile)
+ throws IOException, InterruptedException {
+ log.info("test in container: {}", identifier());
+ return executeJob(jobManager, confFile);
+ }
+
+ @Override
+ public String getServerLogs() {
+ return jobManager.getLogs() + "\n" + taskManager.getLogs();
+ }
+
+
+ public String executeJobManagerInnerCommand(String command)
+ throws IOException, InterruptedException {
+ return jobManager.execInContainer("bash", "-c", command).getStdout();
+ }
+
+
+
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerExtendedFactory.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerExtendedFactory.java
new file mode 100644
index 0000000..c7c1df0
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerExtendedFactory.java
@@ -0,0 +1,11 @@
+package com.geedgenetworks.test.common.container;
+
+import org.testcontainers.containers.GenericContainer;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface ContainerExtendedFactory {
+ void extend(GenericContainer<?> engineMasterContainer) throws IOException, InterruptedException;
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
new file mode 100644
index 0000000..39c1e2c
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -0,0 +1,291 @@
+package com.geedgenetworks.test.common.container;
+
+import cn.hutool.core.util.XmlUtil;
+import cn.hutool.json.XML;
+import com.geedgenetworks.bootstrap.utils.ConfigBuilder;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.SourceConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.pojo.SourceConfig;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigResolveOptions;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+import org.w3c.dom.Attr;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPathConstants;
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Slf4j
+public final class ContainerUtil {
+
+ public static final String PROJECT_ROOT_PATH = getProjectRootPath();
+ public static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
+
+
+ private static String getProjectRootPath() {
+ String testCommonRootModuleDir = "groot-tests";
+ Path path = Paths.get(System.getProperty("user.dir"));
+ while (!path.endsWith(Paths.get(testCommonRootModuleDir))) {
+ path = path.getParent();
+ }
+ return path.getParent().toString();
+ }
+
+ public static String getProjectVersion() {
+ String pomFile = PROJECT_ROOT_PATH + File.separator + "pom.xml";
+ checkPathExist(pomFile);
+ Document docResult = XmlUtil.readXML(new File(pomFile));
+ Element project = XmlUtil.getRootElement(docResult);
+ Element properties = XmlUtil.getElement(project, "properties");
+ Element revisionElement = XmlUtil.getElement(properties, "revision");
+ return revisionElement.getTextContent();
+ }
+
+
+ public static void checkPathExist(String path) {
+ Assertions.assertTrue(new File(path).exists(), path + " must exist");
+ }
+
+ public static void copyGrootStreamStarterToContainer(
+ GenericContainer<?> container,
+ String startModuleName,
+ String startModulePath,
+ String GrootStreamHomeInContainer) {
+
+ final String[] splits = StringUtils.split(startModuleName, File.separator);
+ final String startJarName = splits[splits.length - 1] + ".jar";
+ final String startJarPath =
+ startModulePath + File.separator + "target" + File.separator + startJarName;
+ checkPathExist(startJarPath);
+
+ // don't use container#withFileSystemBind, this isn't supported in Windows.
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(startJarPath),
+ Paths.get(GrootStreamHomeInContainer, "bootstrap", startJarName).toString());
+
+
+ // copy lib
+ String hbaseClientJar = "hbase-client-shaded-" + getProjectVersion() + ".jar";
+ Path hbaseClientJarPath =
+ Paths.get(PROJECT_ROOT_PATH, "groot-shaded/hbase-client-shaded", "target", hbaseClientJar);
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(hbaseClientJarPath),
+ Paths.get(GrootStreamHomeInContainer, "lib", hbaseClientJar).toString());
+
+
+ String formatJsonJar = "format-json-" + getProjectVersion() + ".jar";
+ Path formatJsonJarPath =
+ Paths.get(PROJECT_ROOT_PATH, "groot-formats/format-json", "target", formatJsonJar);
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(formatJsonJarPath),
+ Paths.get(GrootStreamHomeInContainer, "lib", formatJsonJar).toString());
+
+ String formatProtobufJar = "format-protobuf-" + getProjectVersion() + ".jar";
+ Path formatProtobufJarPath =
+ Paths.get(PROJECT_ROOT_PATH, "groot-formats/format-protobuf", "target", formatProtobufJar);
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(formatProtobufJarPath),
+ Paths.get(GrootStreamHomeInContainer, "lib", formatProtobufJar).toString());
+
+
+ //copy system config
+ final String configPath = PROJECT_ROOT_PATH + "/config";
+ checkPathExist(configPath);
+ container.withCopyFileToContainer(MountableFile.forHostPath(configPath),
+ Paths.get(GrootStreamHomeInContainer, "config").toString());
+
+
+ // copy bin
+ final String startBinPath = startModulePath + File.separator + "src/main/bin/";
+ checkPathExist(startBinPath);
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(startBinPath),
+ Paths.get(GrootStreamHomeInContainer, "bin").toString());
+
+ // copy plugin-mapping.properties
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
+ Paths.get(GrootStreamHomeInContainer, "connectors", PLUGIN_MAPPING_FILE).toString());
+
+
+
+ }
+
+ public static void copyGrootStreamStarterLoggingToContainer(
+ GenericContainer<?> container,
+ String startModulePath,
+ String GrootStreamHomeInContainer) {
+ // copy logging lib
+ final String loggingLibPath =
+ startModulePath
+ + File.separator
+ + "target"
+ + File.separator
+ + "logging-e2e"
+ + File.separator;
+ checkPathExist(loggingLibPath);
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(loggingLibPath),
+ Paths.get(GrootStreamHomeInContainer, "bootstrap", "logging").toString());
+ }
+
+ public static String copyConfigFileToContainer(GenericContainer<?> container, String confFile) {
+ final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
+ container.copyFileToContainer(
+ MountableFile.forHostPath(getResourcesFile(confFile).getAbsolutePath()),
+ targetConfInContainer);
+ return targetConfInContainer;
+ }
+
+ public static File getResourcesFile(String confFile) {
+ File file = new File(getCurrentModulePath() + "/src/test/resources" + confFile);
+ if (file.exists()) {
+ return file;
+ }
+ throw new IllegalArgumentException(confFile + " doesn't exist");
+ }
+
+ public static Path getCurrentModulePath() {
+ return Paths.get(System.getProperty("user.dir"));
+ }
+
+ public static void copyConnectorJarToContainer(
+ GenericContainer<?> container,
+ String confFile,
+ String connectorsRootPath,
+ String connectorPrefix,
+ String connectorType,
+ String grootStreamHome) {
+ Config jobConfig = getJobConfig(getResourcesFile(confFile));
+ Config connectorsMapping =
+ getPluginProperties(new File(PROJECT_ROOT_PATH + File.separator + PLUGIN_MAPPING_FILE));
+ if (!connectorsMapping.hasPath(connectorType)
+ || connectorsMapping.getConfig(connectorType).isEmpty()) {
+ return;
+ }
+ Config connectors = connectorsMapping.getConfig(connectorType);
+ Set<String> connectorNames = getConnectors(jobConfig, connectors, "source");
+ connectorNames.addAll(getConnectors(jobConfig, connectors, "sink"));
+ File module = new File(PROJECT_ROOT_PATH + File.separator + connectorsRootPath);
+
+ List<File> connectorFiles = getConnectorFiles(module, connectorNames, connectorPrefix);
+ connectorFiles.forEach(
+ jar ->
+ container.copyFileToContainer(
+ MountableFile.forHostPath(jar.getAbsolutePath()),
+ Paths.get(grootStreamHome, "connectors", jar.getName()).toString()));
+ }
+
+ public static String adaptPathForWin(String path) {
+ // Running IT use cases under Windows requires replacing \ with /
+ return path == null ? "" : path.replaceAll("\\\\", "/");
+ }
+
+ private static List<File> getConnectorFiles(
+ File currentModule, Set<String> connectorNames, String connectorPrefix) {
+ List<File> connectorFiles = new ArrayList<>();
+ for (File file : Objects.requireNonNull(currentModule.listFiles())) {
+ getConnectorFiles(file, connectorNames, connectorPrefix, connectorFiles);
+ }
+ return connectorFiles;
+ }
+
+ private static void getConnectorFiles(
+ File currentModule,
+ Set<String> connectorNames,
+ String connectorPrefix,
+ List<File> connectors) {
+ if (currentModule.isFile() || connectorNames.size() == connectors.size()) {
+ return;
+ }
+ if (connectorNames.contains(currentModule.getName())) {
+ File targetPath = new File(currentModule.getAbsolutePath() + File.separator + "target");
+ for (File file : Objects.requireNonNull(targetPath.listFiles())) {
+ if (file.getName().startsWith(currentModule.getName())
+ && !file.getName().endsWith("javadoc.jar")
+ && !file.getName().endsWith("tests.jar")) {
+ connectors.add(file);
+ return;
+ }
+ }
+ }
+
+ if (currentModule.getName().startsWith(connectorPrefix)) {
+ for (File file : Objects.requireNonNull(currentModule.listFiles())) {
+ getConnectorFiles(file, connectorNames, connectorPrefix, connectors);
+ }
+ }
+ }
+
+ public static List<TestContainer> discoverTestContainers() {
+ try {
+ final List<TestContainer> result = new LinkedList<>();
+ ServiceLoader.load(TestContainer.class, Thread.currentThread().getContextClassLoader())
+ .iterator()
+ .forEachRemaining(result::add);
+ return result;
+ } catch (ServiceConfigurationError e) {
+ log.error("Could not load service provider for containers.", e);
+ throw new RuntimeException("Could not load service provider for containers.", e);
+ }
+ }
+
+ private static Set<String> getConnectors(
+ Config jobConfig, Config connectorsMap, String pluginType) {
+ // using specific needed plugin type in the job config
+ Config connectorConfig = jobConfig.getConfig(pluginType+"s");
+ List<String> connectorList = Lists.newArrayList();
+ connectorConfig.root().unwrapped().forEach((key,value) -> {
+ Map<String, Object> map = (Map<String, Object>) value;
+ connectorList.add(map.get("type").toString());
+
+ });
+
+ Map<String, String> connectors = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ connectorsMap.getConfig(pluginType).entrySet().forEach(entry -> {
+ connectors.put(entry.getKey(), entry.getValue().unwrapped().toString());
+ });
+
+ return connectorList.stream()
+ .map(String::toLowerCase)
+ .filter(connectors::containsKey)
+ .map(connectors::get)
+ .collect(Collectors.toSet());
+ }
+
+ private static Config getJobConfig(File file) {
+ return ConfigBuilder.of(file.getAbsolutePath());
+ }
+
+ private static Config getPluginProperties(File file) {
+ return ConfigFactory.parseFile(file)
+ .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(
+ ConfigFactory.systemProperties(),
+ ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ }
+
+
+
+
+
+
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/EngineType.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/EngineType.java
new file mode 100644
index 0000000..4f348ae
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/EngineType.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.test.common.container;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor
+public enum EngineType {
+ FLINK("Flink");
+ private final String name;
+ @Override
+ public String toString() {
+ return name;
+ }
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
new file mode 100644
index 0000000..7c41167
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
@@ -0,0 +1,47 @@
+package com.geedgenetworks.test.common.container;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AutoService(TestContainer.class)
+public class Flink13Container extends AbstractTestFlinkContainer{
+
+ @Override
+ protected String getStartModuleName() {
+ return "groot-bootstrap";
+
+ }
+
+ @Override
+ protected String getStartShellName() {
+ return "start.sh";
+ }
+
+ @Override
+ protected String getConnectorModulePath() {
+ return "groot-connectors";
+ }
+
+ @Override
+ protected String getConnectorType() {
+ return "grootstream";
+ }
+
+ @Override
+ protected String getConnectorNamePrefix() {
+ return "connector-";
+ }
+
+ @Override
+ public TestContainerId identifier() {
+ return TestContainerId.FLINK_1_13;
+ }
+
+ @Override
+ protected String getDockerImage() {
+ return "flink:1.13.1-scala_2.11-java11";
+ }
+
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java
new file mode 100644
index 0000000..6e4cd1f
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java
@@ -0,0 +1,33 @@
+package com.geedgenetworks.test.common.container;
+
+import com.geedgenetworks.test.common.TestResource;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.Network;
+
+import java.io.IOException;
+
+public interface TestContainer extends TestResource {
+ Network NETWORK = Network.newNetwork();
+ TestContainerId identifier();
+
+ void executeExtraCommands(ContainerExtendedFactory extendedFactory)
+ throws IOException, InterruptedException;
+
+ Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException;
+
+
+ default Container.ExecResult savepointJob(String jobId)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ default Container.ExecResult restoreJob(String confFile, String jobId)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ String getServerLogs();
+
+
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
new file mode 100644
index 0000000..ddd2012
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.test.common.container;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import static com.geedgenetworks.test.common.container.EngineType.FLINK;
+@Getter
+@AllArgsConstructor
+public enum TestContainerId {
+ FLINK_1_13(FLINK, "1.13.6");
+ private final EngineType engineType;
+ private final String version;
+
+ @Override
+ public String toString() {
+ return engineType.toString() + ":" + version;
+ }
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainersFactory.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainersFactory.java
new file mode 100644
index 0000000..675ee53
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainersFactory.java
@@ -0,0 +1,7 @@
+package com.geedgenetworks.test.common.container;
+
+import java.util.List;
+
+public interface TestContainersFactory {
+ List<TestContainer> create();
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestHelper.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestHelper.java
new file mode 100644
index 0000000..e343dcd
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestHelper.java
@@ -0,0 +1,23 @@
+package com.geedgenetworks.test.common.container;
+
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestHelper {
+ private final TestContainer container;
+
+ public TestHelper(TestContainer container) {
+ this.container = container;
+ }
+
+ public void execute(String file) throws IOException, InterruptedException {
+ execute(0, file);
+ }
+
+ public void execute(int exceptResult, String file) throws IOException, InterruptedException {
+ Container.ExecResult result = container.executeJob(file);
+ Assertions.assertEquals(exceptResult, result.getExitCode(), result.getStderr());
+ }
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/AnnotationUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/AnnotationUtil.java
new file mode 100644
index 0000000..c10e782
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/AnnotationUtil.java
@@ -0,0 +1,37 @@
+package com.geedgenetworks.test.common.junit;
+
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.container.EngineType;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.junit.platform.commons.util.AnnotationUtils;
+import java.lang.reflect.AnnotatedElement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class AnnotationUtil {
+ public static List<TestContainer> filterDisabledContainers(
+ List<TestContainer> containers, AnnotatedElement annotatedElement) {
+ // Filters disabled containers
+ final List<TestContainerId> disabledContainers = new ArrayList<>();
+ final List<EngineType> disabledEngineTypes = new ArrayList<>();
+ AnnotationUtils.findAnnotation(annotatedElement, DisabledOnContainer.class)
+ .ifPresent(
+ annotation -> {
+ Collections.addAll(disabledContainers, annotation.value());
+ Collections.addAll(disabledEngineTypes, annotation.type());
+ });
+ return containers.stream()
+ .filter(container -> !disabledContainers.contains(container.identifier()))
+ .filter(
+ container ->
+ !disabledEngineTypes.contains(
+ container.identifier().getEngineType()))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/ContainerTestingExtension.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/ContainerTestingExtension.java
new file mode 100644
index 0000000..bad2ce4
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/ContainerTestingExtension.java
@@ -0,0 +1,84 @@
+package com.geedgenetworks.test.common.junit;
+
+import com.geedgenetworks.test.common.container.ContainerExtendedFactory;
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainersFactory;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.platform.commons.support.AnnotationSupport;
+
+import java.lang.annotation.Annotation;
+import java.util.Collection;
+import java.util.List;
+
+public class ContainerTestingExtension implements BeforeAllCallback, AfterAllCallback {
+ public static final ExtensionContext.Namespace TEST_RESOURCE_NAMESPACE =
+ ExtensionContext.Namespace.create("testResourceNamespace");
+ public static final String TEST_CONTAINERS_STORE_KEY = "testContainers";
+ public static final String TEST_EXTENDED_FACTORY_STORE_KEY = "testContainerExtendedFactory";
+
+ @Override
+ public void beforeAll(ExtensionContext context) throws Exception {
+
+ List<ContainerExtendedFactory> containerExtendedFactories =
+ AnnotationSupport.findAnnotatedFieldValues(
+ context.getRequiredTestInstance(),
+ TestContainerExtension.class,
+ ContainerExtendedFactory.class);
+ checkAtMostOneAnnotationField(containerExtendedFactories, TestContainerExtension.class);
+ ContainerExtendedFactory containerExtendedFactory = container -> {};
+ if (!containerExtendedFactories.isEmpty()) {
+ containerExtendedFactory = containerExtendedFactories.get(0);
+ }
+ context.getStore(TEST_RESOURCE_NAMESPACE)
+ .put(TEST_EXTENDED_FACTORY_STORE_KEY, containerExtendedFactory);
+
+ List<TestContainersFactory> containersFactories =
+ AnnotationSupport.findAnnotatedFieldValues(
+ context.getRequiredTestInstance(),
+ TestContainers.class,
+ TestContainersFactory.class);
+
+ checkExactlyOneAnnotatedField(containersFactories, TestContainers.class);
+
+ List<TestContainer> testContainers =
+ AnnotationUtil.filterDisabledContainers(
+ containersFactories.get(0).create(),
+ context.getRequiredTestInstance().getClass());
+ context.getStore(TEST_RESOURCE_NAMESPACE).put(TEST_CONTAINERS_STORE_KEY, testContainers);
+
+ }
+
+ @Override
+ public void afterAll(ExtensionContext context) throws Exception {
+ context.getStore(TEST_RESOURCE_NAMESPACE).remove(TEST_CONTAINERS_STORE_KEY);
+ }
+
+
+
+
+ private void checkExactlyOneAnnotatedField(
+ Collection<?> fields, Class<? extends Annotation> annotation) {
+ checkAtMostOneAnnotationField(fields, annotation);
+ checkAtLeastOneAnnotationField(fields, annotation);
+ }
+
+ private void checkAtLeastOneAnnotationField(
+ Collection<?> fields, Class<? extends Annotation> annotation) {
+ if (fields.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "No fields are annotated with '@%s'", annotation.getSimpleName()));
+ }
+ }
+ private void checkAtMostOneAnnotationField(
+ Collection<?> fields, Class<? extends Annotation> annotation) {
+ if (fields.size() > 1) {
+ throw new IllegalStateException(
+ String.format(
+ "Multiple fields are annotated with '@%s'",
+ annotation.getSimpleName()));
+ }
+ }
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/DisabledOnContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/DisabledOnContainer.java
new file mode 100644
index 0000000..3c4e655
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/DisabledOnContainer.java
@@ -0,0 +1,22 @@
+package com.geedgenetworks.test.common.junit;
+
+import com.geedgenetworks.test.common.container.EngineType;
+import com.geedgenetworks.test.common.container.TestContainerId;
+
+import java.lang.annotation.*;
+
+@Target({ElementType.TYPE, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface DisabledOnContainer {
+ TestContainerId[] value();
+ EngineType[] type() default {};
+
+ /**
+ * Custom reason to provide if the test container is disabled.
+ *
+ * <p>If a custom reason is supplied, it will be combined with the default reason for this
+ * annotation. If a custom reason is not supplied, the default reason will be used.
+ */
+ String disabledReason() default "";
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestCaseInvocationContextProvider.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestCaseInvocationContextProvider.java
new file mode 100644
index 0000000..01f29bf
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestCaseInvocationContextProvider.java
@@ -0,0 +1,114 @@
+package com.geedgenetworks.test.common.junit;
+
+import com.geedgenetworks.test.common.container.ContainerExtendedFactory;
+import com.geedgenetworks.test.common.container.TestContainer;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.extension.*;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+import static com.geedgenetworks.test.common.junit.ContainerTestingExtension.TEST_CONTAINERS_STORE_KEY;
+import static com.geedgenetworks.test.common.junit.ContainerTestingExtension.TEST_EXTENDED_FACTORY_STORE_KEY;
+import static com.geedgenetworks.test.common.junit.ContainerTestingExtension.TEST_RESOURCE_NAMESPACE;
+@Slf4j
+public class TestCaseInvocationContextProvider implements TestTemplateInvocationContextProvider {
+ @Override
+ public boolean supportsTestTemplate(ExtensionContext context) {
+ // Only support test cases with TestContainer as parameter
+ Class<?>[] parameterTypes = context.getRequiredTestMethod().getParameterTypes();
+ return parameterTypes.length == 1
+ && Arrays.stream(parameterTypes).anyMatch(TestContainer.class::isAssignableFrom);
+ }
+
+ @Override
+ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(
+ ExtensionContext context) {
+ List<TestContainer> testContainers =
+ AnnotationUtil.filterDisabledContainers(
+ (List<TestContainer>)
+ context.getStore(TEST_RESOURCE_NAMESPACE)
+ .get(TEST_CONTAINERS_STORE_KEY),
+ context.getRequiredTestMethod());
+
+ ContainerExtendedFactory containerExtendedFactory =
+ (ContainerExtendedFactory)
+ context.getStore(TEST_RESOURCE_NAMESPACE)
+ .get(TEST_EXTENDED_FACTORY_STORE_KEY);
+
+ int containerAmount = testContainers.size();
+ return testContainers.stream()
+ .map(
+ testContainer ->
+ new TestResourceProvidingInvocationContext(
+ testContainer, containerExtendedFactory, containerAmount));
+ }
+ static class TestResourceProvidingInvocationContext implements TestTemplateInvocationContext {
+ private final TestContainer testContainer;
+ private final ContainerExtendedFactory containerExtendedFactory;
+ private final Integer containerAmount;
+
+ public TestResourceProvidingInvocationContext(
+ TestContainer testContainer,
+ ContainerExtendedFactory containerExtendedFactory,
+ int containerAmount) {
+ this.testContainer = testContainer;
+ this.containerExtendedFactory = containerExtendedFactory;
+ this.containerAmount = containerAmount;
+ }
+
+ @Override
+ public String getDisplayName(int invocationIndex) {
+ return String.format(
+ "TestContainer(%s/%s): %s",
+ invocationIndex, containerAmount, testContainer.identifier());
+ }
+
+ @Override
+ public List<Extension> getAdditionalExtensions() {
+ return Arrays.asList(
+ // Extension for injecting parameters
+ new TestContainerResolver(testContainer, containerExtendedFactory),
+ // Extension for closing test container
+ (AfterTestExecutionCallback)
+ ignore -> {
+ testContainer.tearDown();
+ log.info(
+ "The TestContainer[{}] is closed.",
+ testContainer.identifier());
+ });
+ }
+ }
+
+ private static class TestContainerResolver implements ParameterResolver {
+
+ private final TestContainer testContainer;
+ private final ContainerExtendedFactory containerExtendedFactory;
+
+ private TestContainerResolver(
+ TestContainer testContainer, ContainerExtendedFactory containerExtendedFactory) {
+ this.testContainer = testContainer;
+ this.containerExtendedFactory = containerExtendedFactory;
+ }
+
+ @Override
+ public boolean supportsParameter(
+ ParameterContext parameterContext, ExtensionContext extensionContext)
+ throws ParameterResolutionException {
+ return TestContainer.class.isAssignableFrom(parameterContext.getParameter().getType());
+ }
+
+ @SneakyThrows
+ @Override
+ public Object resolveParameter(
+ ParameterContext parameterContext, ExtensionContext extensionContext)
+ throws ParameterResolutionException {
+ testContainer.startUp();
+ testContainer.executeExtraCommands(containerExtendedFactory);
+ log.info("The TestContainer[{}] is running.", testContainer.identifier());
+ return this.testContainer;
+ }
+ }
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainerExtension.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainerExtension.java
new file mode 100644
index 0000000..0c18003
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainerExtension.java
@@ -0,0 +1,12 @@
+package com.geedgenetworks.test.common.junit;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface TestContainerExtension {
+
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainers.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainers.java
new file mode 100644
index 0000000..c7ffa17
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainers.java
@@ -0,0 +1,11 @@
+package com.geedgenetworks.test.common.junit;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface TestContainers {
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestLoggerExtension.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestLoggerExtension.java
new file mode 100644
index 0000000..e48135c
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestLoggerExtension.java
@@ -0,0 +1,60 @@
+package com.geedgenetworks.test.common.junit;
+
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class TestLoggerExtension implements TestWatcher, BeforeEachCallback {
+ private static final Logger LOG = LoggerFactory.getLogger(TestLoggerExtension.class);
+ @Override
+ public void beforeEach(ExtensionContext context) {
+ LOG.info(
+ "\n================================================================================"
+ + "\nTest {}.{} is running."
+ + "\n--------------------------------------------------------------------------------",
+ context.getRequiredTestClass().getCanonicalName(),
+ context.getRequiredTestMethod().getName());
+ }
+
+ @Override
+ public void testSuccessful(ExtensionContext context) {
+ LOG.info(
+ "\n--------------------------------------------------------------------------------"
+ + "\nTest {}.{} successfully run."
+ + "\n================================================================================",
+ context.getRequiredTestClass().getCanonicalName(),
+ context.getRequiredTestMethod().getName());
+ }
+
+ @Override
+ public void testFailed(ExtensionContext context, Throwable cause) {
+ LOG.error(
+ "\n--------------------------------------------------------------------------------"
+ + "\nTest {}.{} failed with:\n{}"
+ + "\n================================================================================",
+ context.getRequiredTestClass().getCanonicalName(),
+ context.getRequiredTestMethod().getName(),
+ exceptionToString(cause));
+ }
+
+ private static String exceptionToString(Throwable t) {
+ if (t == null) {
+ return "(null)";
+ }
+
+ try {
+ StringWriter stm = new StringWriter();
+ PrintWriter wrt = new PrintWriter(stm);
+ t.printStackTrace(wrt);
+ wrt.close();
+ return stm.toString();
+ } catch (Throwable ignored) {
+ return t.getClass().getName() + " (error while printing stack trace)";
+ }
+ }
+}
diff --git a/groot-tests/test-common/src/test/resources/log4j2.properties b/groot-tests/test-common/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..fb3ac1e
--- /dev/null
+++ b/groot-tests/test-common/src/test/resources/log4j2.properties
@@ -0,0 +1,42 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+
+rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
+rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
+
+appender.consoleStdout.name = consoleStdoutAppender
+appender.consoleStdout.type = CONSOLE
+appender.consoleStdout.target = SYSTEM_OUT
+appender.consoleStdout.layout.type = PatternLayout
+appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c [%t] - %m%n
+appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter
+appender.consoleStdout.filter.acceptLtWarn.level = WARN
+appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY
+appender.consoleStdout.filter.acceptLtWarn.onMismatch = ACCEPT
+
+appender.consoleStderr.name = consoleStderrAppender
+appender.consoleStderr.type = CONSOLE
+appender.consoleStderr.target = SYSTEM_ERR
+appender.consoleStderr.layout.type = PatternLayout
+appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c [%t] - %m%n
+appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter
+appender.consoleStderr.filter.acceptGteWarn.level = WARN
+appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT
+appender.consoleStderr.filter.acceptGteWarn.onMismatch = DENY
diff --git a/groot-tests/test-e2e-base/pom.xml b/groot-tests/test-e2e-base/pom.xml
new file mode 100644
index 0000000..4a664b9
--- /dev/null
+++ b/groot-tests/test-e2e-base/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-tests</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>test-e2e-base</artifactId>
+ <name>Groot : Tests : E2E : Base</name>
+
+ <properties>
+
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>test-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>${maven-jar-plugin.version}</version>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project> \ No newline at end of file
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/Flink13Container.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/Flink13Container.java
new file mode 100644
index 0000000..43c6eeb
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/Flink13Container.java
@@ -0,0 +1,34 @@
+package com.geedgenetworks.test.e2e.base;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Slf4j
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class Flink13Container extends com.geedgenetworks.test.common.container.Flink13Container {
+ @Override
+ @BeforeAll
+ public void startUp() throws Exception {
+ super.startUp();
+ log.info("The TestContainer[{}] is running.", identifier());
+ }
+
+ @Override
+ @AfterAll
+ public void tearDown() throws Exception {
+ super.tearDown();
+ log.info("The TestContainer[{}] is closed.", identifier());
+ }
+
+ public Container.ExecResult executeGrootStreamJob(String confFile)
+ throws IOException, InterruptedException {
+ return executeJob(confFile);
+ }
+
+
+}
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
new file mode 100644
index 0000000..2f9a203
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.test.e2e.base;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class InlineToPrint extends Flink13Container{
+
+ @Test
+ public void testInlineToPrint() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeGrootStreamJob("/inline_to_print.yaml");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
new file mode 100644
index 0000000..daf6e32
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
@@ -0,0 +1,44 @@
+sources:
+ inline_source:
+ type: inline
+ properties:
+ data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ format: json
+ json.ignore.parse.errors: false
+
+filters:
+ filter_operator:
+ type: com.geedgenetworks.core.filter.AviatorFilter
+ properties:
+ expression: event.server_ip != '12.12.12.12'
+
+processing_pipelines:
+ projection_processor:
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [http_request_line, http_response_line, http_response_content_type]
+ functions:
+ - function: DROP
+ filter: event.server_ip == '4.4.4.4'
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+
+application:
+ env:
+ name: example-inline-to-print
+ parallelism: 3
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [filter_operator]
+ - name: filter_operator
+ downstream: [ projection_processor ]
+ - name: projection_processor
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: [] \ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 23f7c1a..c835941 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -1,6 +1,5 @@
#Connectors
-
grootstream.source.Kafka = connector-kafka
grootstream.sink.kafka = connector-kafka
-grootstream.source.Clickhouse= connector-Clickhouse
-grootstream.source.IPFix = connector-ipfix-collector \ No newline at end of file
+grootstream.source.clickhouse= connector-clickhouse
+grootstream.source.ipfix = connector-ipfix-collector
diff --git a/pom.xml b/pom.xml
index c732e92..266eaed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,6 @@
<properties>
<revision>1.1.0</revision>
<java.version>11</java.version>
- <flink.scope>provided</flink.scope>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
@@ -34,16 +33,17 @@
<maven-helper-plugin.version>3.2.0</maven-helper-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version>
+ <testcontainer.version>1.19.6</testcontainer.version>
+ <awaitility.version>4.2.0</awaitility.version>
<spotless.version>2.29.0</spotless.version>
- <skip.spotless>true</skip.spotless>
- <grootstream.shaded.package>com.geedgenetworks.shaded</grootstream.shaded.package>
<slf4j.version>1.7.25</slf4j.version>
<log4j2.version>2.17.1</log4j2.version>
<log4j2-disruptor.version>3.4.4</log4j2-disruptor.version>
<log4j.version>1.2.17</log4j.version>
<logback.version>1.2.3</logback.version>
<commons-logging.version>1.2</commons-logging.version>
- <junit.version>5.9.0</junit.version>
+ <junit4.version>4.13.2</junit4.version>
+ <junit5.version>5.9.0</junit5.version>
<flink.version>1.13.1</flink.version>
<flink-shaded-hadoop-2.version>2.7.5-8.0</flink-shaded-hadoop-2.version>
<hbase.version>2.2.3</hbase.version>
@@ -64,6 +64,13 @@
<config.version>1.3.3</config.version>
<hazelcast.version>5.1</hazelcast.version>
<quartz.version>2.3.2</quartz.version>
+ <!--Option config-->
+ <test.dependency.skip>true</test.dependency.skip>
+ <skip.spotless>true</skip.spotless>
+ <flink.scope>provided</flink.scope>
+ <grootstream.shaded.package>com.geedgenetworks.shaded</grootstream.shaded.package>
+ <auto-service.version>1.0.1</auto-service.version>
+
</properties>
<dependencyManagement>
@@ -241,13 +248,20 @@
</dependency>
<dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit4.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
- <version>${junit.version}</version>
+ <version>${junit5.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
+
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4</artifactId>
@@ -360,7 +374,6 @@
</dependency>
-
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
@@ -384,7 +397,36 @@
<version>${quartz.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>${auto-service.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
@@ -399,6 +441,13 @@
</dependency>
<dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>${auto-service.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>