diff options
| author | doufenghu <[email protected]> | 2024-03-16 14:40:22 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-03-16 14:40:22 +0800 |
| commit | cd00fed1c16ae236ddeb18dac45d567736960c3e (patch) | |
| tree | ff301b6af8b1551efbec62c88683df948b8fee0b /groot-bootstrap | |
| parent | 9a7cc00ee0c27ec664b96f151df50ed21f6831e0 (diff) | |
[Improve][Tests] Improve unit test in Flink13Contaner.
Diffstat (limited to 'groot-bootstrap')
3 files changed, 21 insertions, 1 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index f39ebb4..8e42947 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -126,7 +126,6 @@ <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> - <scope>${scope}</scope> </dependency> <dependency> diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java index 7309710..e487e2d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java @@ -6,6 +6,8 @@ public class ExecutionConfigKeyName { } public static final String PARALLELISM = "parallelism"; public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer-timeout"; + + public static final String RUNTIME_MODE = "execution.runtime-mode"; public static final String MAX_PARALLELISM = "pipeline.max-parallelism"; public static final String TIME_CHARACTERISTIC = "pipeline.time-characteristic"; public static final String CHECKPOINTING_INTERVAL = "execution.checkpointing.interval"; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 2e29ca9..7141f5e 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -9,6 +9,7 @@ import com.geedgenetworks.common.config.GrootStreamConfig; import com.geedgenetworks.common.utils.ReflectionUtils; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.RestOptions; @@ -21,6 +22,8 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.TernaryBoolean; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.List; @@ -137,6 +140,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); setCheckpoint(); + setRuntimeMode(); EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig()); if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) { @@ -156,6 +160,21 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } + private void setRuntimeMode() { + if (envConfig.hasPath(ExecutionConfigKeyName.RUNTIME_MODE)) { + String runtimeMode = envConfig.getString(ExecutionConfigKeyName.RUNTIME_MODE); + if (runtimeMode.equalsIgnoreCase("batch")) { + environment.setRuntimeMode(RuntimeExecutionMode.BATCH); + } else if (runtimeMode.equalsIgnoreCase("streaming")) { + environment.setRuntimeMode(RuntimeExecutionMode.STREAMING); + } else { + log.warn( + "set runtime-mode failed, unknown runtime-mode [{}],only support batch,streaming", + runtimeMode); + } + } + } + private void setTimeCharacteristic() { if (envConfig.hasPath(ExecutionConfigKeyName.TIME_CHARACTERISTIC)) { String timeType = envConfig.getString(ExecutionConfigKeyName.TIME_CHARACTERISTIC); |
