summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-11-23 19:23:16 +0800
committerdoufenghu <[email protected]>2024-11-23 19:23:16 +0800
commit53d7c1778d15ff3dd9b4c411799455592990d03f (patch)
tree3eee3117292f1c59a931a1a8314c67f3e9a1e5fa
parent08015fd2e926aeb5019e1e2d96d4e23ab549346b (diff)
[Improve][e2e] Rename all e2e test modules to adapt to changes in the API operators.
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml11
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml4
-rw-r--r--groot-tests/pom.xml8
-rw-r--r--groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml40
-rw-r--r--groot-tests/test-e2e-common/pom.xml (renamed from groot-tests/test-common/pom.xml)4
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/AbstractFlinkContainer.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/AbstractFlinkContainer.java)4
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/TestResource.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestResource.java)2
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/TestSuiteBase.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestSuiteBase.java)16
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/AbstractTestContainer.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java)4
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/AbstractTestFlinkContainer.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java)7
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/ContainerExtendedFactory.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerExtendedFactory.java)2
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/ContainerUtil.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java)25
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/EngineType.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/EngineType.java)2
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/Flink13Container.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java)2
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/Flink17Container.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java)2
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestContainer.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java)4
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestContainerId.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java)4
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestContainersFactory.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainersFactory.java)2
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestHelper.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestHelper.java)2
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/AnnotationUtil.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/AnnotationUtil.java)8
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/ContainerTestingExtension.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/ContainerTestingExtension.java)8
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/DisabledOnContainer.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/DisabledOnContainer.java)6
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestCaseInvocationContextProvider.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestCaseInvocationContextProvider.java)12
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestContainerExtension.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainerExtension.java)2
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestContainers.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainers.java)2
-rw-r--r--groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestLoggerExtension.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestLoggerExtension.java)2
-rw-r--r--groot-tests/test-e2e-common/src/test/resources/grootstream.yaml (renamed from groot-tests/test-common/src/test/resources/grootstream.yaml)0
-rw-r--r--groot-tests/test-e2e-common/src/test/resources/log4j2.properties (renamed from groot-tests/test-common/src/test/resources/log4j2.properties)0
-rw-r--r--groot-tests/test-e2e-connector-clickhouse/pom.xml (renamed from groot-tests/test-e2e-clickhouse/pom.xml)6
-rw-r--r--groot-tests/test-e2e-connector-clickhouse/src/test/java/com/geedgenetworks/test/e2e/connector/clickhouse/ClickHouseIT.java (renamed from groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java)18
-rw-r--r--groot-tests/test-e2e-connector-clickhouse/src/test/resources/clickhouse_data_type_sink.yaml (renamed from groot-tests/test-e2e-clickhouse/src/test/resources/clickhouse_data_type_sink.yaml)0
-rw-r--r--groot-tests/test-e2e-connector-clickhouse/src/test/resources/init/clickhouse_test_sql.conf (renamed from groot-tests/test-e2e-clickhouse/src/test/resources/init/clickhouse_test_sql.conf)0
-rw-r--r--groot-tests/test-e2e-connector-clickhouse/src/test/resources/init/init-clickhouse.sql (renamed from groot-tests/test-e2e-clickhouse/src/test/resources/init/init-clickhouse.sql)0
-rw-r--r--groot-tests/test-e2e-connector-clickhouse/src/test/resources/init/users.xml (renamed from groot-tests/test-e2e-clickhouse/src/test/resources/init/users.xml)0
-rw-r--r--groot-tests/test-e2e-connector-kafka/pom.xml (renamed from groot-tests/test-e2e-kafka/pom.xml)6
-rw-r--r--groot-tests/test-e2e-connector-kafka/src/test/java/com/geedgenetworks/test/e2e/connector/kafka/KafkaIT.java (renamed from groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaIT.java)14
-rw-r--r--groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_client_jass_cli.properties (renamed from groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties)0
-rw-r--r--groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_producer_quota.yaml (renamed from groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml)0
-rw-r--r--groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_server_jaas.conf (renamed from groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf)0
-rw-r--r--groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_sink.yaml (renamed from groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml)0
-rw-r--r--groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml (renamed from groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml)0
-rw-r--r--groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml (renamed from groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml)0
-rw-r--r--groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_source.yaml (renamed from groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml)0
-rw-r--r--groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_source_error_schema.yaml (renamed from groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml)0
-rw-r--r--groot-tests/test-e2e-core/pom.xml (renamed from groot-tests/test-e2e-base/pom.xml)6
-rw-r--r--groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/EnvParameterIT.java (renamed from groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterIT.java)14
-rw-r--r--groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/Flink13Container.java (renamed from groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/Flink13Container.java)4
-rw-r--r--groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/InlineToPrintIT.java (renamed from groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java)19
-rw-r--r--groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/ProcessorIT.java69
-rw-r--r--groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/TestUtils.java (renamed from groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java)2
-rw-r--r--groot-tests/test-e2e-core/src/test/resources/inline_to_print.yaml (renamed from groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml)8
-rw-r--r--groot-tests/test-e2e-core/src/test/resources/job_aggregate_processor.yaml76
-rw-r--r--groot-tests/test-e2e-core/src/test/resources/job_split_processor.yaml99
-rw-r--r--groot-tests/test-e2e-core/src/test/resources/test_env_parameter_inline_to_print.yaml (renamed from groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml)0
54 files changed, 366 insertions, 160 deletions
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
index 3477b00..852415e 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
@@ -18,18 +18,18 @@ splits:
type: split
rules:
- tag: http_a_tag
- expression: event.decoded_as == 'HTTP'
+ expression: decoded_as == 'HTTP'
- tag: dns_a_tag
- expression: event.decoded_as == 'DNS'
+ expression: decoded_as == 'DNS'
test_split_b:
type: split
rules:
- tag: base_b_tag
- expression: event.decoded_as == 'BASE'
+ expression: decoded_as == 'BASE'
- tag: dns_b_tag
- expression: event.decoded_as == 'DNS'
+ expression: decoded_as == 'DNS'
-postprocessing_pipelines:
+processing_pipelines:
pre_etl_processor: # [object] Processing Pipeline
type: projection
remove_fields: [fields,tags]
@@ -103,7 +103,6 @@ application: # [object] Application Configuration
- name: table_processor
parallelism: 1
downstream: [ collect_sink ]
-
- name: collect_sink
parallelism: 1
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
index 1e7c835..34b6e2a 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml
@@ -10,7 +10,7 @@ filters:
filter_operator:
type: filter
properties:
- expression: event.server_ip != '12.12.12.12'
+ expression: server_ip != '12.12.12.12'
processing_pipelines:
projection_processor:
@@ -18,7 +18,7 @@ processing_pipelines:
remove_fields: [http_request_line, http_response_line, http_response_content_type]
functions:
- function: DROP
- filter: event.server_ip == '4.4.4.4'
+ filter: server_ip == '4.4.4.4'
sinks:
print_sink:
diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml
index 47b9177..51ebbbd 100644
--- a/groot-tests/pom.xml
+++ b/groot-tests/pom.xml
@@ -13,10 +13,10 @@
<packaging>pom</packaging>
<name>Groot : Tests : </name>
<modules>
- <module>test-common</module>
- <module>test-e2e-base</module>
- <module>test-e2e-kafka</module>
- <module>test-e2e-clickhouse</module>
+ <module>test-e2e-common</module>
+ <module>test-e2e-core</module>
+ <module>test-e2e-connector-kafka</module>
+ <module>test-e2e-connector-clickhouse</module>
</modules>
<properties>
diff --git a/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
deleted file mode 100644
index b1e4f35..0000000
--- a/groot-tests/test-e2e-base/src/test/resources/kafka_to_print.yaml
+++ /dev/null
@@ -1,40 +0,0 @@
-sources:
- kafka_source:
- type : kafka
- schema:
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- properties: # [object] Kafka source properties
- topic: SESSION-RECORD
- kafka.bootstrap.servers: 192.168.44.11:9092
- kafka.session.timeout.ms: 60000
- kafka.max.poll.records: 3000
- kafka.max.partition.fetch.bytes: 31457280
- kafka.group.id: GROOT-STREAM-EXAMPLE-KAFKA-TO-PRINT
- kafka.auto.offset.reset: latest
- format: json
-
-sinks: # [object] Define connector sink
- print_sink:
- type: print
- properties:
- mode: log_info
- format: json
-
-application: # [object] Define job configuration
- env:
- name: example-kafka-to-print
- parallelism: 1
- pipeline:
- object-reuse: true
- execution:
- restart:
- strategy: no
- topology:
- - name: kafka_source
- downstream: [print_sink]
- - name: print_sink
- downstream: [] \ No newline at end of file
diff --git a/groot-tests/test-common/pom.xml b/groot-tests/test-e2e-common/pom.xml
index c086f41..be0de94 100644
--- a/groot-tests/test-common/pom.xml
+++ b/groot-tests/test-e2e-common/pom.xml
@@ -9,8 +9,8 @@
<version>${revision}</version>
</parent>
- <artifactId>test-common</artifactId>
- <name>Groot : Tests : Common</name>
+ <artifactId>test-e2e-common</artifactId>
+ <name>Groot : Tests : E2E: Common</name>
<properties>
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/AbstractFlinkContainer.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/AbstractFlinkContainer.java
index 3444e0d..a44c65a 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/AbstractFlinkContainer.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/AbstractFlinkContainer.java
@@ -1,7 +1,7 @@
-package com.geedgenetworks.test.common;
+package com.geedgenetworks.test.e2e.common;
-import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
+import com.geedgenetworks.test.e2e.common.container.AbstractTestFlinkContainer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestResource.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/TestResource.java
index f84307e..fc04588 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestResource.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/TestResource.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common;
+package com.geedgenetworks.test.e2e.common;
/**
* Basic abstractions for all resources used in connector testing framework.
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestSuiteBase.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/TestSuiteBase.java
index 83f4a7c..6417062 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/TestSuiteBase.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/TestSuiteBase.java
@@ -1,12 +1,12 @@
-package com.geedgenetworks.test.common;
+package com.geedgenetworks.test.e2e.common;
-import com.geedgenetworks.test.common.container.ContainerUtil;
-import com.geedgenetworks.test.common.container.TestContainer;
-import com.geedgenetworks.test.common.container.TestContainersFactory;
-import com.geedgenetworks.test.common.junit.ContainerTestingExtension;
-import com.geedgenetworks.test.common.junit.TestCaseInvocationContextProvider;
-import com.geedgenetworks.test.common.junit.TestContainers;
-import com.geedgenetworks.test.common.junit.TestLoggerExtension;
+import com.geedgenetworks.test.e2e.common.container.ContainerUtil;
+import com.geedgenetworks.test.e2e.common.container.TestContainer;
+import com.geedgenetworks.test.e2e.common.container.TestContainersFactory;
+import com.geedgenetworks.test.e2e.common.junit.ContainerTestingExtension;
+import com.geedgenetworks.test.e2e.common.junit.TestCaseInvocationContextProvider;
+import com.geedgenetworks.test.e2e.common.junit.TestContainers;
+import com.geedgenetworks.test.e2e.common.junit.TestLoggerExtension;
import com.github.dockerjava.api.DockerClient;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/AbstractTestContainer.java
index 14eb5fb..35b4d8d 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/AbstractTestContainer.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -10,7 +10,7 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
-import static com.geedgenetworks.test.common.container.ContainerUtil.PROJECT_ROOT_PATH;
+import static com.geedgenetworks.test.e2e.common.container.ContainerUtil.PROJECT_ROOT_PATH;
public abstract class AbstractTestContainer implements TestContainer {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestContainer.class);
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/AbstractTestFlinkContainer.java
index 4ac3d03..b558e9b 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/AbstractTestFlinkContainer.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
import com.google.common.collect.Lists;
import lombok.NoArgsConstructor;
@@ -7,7 +7,6 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
-import org.testcontainers.images.PullPolicy;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;
@@ -25,7 +24,7 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
Arrays.asList(
"jobmanager.rpc.address: jobmanager",
"taskmanager.numberOfTaskSlots: 10",
- "parallelism.default: 4",
+ "parallelism.default: 3",
"env.java.opts: -Doracle.jdbc.timezoneAsRegion=false");
protected static final String DEFAULT_DOCKER_IMAGE = "flink:1.13.1-scala_2.11-java11";
@@ -62,7 +61,7 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
copyGrootStreamStarterToContainer(jobManager);
copyGrootStreamStarterLoggingToContainer(jobManager);
- jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8084, 8081)));
+ jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8999, 8081)));
taskManager =
new GenericContainer<>(dockerImage)
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerExtendedFactory.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/ContainerExtendedFactory.java
index c7c1df0..6945432 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerExtendedFactory.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/ContainerExtendedFactory.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
import org.testcontainers.containers.GenericContainer;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/ContainerUtil.java
index 811bdf6..0e6f3fd 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/ContainerUtil.java
@@ -1,6 +1,7 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
import cn.hutool.core.util.XmlUtil;
+import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.bootstrap.utils.ConfigBuilder;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
@@ -20,6 +21,8 @@ import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Slf4j
@@ -107,7 +110,7 @@ public final class ContainerUtil {
Paths.get(GrootStreamHomeInContainer, "config").toString());
// copy grootstream.yaml
- final String grootTestsCommonPath = PROJECT_ROOT_PATH + "/groot-tests/test-common/src/test/resources";
+ final String grootTestsCommonPath = PROJECT_ROOT_PATH + "/groot-tests/test-e2e-common/src/test/resources";
checkPathExist(grootTestsCommonPath);
container.withCopyFileToContainer(
MountableFile.forHostPath(grootTestsCommonPath + "/grootstream.yaml"),
@@ -200,6 +203,19 @@ public final class ContainerUtil {
return path == null ? "" : path.replaceAll("\\\\", "/");
}
+
+ public static List<JSONObject> extractJsonFromServerLogs(String logs) {
+ List<JSONObject> jsons = new ArrayList<>();
+ Pattern jsonPattern = Pattern.compile("-\\s(\\{.*?\\})");
+ Matcher matcher = jsonPattern.matcher(logs);
+ while (matcher.find()) {
+ jsons.add(JSONObject.parseObject(matcher.group(1)));
+ }
+ return jsons;
+ }
+
+
+
private static List<File> getConnectorFiles(
File currentModule, Set<String> connectorNames, String connectorPrefix) {
List<File> connectorFiles = new ArrayList<>();
@@ -347,9 +363,4 @@ public final class ContainerUtil {
-
-
-
-
-
}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/EngineType.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/EngineType.java
index 4f348ae..1447c51 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/EngineType.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/EngineType.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
import lombok.AllArgsConstructor;
import lombok.Getter;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/Flink13Container.java
index 338c696..c672784 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/Flink13Container.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/Flink17Container.java
index 371b542..4a5d14f 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/Flink17Container.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestContainer.java
index b3bf77a..e8b9388 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestContainer.java
@@ -1,6 +1,6 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
-import com.geedgenetworks.test.common.TestResource;
+import com.geedgenetworks.test.e2e.common.TestResource;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.Network;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestContainerId.java
index e837d25..6e51720 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestContainerId.java
@@ -1,9 +1,9 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
import lombok.AllArgsConstructor;
import lombok.Getter;
-import static com.geedgenetworks.test.common.container.EngineType.FLINK;
+import static com.geedgenetworks.test.e2e.common.container.EngineType.FLINK;
@Getter
@AllArgsConstructor
public enum TestContainerId {
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainersFactory.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestContainersFactory.java
index 675ee53..7406c28 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainersFactory.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestContainersFactory.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
import java.util.List;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestHelper.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestHelper.java
index e343dcd..ac4f466 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestHelper.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/container/TestHelper.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.container;
+package com.geedgenetworks.test.e2e.common.container;
import org.junit.jupiter.api.Assertions;
import org.testcontainers.containers.Container;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/AnnotationUtil.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/AnnotationUtil.java
index c10e782..af623c6 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/AnnotationUtil.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/AnnotationUtil.java
@@ -1,8 +1,8 @@
-package com.geedgenetworks.test.common.junit;
+package com.geedgenetworks.test.e2e.common.junit;
-import com.geedgenetworks.test.common.container.TestContainer;
-import com.geedgenetworks.test.common.container.TestContainerId;
-import com.geedgenetworks.test.common.container.EngineType;
+import com.geedgenetworks.test.e2e.common.container.TestContainer;
+import com.geedgenetworks.test.e2e.common.container.TestContainerId;
+import com.geedgenetworks.test.e2e.common.container.EngineType;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.junit.platform.commons.util.AnnotationUtils;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/ContainerTestingExtension.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/ContainerTestingExtension.java
index bad2ce4..57a6df9 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/ContainerTestingExtension.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/ContainerTestingExtension.java
@@ -1,8 +1,8 @@
-package com.geedgenetworks.test.common.junit;
+package com.geedgenetworks.test.e2e.common.junit;
-import com.geedgenetworks.test.common.container.ContainerExtendedFactory;
-import com.geedgenetworks.test.common.container.TestContainer;
-import com.geedgenetworks.test.common.container.TestContainersFactory;
+import com.geedgenetworks.test.e2e.common.container.ContainerExtendedFactory;
+import com.geedgenetworks.test.e2e.common.container.TestContainer;
+import com.geedgenetworks.test.e2e.common.container.TestContainersFactory;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/DisabledOnContainer.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/DisabledOnContainer.java
index 3c4e655..4147ac5 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/DisabledOnContainer.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/DisabledOnContainer.java
@@ -1,7 +1,7 @@
-package com.geedgenetworks.test.common.junit;
+package com.geedgenetworks.test.e2e.common.junit;
-import com.geedgenetworks.test.common.container.EngineType;
-import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.e2e.common.container.EngineType;
+import com.geedgenetworks.test.e2e.common.container.TestContainerId;
import java.lang.annotation.*;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestCaseInvocationContextProvider.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestCaseInvocationContextProvider.java
index 01f29bf..f66603e 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestCaseInvocationContextProvider.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestCaseInvocationContextProvider.java
@@ -1,7 +1,7 @@
-package com.geedgenetworks.test.common.junit;
+package com.geedgenetworks.test.e2e.common.junit;
-import com.geedgenetworks.test.common.container.ContainerExtendedFactory;
-import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.e2e.common.container.ContainerExtendedFactory;
+import com.geedgenetworks.test.e2e.common.container.TestContainer;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.extension.*;
@@ -9,9 +9,9 @@ import org.junit.jupiter.api.extension.*;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
-import static com.geedgenetworks.test.common.junit.ContainerTestingExtension.TEST_CONTAINERS_STORE_KEY;
-import static com.geedgenetworks.test.common.junit.ContainerTestingExtension.TEST_EXTENDED_FACTORY_STORE_KEY;
-import static com.geedgenetworks.test.common.junit.ContainerTestingExtension.TEST_RESOURCE_NAMESPACE;
+import static com.geedgenetworks.test.e2e.common.junit.ContainerTestingExtension.TEST_CONTAINERS_STORE_KEY;
+import static com.geedgenetworks.test.e2e.common.junit.ContainerTestingExtension.TEST_EXTENDED_FACTORY_STORE_KEY;
+import static com.geedgenetworks.test.e2e.common.junit.ContainerTestingExtension.TEST_RESOURCE_NAMESPACE;
@Slf4j
public class TestCaseInvocationContextProvider implements TestTemplateInvocationContextProvider {
@Override
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainerExtension.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestContainerExtension.java
index 0c18003..1f140df 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainerExtension.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestContainerExtension.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.junit;
+package com.geedgenetworks.test.e2e.common.junit;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainers.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestContainers.java
index c7ffa17..64d536e 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestContainers.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestContainers.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.junit;
+package com.geedgenetworks.test.e2e.common.junit;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestLoggerExtension.java b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestLoggerExtension.java
index e48135c..e6ec46a 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/junit/TestLoggerExtension.java
+++ b/groot-tests/test-e2e-common/src/test/java/com/geedgenetworks/test/e2e/common/junit/TestLoggerExtension.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.common.junit;
+package com.geedgenetworks.test.e2e.common.junit;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-e2e-common/src/test/resources/grootstream.yaml
index 0def444..0def444 100644
--- a/groot-tests/test-common/src/test/resources/grootstream.yaml
+++ b/groot-tests/test-e2e-common/src/test/resources/grootstream.yaml
diff --git a/groot-tests/test-common/src/test/resources/log4j2.properties b/groot-tests/test-e2e-common/src/test/resources/log4j2.properties
index fb3ac1e..fb3ac1e 100644
--- a/groot-tests/test-common/src/test/resources/log4j2.properties
+++ b/groot-tests/test-e2e-common/src/test/resources/log4j2.properties
diff --git a/groot-tests/test-e2e-clickhouse/pom.xml b/groot-tests/test-e2e-connector-clickhouse/pom.xml
index d575f15..b9d4564 100644
--- a/groot-tests/test-e2e-clickhouse/pom.xml
+++ b/groot-tests/test-e2e-connector-clickhouse/pom.xml
@@ -9,8 +9,8 @@
<version>${revision}</version>
</parent>
- <artifactId>test-e2e-clickhouse</artifactId>
- <name>Groot : Tests : E2E : ClickHouse</name>
+ <artifactId>test-e2e-connector-clickhouse</artifactId>
+ <name>Groot : Tests : E2E : Connector : ClickHouse</name>
<properties>
<maven.compiler.source>11</maven.compiler.source>
@@ -25,7 +25,7 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>test-common</artifactId>
+ <artifactId>test-e2e-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
diff --git a/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java b/groot-tests/test-e2e-connector-clickhouse/src/test/java/com/geedgenetworks/test/e2e/connector/clickhouse/ClickHouseIT.java
index 8b44ed7..8b25f14 100644
--- a/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java
+++ b/groot-tests/test-e2e-connector-clickhouse/src/test/java/com/geedgenetworks/test/e2e/connector/clickhouse/ClickHouseIT.java
@@ -1,12 +1,12 @@
-package com.geedgenetworks.test.e2e.clickhouse;
+package com.geedgenetworks.test.e2e.connector.clickhouse;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.test.common.TestResource;
-import com.geedgenetworks.test.common.TestSuiteBase;
-import com.geedgenetworks.test.common.container.ContainerUtil;
-import com.geedgenetworks.test.common.container.TestContainer;
-import com.geedgenetworks.test.common.container.TestContainerId;
-import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import com.geedgenetworks.test.e2e.common.TestResource;
+import com.geedgenetworks.test.e2e.common.TestSuiteBase;
+import com.geedgenetworks.test.e2e.common.container.ContainerUtil;
+import com.geedgenetworks.test.e2e.common.container.TestContainer;
+import com.geedgenetworks.test.e2e.common.container.TestContainerId;
+import com.geedgenetworks.test.e2e.common.junit.DisabledOnContainer;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -20,20 +20,16 @@ import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
diff --git a/groot-tests/test-e2e-clickhouse/src/test/resources/clickhouse_data_type_sink.yaml b/groot-tests/test-e2e-connector-clickhouse/src/test/resources/clickhouse_data_type_sink.yaml
index 3406a67..3406a67 100644
--- a/groot-tests/test-e2e-clickhouse/src/test/resources/clickhouse_data_type_sink.yaml
+++ b/groot-tests/test-e2e-connector-clickhouse/src/test/resources/clickhouse_data_type_sink.yaml
diff --git a/groot-tests/test-e2e-clickhouse/src/test/resources/init/clickhouse_test_sql.conf b/groot-tests/test-e2e-connector-clickhouse/src/test/resources/init/clickhouse_test_sql.conf
index f132795..f132795 100644
--- a/groot-tests/test-e2e-clickhouse/src/test/resources/init/clickhouse_test_sql.conf
+++ b/groot-tests/test-e2e-connector-clickhouse/src/test/resources/init/clickhouse_test_sql.conf
diff --git a/groot-tests/test-e2e-clickhouse/src/test/resources/init/init-clickhouse.sql b/groot-tests/test-e2e-connector-clickhouse/src/test/resources/init/init-clickhouse.sql
index fd9daac..fd9daac 100644
--- a/groot-tests/test-e2e-clickhouse/src/test/resources/init/init-clickhouse.sql
+++ b/groot-tests/test-e2e-connector-clickhouse/src/test/resources/init/init-clickhouse.sql
diff --git a/groot-tests/test-e2e-clickhouse/src/test/resources/init/users.xml b/groot-tests/test-e2e-connector-clickhouse/src/test/resources/init/users.xml
index 86a590d..86a590d 100644
--- a/groot-tests/test-e2e-clickhouse/src/test/resources/init/users.xml
+++ b/groot-tests/test-e2e-connector-clickhouse/src/test/resources/init/users.xml
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-connector-kafka/pom.xml
index 3d66b2a..5f98746 100644
--- a/groot-tests/test-e2e-kafka/pom.xml
+++ b/groot-tests/test-e2e-connector-kafka/pom.xml
@@ -9,14 +9,14 @@
<version>${revision}</version>
</parent>
- <artifactId>test-e2e-kafka</artifactId>
- <name>Groot : Tests : E2E : Kafka</name>
+ <artifactId>test-e2e-connector-kafka</artifactId>
+ <name>Groot : Tests : E2E : Connector : Kafka</name>
<dependencies>
<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>test-common</artifactId>
+ <artifactId>test-e2e-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaIT.java b/groot-tests/test-e2e-connector-kafka/src/test/java/com/geedgenetworks/test/e2e/connector/kafka/KafkaIT.java
index e60d34d..a1bd86b 100644
--- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaIT.java
+++ b/groot-tests/test-e2e-connector-kafka/src/test/java/com/geedgenetworks/test/e2e/connector/kafka/KafkaIT.java
@@ -1,13 +1,13 @@
-package com.geedgenetworks.test.e2e.kafka;
+package com.geedgenetworks.test.e2e.connector.kafka;
import com.geedgenetworks.formats.json.JsonSerializer;
import com.geedgenetworks.api.connector.type.StructType;
import com.geedgenetworks.api.connector.type.Types;
-import com.geedgenetworks.test.common.TestResource;
-import com.geedgenetworks.test.common.TestSuiteBase;
-import com.geedgenetworks.test.common.container.TestContainer;
-import com.geedgenetworks.test.common.container.TestContainerId;
-import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import com.geedgenetworks.test.e2e.common.TestResource;
+import com.geedgenetworks.test.e2e.common.TestSuiteBase;
+import com.geedgenetworks.test.e2e.common.container.TestContainer;
+import com.geedgenetworks.test.e2e.common.container.TestContainerId;
+import com.geedgenetworks.test.e2e.common.junit.DisabledOnContainer;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -115,6 +115,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
@TestTemplate
public void testKafkaAsSourceConsumeErrorSchema(TestContainer container) {
generateTestData("test_topic_error_json", 0, 10);
+
CompletableFuture.supplyAsync(
() -> {
try {
@@ -122,7 +123,6 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
return execResult;
} catch (Exception e) {
- log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_client_jass_cli.properties
index 986cdb9..986cdb9 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties
+++ b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_client_jass_cli.properties
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_producer_quota.yaml
index 8c2ad8d..8c2ad8d 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml
+++ b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_producer_quota.yaml
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_server_jaas.conf
index cb4553f..cb4553f 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf
+++ b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_server_jaas.conf
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_sink.yaml
index e12e76b..e12e76b 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
+++ b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_sink.yaml
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
index d65157a..d65157a 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
+++ b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
index d9cb80f..d9cb80f 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
+++ b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_source.yaml
index 3403ab9..3403ab9 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
+++ b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_source.yaml
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_source_error_schema.yaml
index 1016560..1016560 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
+++ b/groot-tests/test-e2e-connector-kafka/src/test/resources/kafka_source_error_schema.yaml
diff --git a/groot-tests/test-e2e-base/pom.xml b/groot-tests/test-e2e-core/pom.xml
index 4a664b9..25e0cf9 100644
--- a/groot-tests/test-e2e-base/pom.xml
+++ b/groot-tests/test-e2e-core/pom.xml
@@ -9,8 +9,8 @@
<version>${revision}</version>
</parent>
- <artifactId>test-e2e-base</artifactId>
- <name>Groot : Tests : E2E : Base</name>
+ <artifactId>test-e2e-core</artifactId>
+ <name>Groot : Tests : E2E : Core</name>
<properties>
@@ -19,7 +19,7 @@
<dependencies>
<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>test-common</artifactId>
+ <artifactId>test-e2e-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterIT.java b/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/EnvParameterIT.java
index aa00d8d..1a24e6f 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterIT.java
+++ b/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/EnvParameterIT.java
@@ -1,14 +1,14 @@
-package com.geedgenetworks.test.e2e.base;
+package com.geedgenetworks.test.e2e.core;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
-import com.geedgenetworks.test.common.TestSuiteBase;
-import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
-import com.geedgenetworks.test.common.container.ContainerExtendedFactory;
-import com.geedgenetworks.test.common.container.TestContainerId;
-import com.geedgenetworks.test.common.junit.DisabledOnContainer;
-import com.geedgenetworks.test.common.junit.TestContainerExtension;
+import com.geedgenetworks.test.e2e.common.TestSuiteBase;
+import com.geedgenetworks.test.e2e.common.container.AbstractTestFlinkContainer;
+import com.geedgenetworks.test.e2e.common.container.ContainerExtendedFactory;
+import com.geedgenetworks.test.e2e.common.container.TestContainerId;
+import com.geedgenetworks.test.e2e.common.junit.DisabledOnContainer;
+import com.geedgenetworks.test.e2e.common.junit.TestContainerExtension;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/Flink13Container.java b/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/Flink13Container.java
index 43c6eeb..b47dece 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/Flink13Container.java
+++ b/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/Flink13Container.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.e2e.base;
+package com.geedgenetworks.test.e2e.core;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
@@ -10,7 +10,7 @@ import java.io.IOException;
@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class Flink13Container extends com.geedgenetworks.test.common.container.Flink13Container {
+public class Flink13Container extends com.geedgenetworks.test.e2e.common.container.Flink13Container {
@Override
@BeforeAll
public void startUp() throws Exception {
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/InlineToPrintIT.java
index fdba36f..fadf6f3 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
+++ b/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/InlineToPrintIT.java
@@ -1,12 +1,12 @@
-package com.geedgenetworks.test.e2e.base;
+package com.geedgenetworks.test.e2e.core;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
-import com.geedgenetworks.test.common.TestSuiteBase;
-import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
-import com.geedgenetworks.test.common.container.TestContainerId;
-import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import com.geedgenetworks.test.e2e.common.TestSuiteBase;
+import com.geedgenetworks.test.e2e.common.container.AbstractTestFlinkContainer;
+import com.geedgenetworks.test.e2e.common.container.TestContainerId;
+import com.geedgenetworks.test.e2e.common.junit.DisabledOnContainer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
@@ -29,7 +29,7 @@ public class InlineToPrintIT extends TestSuiteBase {
@TestTemplate
- public void testJobExecution(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+ public void testJobExecution(AbstractTestFlinkContainer container) {
CompletableFuture.supplyAsync(
() -> {
try {
@@ -97,7 +97,7 @@ public class InlineToPrintIT extends TestSuiteBase {
}
@TestTemplate
- public void testUserDefinedJobVariables(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+ public void testUserDefinedJobVariables(AbstractTestFlinkContainer container) {
CompletableFuture.supplyAsync(
() -> {
@@ -117,6 +117,7 @@ public class InlineToPrintIT extends TestSuiteBase {
.untilAsserted(
() -> {
String logs = container.getServerLogs();
+
Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_rtp_file_bucket/test_pcap_file") > 10);
Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_http_file_bucket/test_http_req_file") > 10);
// Test server_ip filter -> output logs not contains 4.4.4.4 of server_ip
@@ -140,10 +141,6 @@ public class InlineToPrintIT extends TestSuiteBase {
Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "client_ip_list"));
-
-
-
-
});
diff --git a/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/ProcessorIT.java b/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/ProcessorIT.java
new file mode 100644
index 0000000..053bad6
--- /dev/null
+++ b/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/ProcessorIT.java
@@ -0,0 +1,69 @@
+package com.geedgenetworks.test.e2e.core;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.test.e2e.common.TestSuiteBase;
+import com.geedgenetworks.test.e2e.common.container.AbstractTestFlinkContainer;
+import com.geedgenetworks.test.e2e.common.container.ContainerUtil;
+import com.geedgenetworks.test.e2e.common.container.TestContainerId;
+import com.geedgenetworks.test.e2e.common.junit.DisabledOnContainer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {TestContainerId.FLINK_1_17},
+ disabledReason = "Override TestSuiteBase @DisabledOnContainer")
+public class ProcessorIT extends TestSuiteBase {
+
+ @TestTemplate
+ public void testJobSplitProcessor(AbstractTestFlinkContainer container) {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob("/job_split_processor.yaml");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ await().atMost(90000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ String logs = container.getServerLogs();
+ List<JSONObject> result = ContainerUtil.extractJsonFromServerLogs(logs);
+ Assertions.assertEquals(7, result.size());
+ });
+
+
+ }
+
+
+ @TestTemplate
+ public void testJobAggregateProcessor(AbstractTestFlinkContainer container) {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob("/job_aggregate_processor.yaml");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ await().atMost(90000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ String logs = container.getServerLogs();
+ List<JSONObject> result = ContainerUtil.extractJsonFromServerLogs(logs);
+ Assertions.assertEquals(4, result.size());
+ });
+ }
+}
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java b/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/TestUtils.java
index 4aa2dc6..7badb2b 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java
+++ b/groot-tests/test-e2e-core/src/test/java/com/geedgenetworks/test/e2e/core/TestUtils.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.test.e2e.base;
+package com.geedgenetworks.test.e2e.core;
import lombok.extern.slf4j.Slf4j;
diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-core/src/test/resources/inline_to_print.yaml
index abb42a4..4e452c7 100644
--- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
+++ b/groot-tests/test-e2e-core/src/test/resources/inline_to_print.yaml
@@ -10,16 +10,16 @@ filters:
server_ip_filter:
type: filter
properties:
- expression: event.server_ip != '4.4.4.4'
+ expression: server_ip != '4.4.4.4'
splits:
decoded_as_split:
type: split
rules:
- tag: http_tag
- expression: event.decoded_as == 'HTTP'
+ expression: decoded_as == 'HTTP'
- tag: dns_tag
- expression: event.decoded_as == 'DNS'
+ expression: decoded_as == 'DNS'
processing_pipelines:
@@ -29,7 +29,7 @@ processing_pipelines:
functions:
- function: DROP
- filter: event.server_ip == '5.5.5.5'
+ filter: server_ip == '5.5.5.5'
- function: SNOWFLAKE_ID
output_fields: [ log_id ]
diff --git a/groot-tests/test-e2e-core/src/test/resources/job_aggregate_processor.yaml b/groot-tests/test-e2e-core/src/test/resources/job_aggregate_processor.yaml
new file mode 100644
index 0000000..ebd51e3
--- /dev/null
+++ b/groot-tests/test-e2e-core/src/test/resources/job_aggregate_processor.yaml
@@ -0,0 +1,76 @@
+sources:
+ inline_source:
+ type : inline
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties: # record 3,4 will be aggreated
+ data: '[{"pkts":1,"sessions":1,"log_id": 1, "recv_time":"1724925692000","client_ips":["192.168.0.2","192.168.0.1"],"client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":1,"sessions":1,"decoded_as":null,"log_id": 1, "recv_time":"1724925692000","client_ips":["192.168.0.2","192.168.0.1"], "client_ip":"192.168.0.1","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"HTTP","log_id": 2, "recv_time":"1724925692000","client_ips":["192.168.0.2","192.168.0.3"], "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 2, "recv_time":"1724925692000","client_ips":["192.168.0.2","192.168.0.1"], "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1,"client_ips":["192.168.0.2","192.168.0.3"], "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]'
+ interval.per.row: 1s # 可选
+ repeat.count: 1 # 可选
+ format: json
+ json.ignore.parse.errors: false
+ watermark_timestamp: recv_time
+ watermark_timestamp_unit: ms
+ watermark_lag: 10
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+
+postprocessing_pipelines:
+
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [decoded_as]
+ window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 5
+ window_timestamp_field: test_time
+ mini_batch: false
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+ - function: MEAN
+ lookup_fields: [ pkts ]
+ - function: MAX
+ lookup_fields: [ pkts ]
+ output_fields: [ pktsmax ]
+ - function: MIN
+ lookup_fields: [ pkts ]
+ output_fields: [ pktsmin ]
+ - function: LONG_COUNT
+ output_fields: [ count ]
+ - function: COLLECT_LIST
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_ip_list ]
+ - function: COLLECT_SET
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_ip_set ]
+ - function: FIRST_VALUE
+ lookup_fields: [ log_id ]
+ output_fields: [ log_id_first ]
+ - function: LAST_VALUE
+ lookup_fields: [ log_id ]
+ output_fields: [ log_id_last ]
+ - function: COLLECT_SET
+ lookup_fields: [ client_ips ]
+ output_fields: [ client_ips_set ]
+ parameters:
+ collect_type: array
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. It will be used build data flow for job dag graph.
+ - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [ aggregate_processor ]
+ - name: aggregate_processor
+ parallelism: 1
+ downstream: [ print_sink ]
+ - name: print_sink
+ parallelism: 1
+
+
diff --git a/groot-tests/test-e2e-core/src/test/resources/job_split_processor.yaml b/groot-tests/test-e2e-core/src/test/resources/job_split_processor.yaml
new file mode 100644
index 0000000..9dc68ec
--- /dev/null
+++ b/groot-tests/test-e2e-core/src/test/resources/job_split_processor.yaml
@@ -0,0 +1,99 @@
+sources:
+ inline_source:
+ type : inline
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ interval.per.row: 1s # 可选
+ repeat.count: 1 # 可选
+ format: json
+ json.ignore.parse.errors: false
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+splits:
+ test_split:
+ type: split
+ rules:
+ - tag: http_tag
+ expression: decoded_as == 'HTTP'
+ - tag: dns_tag
+ expression: decoded_as == 'DNS'
+
+postprocessing_pipelines:
+ pre_etl_processor: # [object] Processing Pipeline
+ type: projection
+ remove_fields: [fields,tags]
+ output_fields:
+ functions: # [array of object] Function List
+
+ - function: FLATTEN
+ lookup_fields: [ fields,tags ]
+ output_fields: [ ]
+ parameters:
+ #prefix: ""
+ depth: 3
+ # delimiter: "."
+
+ - function: RENAME
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter:
+ parameters:
+ # parent_fields: [tags]
+ # rename_fields:
+ # tags: tags
+ rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ interval: 300
+ #
+
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [decoded_as]
+ window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 5
+ window_timestamp_field: test_time
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+
+ table_processor:
+ type: table
+ functions:
+ - function: JSON_UNROLL
+ lookup_fields: [ encapsulation ]
+ output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. It will be used build data flow for job dag graph.
+ - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [test_split,print_sink]
+ - name: test_split
+ tags: [http_tag,dns_tag]
+ downstream: [ table_processor,pre_etl_processor ]
+ parallelism: 1
+ - name: pre_etl_processor
+ parallelism: 1
+ downstream: [ print_sink ]
+ - name: table_processor
+ parallelism: 1
+ downstream: [ print_sink ]
+ - name: print_sink
+ parallelism: 1
+
+
diff --git a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml b/groot-tests/test-e2e-core/src/test/resources/test_env_parameter_inline_to_print.yaml
index 1d09282..1d09282 100644
--- a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml
+++ b/groot-tests/test-e2e-core/src/test/resources/test_env_parameter_inline_to_print.yaml