summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-03-09 17:26:36 +0800
committerdoufenghu <[email protected]>2024-03-09 17:26:36 +0800
commitd54368632b07de402335514ac64e0abd5d49c7af (patch)
tree20ae24135cd64e2dbe874fbc80f462d6db24a82a /groot-bootstrap
parent2b75cf9b85298e602c2ba894e16fcc5381806332 (diff)
[Improve][bootstrap] Improve Env configuration names.
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/pom.xml8
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java25
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java92
3 files changed, 112 insertions, 13 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index 62c2dc8..f39ebb4 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -115,6 +115,14 @@
<scope>${scope}</scope>
</dependency>
+ <!-- flink state backend rocksdb api -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
index 2d82bd5..7309710 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
@@ -4,18 +4,19 @@ public class ExecutionConfigKeyName {
private ExecutionConfigKeyName() {
throw new UnsupportedOperationException("Utility class should not be instantiated");
}
-
- public static final String TIME_CHARACTERISTIC = "execution.time-characteristic";
- public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
public static final String PARALLELISM = "parallelism";
- public static final String MAX_PARALLELISM = "execution.max-parallelism";
- public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
- public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
- public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
- public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints";
- public static final String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode";
- public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause";
- public static final String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error";
+ public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer-timeout";
+ public static final String MAX_PARALLELISM = "pipeline.max-parallelism";
+ public static final String TIME_CHARACTERISTIC = "pipeline.time-characteristic";
+ public static final String CHECKPOINTING_INTERVAL = "execution.checkpointing.interval";
+ public static final String CHECKPOINTING_MODE = "execution.checkpointing.mode";
+ public static final String CHECKPOINTING_TIMEOUT = "execution.checkpointing.timeout";
+ public static final String CHECKPOINTING_DATA_URI = "execution.checkpointing.data-uri";
+ public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.checkpointing.max-concurrent-checkpoints";
+ public static final String CHECKPOINTING_CLEANUP = "execution.checkpointing.cleanup";
+ public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpointing.min-pause";
+ public static final String TOLERABLE_FAILED_CHECKPOINTS = "execution.checkpointing.tolerable-failed-checkpoints";
+ public static final String STATE_BACKEND = "state.backend";
public static final String RESTART_STRATEGY = "execution.restart.strategy";
public static final String RESTART_ATTEMPTS = "execution.restart.attempts";
public static final String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts";
@@ -24,7 +25,7 @@ public class ExecutionConfigKeyName {
public static final String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval";
public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
- public static final String STATE_BACKEND = "execution.state.backend";
+
public static final String PLANNER = "execution.planner";
//third-party packages can be loaded via `jars`
public static final String JARS = "jars";
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index c719f46..2e29ca9 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -12,8 +12,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.TernaryBoolean;
+
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -129,8 +136,9 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig()));
environment.getConfig().setGlobalJobParameters(configuration);
setTimeCharacteristic();
- // setCheckpoint();
+ setCheckpoint();
EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig());
+
if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
long timeout = envConfig.getLong(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS);
environment.setBufferTimeout(timeout);
@@ -141,6 +149,11 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
environment.setParallelism(parallelism);
}
+ if (envConfig.hasPath(ExecutionConfigKeyName.MAX_PARALLELISM)) {
+ int maxParallelism = envConfig.getInt(ExecutionConfigKeyName.MAX_PARALLELISM);
+ environment.setMaxParallelism(maxParallelism);
+ }
+
}
private void setTimeCharacteristic() {
@@ -165,6 +178,83 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
}
+ private void setCheckpoint() {
+ long interval = 0;
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) {
+ interval = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL);
+ }
+
+ if (interval > 0) {
+ CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
+ environment.enableCheckpointing(interval);
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_MODE)) {
+ String mode = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_MODE);
+ switch (mode.toLowerCase()) {
+ case "exactly-once":
+ checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+ break;
+ case "at-least-once":
+ checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
+ break;
+ default:
+ log.warn(
+ "set checkpointing.mode failed, unknown checkpointing.mode [{}],only support exactly-once,at-least-once",
+ mode);
+ break;
+ }
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT)) {
+ long timeout = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT);
+ checkpointConfig.setCheckpointTimeout(timeout);
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI)) {
+ String uri = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI);
+ StateBackend fsStateBackend = new FsStateBackend(uri);
+ if (envConfig.hasPath(ExecutionConfigKeyName.STATE_BACKEND)) {
+ String stateBackend = envConfig.getString(ExecutionConfigKeyName.STATE_BACKEND);
+ if ("rocksdb".equalsIgnoreCase(stateBackend)) {
+ StateBackend rocksDBStateBackend =
+ new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE);
+ environment.setStateBackend(rocksDBStateBackend);
+ }
+ } else {
+ environment.setStateBackend(fsStateBackend);
+ }
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
+ int max = envConfig.getInt(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
+ checkpointConfig.setMaxConcurrentCheckpoints(max);
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP)) {
+ boolean cleanup = envConfig.getBoolean(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP);
+ if (cleanup) {
+ checkpointConfig.enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+ } else {
+ checkpointConfig.enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ }
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
+ long minPause = envConfig.getLong(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
+ checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
+ }
+
+ if (envConfig.hasPath(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS)) {
+ int failNum = envConfig.getInt(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS);
+ checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
+ }
+ }
+
+
+
+ }
+