summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author窦凤虎 <[email protected]>2024-03-16 07:58:33 +0000
committer窦凤虎 <[email protected]>2024-03-16 07:58:33 +0000
commit9ff68b2c631606cf06a7001036ff16475c52371c (patch)
treed58322d7737470faed8f2ec17d8afccb13839d74
parentdfabdff861f12fcd99267d6dcbd5fc4c64bcd01e (diff)
parentcd00fed1c16ae236ddeb18dac45d567736960c3e (diff)
Merge branch 'feature/testcontainers' into 'develop'
Feature/testcontainers See merge request galaxy/platform/groot-stream!25
-rw-r--r--docs/env-config.md22
-rw-r--r--groot-bootstrap/pom.xml9
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java27
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java111
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java78
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java2
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java43
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java3
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java198
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java91
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java21
-rw-r--r--groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml47
-rw-r--r--pom.xml2
13 files changed, 614 insertions, 40 deletions
diff --git a/docs/env-config.md b/docs/env-config.md
index ac9ca27..3e91f3e 100644
--- a/docs/env-config.md
+++ b/docs/env-config.md
@@ -10,12 +10,20 @@ Above three ways to specify the job name, the priority is `flink run` > `name` i
An execution environment defines a default parallelism for all processors, filters, data sources, and data sinks it executes. In addition, the parallelism of a job can be specified on different levels, and the priority is `Operator Level` > `Execution Environment Level` > `Client Level` > `System Level`.
Note: The parallelism of a job can be overridden by explicitly configuring the parallelism of a processor, filter, data source, or data sink in the configuration file.
-
- Operator Level: The parallelism of a processor, filter, data source, or data sink can be specified in the configuration file.
- Execution Environment Level: The parallelism of a job can be specified in the env configuration file.
- Client Level: The parallelism of a job can be specified by using the `flink run -p` command.
- System Level: The parallelism of a job can be specified by using the `flink-conf.yaml` file.
+### execution.buffer-timeout
+The maximum time frequency (milliseconds) for the flushing of the output buffers. If is not specified, the default value is `100`.
+You can set directly in Flink's parameter `fink.execution.buffer-timeout` to override the value in the configuration file.
+- A positive value triggers flushing periodically by that interval
+- 0 triggers flushing after every record thus minimizing latency
+- -1 ms triggers flushing only when the output buffer is full thus maximizing throughput
+### execution.runtime-mode
+This parameter is used to define the runtime mode of the job, the default value is `STREAMING`. If you want to run the job in batch mode, you can set `execution.runtime-mode = "BATCH"`.
+
### shade.identifier
Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting sensitive information in the configuration file, this option can be ignored.
For more details, you can refer to the documentation [config-encryption-decryption](connector/config-encryption-decryption.md)
@@ -33,5 +41,15 @@ Specify a list of jar URLs via `pipeline.jars`, The jars are separated by `;` an
Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are separated by `;` and will be added to the classpath of the flink cluster.
## Engine Parameter
+You can directly use the flink parameter by prefixing `flink.`, such as `flink.execution.buffer-timeout`, `flink.object-reuse`, etc. More details can be found in the official [flink documentation](https://flink.apache.org/).
+Of course, you can use groot stream parameter, here are some parameter names corresponding to the names in Flink.
+
+| Groot Stream | Flink |
+|--------------------------|--------------------------------|
+| execution.buffer-timeout | flink.execution.buffer-timeout |
+| pipeline.object-reuse | flink.object-reuse |
+| pipeline.max-parallelism | flink.pipeline.max-parallelism |
+| ... | ... |
+
+
-Some flink parameter names corresponding use prefix `flink.`, more details please refer to the official [flink documentation](https://flink.apache.org/). such as `flink.execution.checkpointing.mode`, `flink.execution.checkpointing.timeout`, etc.
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index 0a1fd9c..7b21a43 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -115,10 +115,17 @@
<scope>${scope}</scope>
</dependency>
+ <!-- flink state backend rocksdb api -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
- <scope>${scope}</scope>
</dependency>
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
index 2d82bd5..e487e2d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
@@ -4,18 +4,21 @@ public class ExecutionConfigKeyName {
private ExecutionConfigKeyName() {
throw new UnsupportedOperationException("Utility class should not be instantiated");
}
-
- public static final String TIME_CHARACTERISTIC = "execution.time-characteristic";
- public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
public static final String PARALLELISM = "parallelism";
- public static final String MAX_PARALLELISM = "execution.max-parallelism";
- public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
- public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
- public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
- public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints";
- public static final String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode";
- public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause";
- public static final String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error";
+ public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer-timeout";
+
+ public static final String RUNTIME_MODE = "execution.runtime-mode";
+ public static final String MAX_PARALLELISM = "pipeline.max-parallelism";
+ public static final String TIME_CHARACTERISTIC = "pipeline.time-characteristic";
+ public static final String CHECKPOINTING_INTERVAL = "execution.checkpointing.interval";
+ public static final String CHECKPOINTING_MODE = "execution.checkpointing.mode";
+ public static final String CHECKPOINTING_TIMEOUT = "execution.checkpointing.timeout";
+ public static final String CHECKPOINTING_DATA_URI = "execution.checkpointing.data-uri";
+ public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.checkpointing.max-concurrent-checkpoints";
+ public static final String CHECKPOINTING_CLEANUP = "execution.checkpointing.cleanup";
+ public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpointing.min-pause";
+ public static final String TOLERABLE_FAILED_CHECKPOINTS = "execution.checkpointing.tolerable-failed-checkpoints";
+ public static final String STATE_BACKEND = "state.backend";
public static final String RESTART_STRATEGY = "execution.restart.strategy";
public static final String RESTART_ATTEMPTS = "execution.restart.attempts";
public static final String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts";
@@ -24,7 +27,7 @@ public class ExecutionConfigKeyName {
public static final String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval";
public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
- public static final String STATE_BACKEND = "execution.state.backend";
+
public static final String PLANNER = "execution.planner";
//third-party packages can be loaded via `jars`
public static final String JARS = "jars";
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index c719f46..7141f5e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -9,11 +9,21 @@ import com.geedgenetworks.common.config.GrootStreamConfig;
import com.geedgenetworks.common.utils.ReflectionUtils;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.TernaryBoolean;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -129,8 +139,10 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig()));
environment.getConfig().setGlobalJobParameters(configuration);
setTimeCharacteristic();
- // setCheckpoint();
+ setCheckpoint();
+ setRuntimeMode();
EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig());
+
if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
long timeout = envConfig.getLong(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS);
environment.setBufferTimeout(timeout);
@@ -141,6 +153,26 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
environment.setParallelism(parallelism);
}
+ if (envConfig.hasPath(ExecutionConfigKeyName.MAX_PARALLELISM)) {
+ int maxParallelism = envConfig.getInt(ExecutionConfigKeyName.MAX_PARALLELISM);
+ environment.setMaxParallelism(maxParallelism);
+ }
+
+ }
+
+ private void setRuntimeMode() {
+ if (envConfig.hasPath(ExecutionConfigKeyName.RUNTIME_MODE)) {
+ String runtimeMode = envConfig.getString(ExecutionConfigKeyName.RUNTIME_MODE);
+ if (runtimeMode.equalsIgnoreCase("batch")) {
+ environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ } else if (runtimeMode.equalsIgnoreCase("streaming")) {
+ environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ } else {
+ log.warn(
+ "set runtime-mode failed, unknown runtime-mode [{}],only support batch,streaming",
+ runtimeMode);
+ }
+ }
}
private void setTimeCharacteristic() {
@@ -165,6 +197,83 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
}
+ private void setCheckpoint() {
+ long interval = 0;
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) {
+ interval = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL);
+ }
+
+ if (interval > 0) {
+ CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
+ environment.enableCheckpointing(interval);
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_MODE)) {
+ String mode = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_MODE);
+ switch (mode.toLowerCase()) {
+ case "exactly-once":
+ checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+ break;
+ case "at-least-once":
+ checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
+ break;
+ default:
+ log.warn(
+ "set checkpointing.mode failed, unknown checkpointing.mode [{}],only support exactly-once,at-least-once",
+ mode);
+ break;
+ }
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT)) {
+ long timeout = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT);
+ checkpointConfig.setCheckpointTimeout(timeout);
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI)) {
+ String uri = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI);
+ StateBackend fsStateBackend = new FsStateBackend(uri);
+ if (envConfig.hasPath(ExecutionConfigKeyName.STATE_BACKEND)) {
+ String stateBackend = envConfig.getString(ExecutionConfigKeyName.STATE_BACKEND);
+ if ("rocksdb".equalsIgnoreCase(stateBackend)) {
+ StateBackend rocksDBStateBackend =
+ new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE);
+ environment.setStateBackend(rocksDBStateBackend);
+ }
+ } else {
+ environment.setStateBackend(fsStateBackend);
+ }
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
+ int max = envConfig.getInt(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
+ checkpointConfig.setMaxConcurrentCheckpoints(max);
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP)) {
+ boolean cleanup = envConfig.getBoolean(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP);
+ if (cleanup) {
+ checkpointConfig.enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+ } else {
+ checkpointConfig.enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ }
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
+ long minPause = envConfig.getLong(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
+ checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS)) {
+ int failNum = envConfig.getInt(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS);
+ checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
+ }
+ }
+
+
+
+ }
+
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
index 39c1e2c..3f8435d 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -1,32 +1,22 @@
package com.geedgenetworks.test.common.container;
import cn.hutool.core.util.XmlUtil;
-import cn.hutool.json.XML;
import com.geedgenetworks.bootstrap.utils.ConfigBuilder;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.SourceConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.SourceConfig;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigResolveOptions;
+import groovy.lang.Tuple2;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.MountableFile;
-import org.w3c.dom.Attr;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-
-import javax.xml.xpath.XPathConstants;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
@@ -282,6 +272,68 @@ public final class ContainerUtil {
ConfigResolveOptions.defaults().setAllowUnresolved(true));
}
+ public static List<String> getJVMThreadNames(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ return getJVMThreads(container).stream().map(Tuple2::getV1).collect(Collectors.toList());
+ }
+
+ public static Map<String, Integer> getJVMLiveObject(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ Container.ExecResult liveObjects =
+ container.execInContainer("jmap", "-histo:live", getJVMProcessId(container));
+ Assertions.assertEquals(0, liveObjects.getExitCode());
+ String value = liveObjects.getStdout().trim();
+ return Arrays.stream(value.split("\n"))
+ .skip(2)
+ .map(
+ str ->
+ Arrays.stream(str.split(" "))
+ .filter(StringUtils::isNotEmpty)
+ .collect(Collectors.toList()))
+ .filter(list -> list.size() == 4)
+ .collect(
+ Collectors.toMap(
+ list -> list.get(3),
+ list -> Integer.valueOf(list.get(1)),
+ (a, b) -> a));
+ }
+
+ public static List<Tuple2<String, String>> getJVMThreads(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ Container.ExecResult threads =
+ container.execInContainer("jstack", getJVMProcessId(container));
+ Assertions.assertEquals(0, threads.getExitCode());
+ // Thread name line example
+ // "hz.main.MetricsRegistry.thread-2" #232 prio=5 os_prio=0 tid=0x0000ffff3c003000 nid=0x5e
+ // waiting on condition [0x0000ffff6cf3a000]
+ return Arrays.stream(threads.getStdout().trim().split("\n\n"))
+ .filter(s -> s.startsWith("\""))
+ .map(
+ threadStr ->
+ new Tuple2<>(
+ Arrays.stream(threadStr.split("\n"))
+ .filter(s -> s.startsWith("\""))
+ .map(s -> s.substring(1, s.lastIndexOf("\"")))
+ .findFirst()
+ .get(),
+ threadStr))
+ .collect(Collectors.toList());
+ }
+
+ private static String getJVMProcessId(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ Container.ExecResult processes = container.execInContainer("jps");
+ Assertions.assertEquals(0, processes.getExitCode());
+ Optional<String> server =
+ Arrays.stream(processes.getStdout().trim().split("\n"))
+ .filter(s -> s.contains("GrootstreamServer"))
+ .findFirst();
+ Assertions.assertTrue(server.isPresent());
+ return server.get().trim().split(" ")[0];
+ }
+
+
+
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
index 7c41167..01d2133 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
@@ -5,7 +5,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AutoService(TestContainer.class)
-public class Flink13Container extends AbstractTestFlinkContainer{
+public class Flink13Container extends AbstractTestFlinkContainer {
@Override
protected String getStartModuleName() {
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java
new file mode 100644
index 0000000..371b542
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java
@@ -0,0 +1,43 @@
+package com.geedgenetworks.test.common.container;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AutoService(TestContainer.class)
+public class Flink17Container extends AbstractTestFlinkContainer {
+
+ @Override
+ protected String getStartModuleName() {
+ return "groot-bootstrap";
+ }
+
+ @Override
+ protected String getStartShellName() {
+ return "start.sh";
+ }
+
+ @Override
+ protected String getConnectorModulePath() {
+ return "groot-connectors";
+ }
+
+ @Override
+ protected String getConnectorType() {
+ return "grootstream";
+ }
+
+ @Override
+ protected String getConnectorNamePrefix() {
+ return "connector-";
+ }
+
+ @Override
+ public TestContainerId identifier() {
+ return TestContainerId.FLINK_1_17 ;
+ }
+ @Override
+ protected String getDockerImage() {
+ return "flink:1.17.2-scala_2.12-java11";
+ }
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
index ddd2012..e837d25 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
@@ -7,7 +7,8 @@ import static com.geedgenetworks.test.common.container.EngineType.FLINK;
@Getter
@AllArgsConstructor
public enum TestContainerId {
- FLINK_1_13(FLINK, "1.13.6");
+ FLINK_1_13(FLINK, "1.13.1"),
+ FLINK_1_17(FLINK, "1.17.2");
private final EngineType engineType;
private final String version;
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java
new file mode 100644
index 0000000..02ce012
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java
@@ -0,0 +1,198 @@
+package com.geedgenetworks.test.e2e.base;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+import com.alibaba.nacos.client.naming.utils.CollectionUtils;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
+import com.geedgenetworks.test.common.container.ContainerExtendedFactory;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import com.geedgenetworks.test.common.junit.TestContainerExtension;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {TestContainerId.FLINK_1_17},
+ type = {},
+ disabledReason = "only flink adjusts the parameter configuration rules")
+public class EnvParameterTest extends TestSuiteBase {
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/grootstream && chown -R flink /tmp/grootstream");
+ Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+ };
+
+ @TestTemplate
+ public void testGeneralEnvParameter(AbstractTestFlinkContainer container)
+ throws IOException, InterruptedException {
+ genericTest(
+ "/test_env_parameter_inline_to_print.yaml", container);
+ }
+
+
+ public void genericTest(String configPath, AbstractTestFlinkContainer container)
+ throws IOException, InterruptedException {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob(configPath);
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+ // wait obtain job id
+ AtomicReference<String> jobId = new AtomicReference<>();
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
+ "curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() {
+ });
+ List<Map<String, Object>> jobs =
+ (List<Map<String, Object>>) jobInfo.get("jobs");
+ if (!CollectionUtils.isEmpty(jobs)) {
+ jobId.set(jobs.get(0).get("jid").toString());
+ }
+ Assertions.assertNotNull(jobId.get());
+ });
+
+ // obtain job info
+ AtomicReference<Map<String, Object>> jobInfoReference = new AtomicReference<>();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Object> jobInfo = JSON.parseObject( container.executeJobManagerInnerCommand(
+ String.format(
+ "curl http://localhost:8081/jobs/%s",
+ jobId.get())), new TypeReference<Map<String, Object>>() {
+ });
+
+ // wait the job initialization is complete and enters the Running state
+ if (null != jobInfo && "RUNNING".equals(jobInfo.get("state"))) {
+ jobInfoReference.set(jobInfo);
+ }
+ Assertions.assertNotNull(jobInfoReference.get());
+ });
+ Map<String, Object> jobInfo = jobInfoReference.get();
+
+ // obtain execution configuration
+ Map<String, Object> jobConfig = JSON.parseObject(container.executeJobManagerInnerCommand(
+ String.format(
+ "curl http://localhost:8081/jobs/%s/config", jobId.get())), new TypeReference<Map<String, Object>>() {
+ });
+
+ Map<String, Object> executionConfig =
+ (Map<String, Object>) jobConfig.get("execution-config");
+
+ // obtain checkpoint configuration
+ Map<String, Object> checkpointConfig =
+ JSON.parseObject(container.executeJobManagerInnerCommand(
+ String.format(
+ "curl http://localhost:8081/jobs/%s/checkpoints/config", jobId.get())), new TypeReference<Map<String, Object>>() {
+ });
+
+ // obtain checkpoint storage
+ AtomicReference<Map<String, Object>> completedCheckpointReference = new AtomicReference<>();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Object> checkpointsInfo =
+ JSON.parseObject(container.executeJobManagerInnerCommand(
+ String.format(
+ "curl http://localhost:8081/jobs/%s/checkpoints", jobId.get())), new TypeReference<Map<String, Object>>() {
+ });
+ Map<String, Object> latestCheckpoint =
+ (Map<String, Object>) checkpointsInfo.get("latest");
+ // waiting for at least one checkpoint trigger
+ if (null != latestCheckpoint) {
+ completedCheckpointReference.set(
+ (Map<String, Object>) latestCheckpoint.get("completed"));
+ Assertions.assertNotNull(completedCheckpointReference.get());
+ }
+ });
+ /**
+ * adjust the configuration of this {@link
+ * com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName} to use the 'flink.' and the
+ * flink parameter name, and check whether the configuration takes effect
+ */
+ // PARALLELISM
+ int parallelism = (int) executionConfig.get("job-parallelism");
+ Assertions.assertEquals(1, parallelism);
+
+ // MAX_PARALLELISM
+ int maxParallelism = (int) jobInfo.get("maxParallelism");
+ Assertions.assertEquals(5, maxParallelism);
+
+ // CHECKPOINT_INTERVAL
+ int interval = (int) checkpointConfig.get("interval");
+ Assertions.assertEquals(10000, interval);
+
+ // CHECKPOINT_MODE
+ String mode = checkpointConfig.get("mode").toString();
+ Assertions.assertEquals("exactly_once", mode);
+
+ // CHECKPOINT_TIMEOUT
+ int checkpointTimeout = (int) checkpointConfig.get("timeout");
+ Assertions.assertEquals(1200000, checkpointTimeout);
+
+ // CHECKPOINT_DATA_URI
+ String externalPath = completedCheckpointReference.get().get("external_path").toString();
+ Assertions.assertTrue(externalPath.startsWith("file:/tmp/grootstream/checkpoints"));
+
+ // MAX_CONCURRENT_CHECKPOINTS
+ int maxConcurrent = (int) checkpointConfig.get("max_concurrent");
+ Assertions.assertEquals(2, maxConcurrent);
+
+ // CHECKPOINT_CLEANUP_MODE
+ Map<String, Object> externalizationMap =
+ (Map<String, Object>) checkpointConfig.get("externalization");
+ boolean externalization = (boolean) externalizationMap.get("delete_on_cancellation");
+ Assertions.assertTrue(externalization);
+
+ // MIN_PAUSE_BETWEEN_CHECKPOINTS
+ int minPause = (int) checkpointConfig.get("min_pause");
+ Assertions.assertEquals(100, minPause);
+
+ // FAIL_ON_CHECKPOINTING_ERRORS
+ int tolerableFailedCheckpoints = (int) checkpointConfig.get("tolerable_failed_checkpoints");
+ Assertions.assertEquals(5, tolerableFailedCheckpoints);
+
+ // RESTART_STRATEGY / because the restart strategy is fixed-delay in config file, so don't
+ // check failure-rate
+ String restartStrategy = executionConfig.get("restart-strategy").toString();
+ Assertions.assertTrue(restartStrategy.contains("fixed delay"));
+
+ // RESTART_ATTEMPTS
+ Assertions.assertTrue(restartStrategy.contains("2 restart attempts"));
+
+ // RESTART_DELAY_BETWEEN_ATTEMPTS
+ Assertions.assertTrue(restartStrategy.contains("fixed delay (1000 ms)"));
+
+ // STATE_BACKEND
+ String stateBackend = checkpointConfig.get("state_backend").toString();
+ Assertions.assertTrue(stateBackend.contains("RocksDBStateBackend"));
+ }
+
+
+
+}
+
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
index 2f9a203..b84d169 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
@@ -1,16 +1,91 @@
package com.geedgenetworks.test.e2e.base;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+import com.alibaba.nacos.client.naming.utils.CollectionUtils;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-
+import org.junit.jupiter.api.TestTemplate;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {TestContainerId.FLINK_1_17},
+ type = {},
+ disabledReason = "only flink adjusts the parameter configuration rules")
+public class InlineToPrint extends TestSuiteBase {
+
+ @TestTemplate
+ public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob("/inline_to_print.yaml");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
-public class InlineToPrint extends Flink13Container{
+ AtomicReference<String> taskMangerID = new AtomicReference<>();
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Object> taskMangerInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
+ "curl http://localhost:8081/taskmanagers"), new TypeReference<Map<String, Object>>() {
+ });
+ List<Map<String, Object>> taskManagers =
+ (List<Map<String, Object>>) taskMangerInfo.get("taskmanagers");
+ if (!CollectionUtils.isEmpty(taskManagers)) {
+ taskMangerID.set(taskManagers.get(0).get("id").toString());
+ }
+ Assertions.assertNotNull(taskMangerID.get());
+ });
- @Test
- public void testInlineToPrint() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeGrootStreamJob("/inline_to_print.yaml");
- Assertions.assertEquals(0, execResult.getExitCode());
+ AtomicReference<String> jobId = new AtomicReference<>();
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
+ "curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() {
+ });
+ List<Map<String, Object>> jobs =
+ (List<Map<String, Object>>) jobInfo.get("jobs");
+ if (!CollectionUtils.isEmpty(jobs)) {
+ jobId.set(jobs.get(0).get("jid").toString());
+ }
+ Assertions.assertNotNull(jobId.get());
+ });
+ //Obtain job metrics
+ AtomicReference<List<Map<String, Object>>> jobNumRestartsReference = new AtomicReference<>();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Thread.sleep(5000);
+ String result = container.executeJobManagerInnerCommand(
+ String.format(
+ "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get()));
+ List<Map<String, Object>> jobNumRestartsInfo = JSON.parseObject(result, new TypeReference<List<Map<String, Object>>>() {
+ });
+ if (!CollectionUtils.isEmpty(jobNumRestartsInfo)) {
+ jobNumRestartsReference.set(jobNumRestartsInfo);
+ }
+
+ Assertions.assertNotNull(jobNumRestartsReference.get());
+
+ });
}
+
}
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java
new file mode 100644
index 0000000..4aa2dc6
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java
@@ -0,0 +1,21 @@
+package com.geedgenetworks.test.e2e.base;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+
+@Slf4j
+public class TestUtils {
+ public static String getResource(String confFile) {
+ return System.getProperty("user.dir")
+ + File.separator
+ + "src"
+ + File.separator
+ + "test"
+ + File.separator
+ + "resources"
+ + File.separator
+ + confFile;
+ }
+
+}
diff --git a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml
new file mode 100644
index 0000000..1d09282
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml
@@ -0,0 +1,47 @@
+sources:
+ inline_source:
+ type: inline
+ properties:
+ data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ format: json
+ json.ignore.parse.errors: false
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+
+application:
+ env:
+ name: example-inline-to-print
+ parallelism: 1
+ execution:
+ runtime-mode: streaming
+ buffer-timeout: 10
+ checkpointing:
+ interval: 10000
+ mode: exactly_once
+ timeout: 1200000
+ data-uri: file:///tmp/grootstream/checkpoints
+ max-concurrent-checkpoints: 2
+ cleanup: true
+ min-pause: 100
+ tolerable-failed-checkpoints: 5
+ restart:
+ strategy: fixed-delay
+ attempts: 2
+ delayBetweenAttempts: 1000
+ state:
+ backend: rocksdb
+ flink:
+ pipeline:
+ max-parallelism: 5
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [print_sink]
+ - name: print_sink
+ downstream: [] \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index cf9731d..ca8b732 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
</modules>
<properties>
- <revision>1.1.0</revision>
+ <revision>1.2.0-rc</revision>
<java.version>11</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>${java.version}</maven.compiler.source>