summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
author窦凤虎 <[email protected]>2024-03-16 07:58:33 +0000
committer窦凤虎 <[email protected]>2024-03-16 07:58:33 +0000
commit9ff68b2c631606cf06a7001036ff16475c52371c (patch)
treed58322d7737470faed8f2ec17d8afccb13839d74 /groot-bootstrap
parentdfabdff861f12fcd99267d6dcbd5fc4c64bcd01e (diff)
parentcd00fed1c16ae236ddeb18dac45d567736960c3e (diff)
Merge branch 'feature/testcontainers' into 'develop'
Feature/testcontainers See merge request galaxy/platform/groot-stream!25
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/pom.xml9
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java27
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java111
3 files changed, 133 insertions, 14 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index 0a1fd9c..7b21a43 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -115,10 +115,17 @@
<scope>${scope}</scope>
</dependency>
+ <!-- flink state backend rocksdb api -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
- <scope>${scope}</scope>
</dependency>
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
index 2d82bd5..e487e2d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
@@ -4,18 +4,21 @@ public class ExecutionConfigKeyName {
private ExecutionConfigKeyName() {
throw new UnsupportedOperationException("Utility class should not be instantiated");
}
-
- public static final String TIME_CHARACTERISTIC = "execution.time-characteristic";
- public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
public static final String PARALLELISM = "parallelism";
- public static final String MAX_PARALLELISM = "execution.max-parallelism";
- public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
- public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
- public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
- public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints";
- public static final String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode";
- public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause";
- public static final String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error";
+ public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer-timeout";
+
+ public static final String RUNTIME_MODE = "execution.runtime-mode";
+ public static final String MAX_PARALLELISM = "pipeline.max-parallelism";
+ public static final String TIME_CHARACTERISTIC = "pipeline.time-characteristic";
+ public static final String CHECKPOINTING_INTERVAL = "execution.checkpointing.interval";
+ public static final String CHECKPOINTING_MODE = "execution.checkpointing.mode";
+ public static final String CHECKPOINTING_TIMEOUT = "execution.checkpointing.timeout";
+ public static final String CHECKPOINTING_DATA_URI = "execution.checkpointing.data-uri";
+ public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.checkpointing.max-concurrent-checkpoints";
+ public static final String CHECKPOINTING_CLEANUP = "execution.checkpointing.cleanup";
+ public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpointing.min-pause";
+ public static final String TOLERABLE_FAILED_CHECKPOINTS = "execution.checkpointing.tolerable-failed-checkpoints";
+ public static final String STATE_BACKEND = "state.backend";
public static final String RESTART_STRATEGY = "execution.restart.strategy";
public static final String RESTART_ATTEMPTS = "execution.restart.attempts";
public static final String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts";
@@ -24,7 +27,7 @@ public class ExecutionConfigKeyName {
public static final String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval";
public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
- public static final String STATE_BACKEND = "execution.state.backend";
+
public static final String PLANNER = "execution.planner";
//third-party packages can be loaded via `jars`
public static final String JARS = "jars";
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index c719f46..7141f5e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -9,11 +9,21 @@ import com.geedgenetworks.common.config.GrootStreamConfig;
import com.geedgenetworks.common.utils.ReflectionUtils;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.TernaryBoolean;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -129,8 +139,10 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig()));
environment.getConfig().setGlobalJobParameters(configuration);
setTimeCharacteristic();
- // setCheckpoint();
+ setCheckpoint();
+ setRuntimeMode();
EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig());
+
if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
long timeout = envConfig.getLong(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS);
environment.setBufferTimeout(timeout);
@@ -141,6 +153,26 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
environment.setParallelism(parallelism);
}
+ if (envConfig.hasPath(ExecutionConfigKeyName.MAX_PARALLELISM)) {
+ int maxParallelism = envConfig.getInt(ExecutionConfigKeyName.MAX_PARALLELISM);
+ environment.setMaxParallelism(maxParallelism);
+ }
+
+ }
+
+ private void setRuntimeMode() {
+ if (envConfig.hasPath(ExecutionConfigKeyName.RUNTIME_MODE)) {
+ String runtimeMode = envConfig.getString(ExecutionConfigKeyName.RUNTIME_MODE);
+ if (runtimeMode.equalsIgnoreCase("batch")) {
+ environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ } else if (runtimeMode.equalsIgnoreCase("streaming")) {
+ environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ } else {
+ log.warn(
+ "set runtime-mode failed, unknown runtime-mode [{}],only support batch,streaming",
+ runtimeMode);
+ }
+ }
}
private void setTimeCharacteristic() {
@@ -165,6 +197,83 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
}
+ private void setCheckpoint() {
+ long interval = 0;
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) {
+ interval = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL);
+ }
+
+ if (interval > 0) {
+ CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
+ environment.enableCheckpointing(interval);
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_MODE)) {
+ String mode = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_MODE);
+ switch (mode.toLowerCase()) {
+ case "exactly-once":
+ checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+ break;
+ case "at-least-once":
+ checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
+ break;
+ default:
+ log.warn(
+ "set checkpointing.mode failed, unknown checkpointing.mode [{}],only support exactly-once,at-least-once",
+ mode);
+ break;
+ }
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT)) {
+ long timeout = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT);
+ checkpointConfig.setCheckpointTimeout(timeout);
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI)) {
+ String uri = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI);
+ StateBackend fsStateBackend = new FsStateBackend(uri);
+ if (envConfig.hasPath(ExecutionConfigKeyName.STATE_BACKEND)) {
+ String stateBackend = envConfig.getString(ExecutionConfigKeyName.STATE_BACKEND);
+ if ("rocksdb".equalsIgnoreCase(stateBackend)) {
+ StateBackend rocksDBStateBackend =
+ new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE);
+ environment.setStateBackend(rocksDBStateBackend);
+ }
+ } else {
+ environment.setStateBackend(fsStateBackend);
+ }
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
+ int max = envConfig.getInt(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
+ checkpointConfig.setMaxConcurrentCheckpoints(max);
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP)) {
+ boolean cleanup = envConfig.getBoolean(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP);
+ if (cleanup) {
+ checkpointConfig.enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+ } else {
+ checkpointConfig.enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ }
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
+ long minPause = envConfig.getLong(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
+ checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS)) {
+ int failNum = envConfig.getInt(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS);
+ checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
+ }
+ }
+
+
+
+ }
+