diff options
| author | doufenghu <[email protected]> | 2024-03-09 17:26:36 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-03-09 17:26:36 +0800 |
| commit | d54368632b07de402335514ac64e0abd5d49c7af (patch) | |
| tree | 20ae24135cd64e2dbe874fbc80f462d6db24a82a /groot-bootstrap | |
| parent | 2b75cf9b85298e602c2ba894e16fcc5381806332 (diff) | |
[Improve][bootstrap] Improve Env configuration names.
Diffstat (limited to 'groot-bootstrap')
3 files changed, 112 insertions, 13 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 62c2dc8..f39ebb4 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -115,6 +115,14 @@ <scope>${scope}</scope> </dependency> + <!-- flink state backend rocksdb api --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId> + <version>${flink.version}</version> + <scope>${scope}</scope> + </dependency> + <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java index 2d82bd5..7309710 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java @@ -4,18 +4,19 @@ public class ExecutionConfigKeyName { private ExecutionConfigKeyName() { throw new UnsupportedOperationException("Utility class should not be instantiated"); } - - public static final String TIME_CHARACTERISTIC = "execution.time-characteristic"; - public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; public static final String PARALLELISM = "parallelism"; - public static final String MAX_PARALLELISM = "execution.max-parallelism"; - public static final String CHECKPOINT_MODE = "execution.checkpoint.mode"; - public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout"; - public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri"; - public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints"; - public static final String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode"; - public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause"; - public static final String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error"; + public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer-timeout"; + public static final String MAX_PARALLELISM = "pipeline.max-parallelism"; + public static final String TIME_CHARACTERISTIC = "pipeline.time-characteristic"; + public static final String CHECKPOINTING_INTERVAL = "execution.checkpointing.interval"; + public static final String CHECKPOINTING_MODE = "execution.checkpointing.mode"; + public static final String CHECKPOINTING_TIMEOUT = "execution.checkpointing.timeout"; + public static final String CHECKPOINTING_DATA_URI = "execution.checkpointing.data-uri"; + public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.checkpointing.max-concurrent-checkpoints"; + public static final String CHECKPOINTING_CLEANUP = "execution.checkpointing.cleanup"; + public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpointing.min-pause"; + public static final String TOLERABLE_FAILED_CHECKPOINTS = "execution.checkpointing.tolerable-failed-checkpoints"; + public static final String STATE_BACKEND = "state.backend"; public static final String RESTART_STRATEGY = "execution.restart.strategy"; public static final String RESTART_ATTEMPTS = "execution.restart.attempts"; public static final String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts"; @@ -24,7 +25,7 @@ public class ExecutionConfigKeyName { public static final String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval"; public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention"; public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention"; - public static final String STATE_BACKEND = "execution.state.backend"; + public static final String PLANNER = "execution.planner"; //third-party packages can be loaded via `jars` public static final String JARS = "jars"; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index c719f46..2e29ca9 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -12,8 +12,15 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.TernaryBoolean; + import java.net.URL; import java.util.ArrayList; import java.util.List; @@ -129,8 +136,9 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig())); environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); - // setCheckpoint(); + setCheckpoint(); EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig()); + if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) { long timeout = envConfig.getLong(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS); environment.setBufferTimeout(timeout); @@ -141,6 +149,11 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ environment.setParallelism(parallelism); } + if (envConfig.hasPath(ExecutionConfigKeyName.MAX_PARALLELISM)) { + int maxParallelism = envConfig.getInt(ExecutionConfigKeyName.MAX_PARALLELISM); + environment.setMaxParallelism(maxParallelism); + } + } private void setTimeCharacteristic() { @@ -165,6 +178,83 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } } + private void setCheckpoint() { + long interval = 0; + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) { + interval = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL); + } + + if (interval > 0) { + CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); + environment.enableCheckpointing(interval); + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_MODE)) { + String mode = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_MODE); + switch (mode.toLowerCase()) { + case "exactly-once": + checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + break; + case "at-least-once": + checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); + break; + default: + log.warn( + "set checkpointing.mode failed, unknown checkpointing.mode [{}],only support exactly-once,at-least-once", + mode); + break; + } + } + + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT)) { + long timeout = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT); + checkpointConfig.setCheckpointTimeout(timeout); + } + + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI)) { + String uri = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI); + StateBackend fsStateBackend = new FsStateBackend(uri); + if (envConfig.hasPath(ExecutionConfigKeyName.STATE_BACKEND)) { + String stateBackend = envConfig.getString(ExecutionConfigKeyName.STATE_BACKEND); + if ("rocksdb".equalsIgnoreCase(stateBackend)) { + StateBackend rocksDBStateBackend = + new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE); + environment.setStateBackend(rocksDBStateBackend); + } + } else { + environment.setStateBackend(fsStateBackend); + } + } + + if (envConfig.hasPath(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) { + int max = envConfig.getInt(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS); + checkpointConfig.setMaxConcurrentCheckpoints(max); + } + + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP)) { + boolean cleanup = envConfig.getBoolean(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP); + if (cleanup) { + checkpointConfig.enableExternalizedCheckpoints( + CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); + } else { + checkpointConfig.enableExternalizedCheckpoints( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + } + } + + if (envConfig.hasPath(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) { + long minPause = envConfig.getLong(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS); + checkpointConfig.setMinPauseBetweenCheckpoints(minPause); + } + + if (envConfig.hasPath(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS)) { + int failNum = envConfig.getInt(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS); + checkpointConfig.setTolerableCheckpointFailureNumber(failNum); + } + } + + + + } + |
