summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-03-16 14:40:22 +0800
committerdoufenghu <[email protected]>2024-03-16 14:40:22 +0800
commitcd00fed1c16ae236ddeb18dac45d567736960c3e (patch)
treeff301b6af8b1551efbec62c88683df948b8fee0b /groot-bootstrap
parent9a7cc00ee0c27ec664b96f151df50ed21f6831e0 (diff)
[Improve][Tests] Improve unit test in Flink13Contaner.
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/pom.xml1
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java19
3 files changed, 21 insertions, 1 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index f39ebb4..8e42947 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -126,7 +126,6 @@
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
- <scope>${scope}</scope>
</dependency>
<dependency>
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
index 7309710..e487e2d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
@@ -6,6 +6,8 @@ public class ExecutionConfigKeyName {
}
public static final String PARALLELISM = "parallelism";
public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer-timeout";
+
+ public static final String RUNTIME_MODE = "execution.runtime-mode";
public static final String MAX_PARALLELISM = "pipeline.max-parallelism";
public static final String TIME_CHARACTERISTIC = "pipeline.time-characteristic";
public static final String CHECKPOINTING_INTERVAL = "execution.checkpointing.interval";
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 2e29ca9..7141f5e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -9,6 +9,7 @@ import com.geedgenetworks.common.config.GrootStreamConfig;
import com.geedgenetworks.common.utils.ReflectionUtils;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
@@ -21,6 +22,8 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.TernaryBoolean;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -137,6 +140,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
environment.getConfig().setGlobalJobParameters(configuration);
setTimeCharacteristic();
setCheckpoint();
+ setRuntimeMode();
EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig());
if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
@@ -156,6 +160,21 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
+ private void setRuntimeMode() {
+ if (envConfig.hasPath(ExecutionConfigKeyName.RUNTIME_MODE)) {
+ String runtimeMode = envConfig.getString(ExecutionConfigKeyName.RUNTIME_MODE);
+ if (runtimeMode.equalsIgnoreCase("batch")) {
+ environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ } else if (runtimeMode.equalsIgnoreCase("streaming")) {
+ environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ } else {
+ log.warn(
+ "set runtime-mode failed, unknown runtime-mode [{}],only support batch,streaming",
+ runtimeMode);
+ }
+ }
+ }
+
private void setTimeCharacteristic() {
if (envConfig.hasPath(ExecutionConfigKeyName.TIME_CHARACTERISTIC)) {
String timeType = envConfig.getString(ExecutionConfigKeyName.TIME_CHARACTERISTIC);