diff options
| author | 窦凤虎 <[email protected]> | 2024-03-16 07:58:33 +0000 |
|---|---|---|
| committer | 窦凤虎 <[email protected]> | 2024-03-16 07:58:33 +0000 |
| commit | 9ff68b2c631606cf06a7001036ff16475c52371c (patch) | |
| tree | d58322d7737470faed8f2ec17d8afccb13839d74 /groot-bootstrap | |
| parent | dfabdff861f12fcd99267d6dcbd5fc4c64bcd01e (diff) | |
| parent | cd00fed1c16ae236ddeb18dac45d567736960c3e (diff) | |
Merge branch 'feature/testcontainers' into 'develop'
Feature/testcontainers
See merge request galaxy/platform/groot-stream!25
Diffstat (limited to 'groot-bootstrap')
3 files changed, 133 insertions, 14 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 0a1fd9c..7b21a43 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -115,10 +115,17 @@ <scope>${scope}</scope> </dependency> + <!-- flink state backend rocksdb api --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId> + <version>${flink.version}</version> + <scope>${scope}</scope> + </dependency> + <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> - <scope>${scope}</scope> </dependency> diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java index 2d82bd5..e487e2d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java @@ -4,18 +4,21 @@ public class ExecutionConfigKeyName { private ExecutionConfigKeyName() { throw new UnsupportedOperationException("Utility class should not be instantiated"); } - - public static final String TIME_CHARACTERISTIC = "execution.time-characteristic"; - public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; public static final String PARALLELISM = "parallelism"; - public static final String MAX_PARALLELISM = "execution.max-parallelism"; - public static final String CHECKPOINT_MODE = "execution.checkpoint.mode"; - public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout"; - public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri"; - public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints"; - public static final String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode"; - public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause"; - public static final String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error"; + public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer-timeout"; + + public static final String RUNTIME_MODE = "execution.runtime-mode"; + public static final String MAX_PARALLELISM = "pipeline.max-parallelism"; + public static final String TIME_CHARACTERISTIC = "pipeline.time-characteristic"; + public static final String CHECKPOINTING_INTERVAL = "execution.checkpointing.interval"; + public static final String CHECKPOINTING_MODE = "execution.checkpointing.mode"; + public static final String CHECKPOINTING_TIMEOUT = "execution.checkpointing.timeout"; + public static final String CHECKPOINTING_DATA_URI = "execution.checkpointing.data-uri"; + public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.checkpointing.max-concurrent-checkpoints"; + public static final String CHECKPOINTING_CLEANUP = "execution.checkpointing.cleanup"; + public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpointing.min-pause"; + public static final String TOLERABLE_FAILED_CHECKPOINTS = "execution.checkpointing.tolerable-failed-checkpoints"; + public static final String STATE_BACKEND = "state.backend"; public static final String RESTART_STRATEGY = "execution.restart.strategy"; public static final String RESTART_ATTEMPTS = "execution.restart.attempts"; public static final String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts"; @@ -24,7 +27,7 @@ public class ExecutionConfigKeyName { public static final String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval"; public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention"; public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention"; - public static final String STATE_BACKEND = "execution.state.backend"; + public static final String PLANNER = "execution.planner"; //third-party packages can be loaded via `jars` public static final String JARS = "jars"; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index c719f46..7141f5e 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -9,11 +9,21 @@ import com.geedgenetworks.common.config.GrootStreamConfig; import com.geedgenetworks.common.utils.ReflectionUtils; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.TernaryBoolean; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.List; @@ -129,8 +139,10 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig())); environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); - // setCheckpoint(); + setCheckpoint(); + setRuntimeMode(); EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig()); + if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) { long timeout = envConfig.getLong(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS); environment.setBufferTimeout(timeout); @@ -141,6 +153,26 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ environment.setParallelism(parallelism); } + if (envConfig.hasPath(ExecutionConfigKeyName.MAX_PARALLELISM)) { + int maxParallelism = envConfig.getInt(ExecutionConfigKeyName.MAX_PARALLELISM); + environment.setMaxParallelism(maxParallelism); + } + + } + + private void setRuntimeMode() { + if (envConfig.hasPath(ExecutionConfigKeyName.RUNTIME_MODE)) { + String runtimeMode = envConfig.getString(ExecutionConfigKeyName.RUNTIME_MODE); + if (runtimeMode.equalsIgnoreCase("batch")) { + environment.setRuntimeMode(RuntimeExecutionMode.BATCH); + } else if (runtimeMode.equalsIgnoreCase("streaming")) { + environment.setRuntimeMode(RuntimeExecutionMode.STREAMING); + } else { + log.warn( + "set runtime-mode failed, unknown runtime-mode [{}],only support batch,streaming", + runtimeMode); + } + } } private void setTimeCharacteristic() { @@ -165,6 +197,83 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } } + private void setCheckpoint() { + long interval = 0; + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) { + interval = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL); + } + + if (interval > 0) { + CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); + environment.enableCheckpointing(interval); + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_MODE)) { + String mode = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_MODE); + switch (mode.toLowerCase()) { + case "exactly-once": + checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + break; + case "at-least-once": + checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); + break; + default: + log.warn( + "set checkpointing.mode failed, unknown checkpointing.mode [{}],only support exactly-once,at-least-once", + mode); + break; + } + } + + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT)) { + long timeout = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT); + checkpointConfig.setCheckpointTimeout(timeout); + } + + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI)) { + String uri = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI); + StateBackend fsStateBackend = new FsStateBackend(uri); + if (envConfig.hasPath(ExecutionConfigKeyName.STATE_BACKEND)) { + String stateBackend = envConfig.getString(ExecutionConfigKeyName.STATE_BACKEND); + if ("rocksdb".equalsIgnoreCase(stateBackend)) { + StateBackend rocksDBStateBackend = + new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE); + environment.setStateBackend(rocksDBStateBackend); + } + } else { + environment.setStateBackend(fsStateBackend); + } + } + + if (envConfig.hasPath(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) { + int max = envConfig.getInt(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS); + checkpointConfig.setMaxConcurrentCheckpoints(max); + } + + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP)) { + boolean cleanup = envConfig.getBoolean(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP); + if (cleanup) { + checkpointConfig.enableExternalizedCheckpoints( + CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); + } else { + checkpointConfig.enableExternalizedCheckpoints( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + } + } + + if (envConfig.hasPath(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) { + long minPause = envConfig.getLong(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS); + checkpointConfig.setMinPauseBetweenCheckpoints(minPause); + } + + if (envConfig.hasPath(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS)) { + int failNum = envConfig.getInt(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS); + checkpointConfig.setTolerableCheckpointFailureNumber(failNum); + } + } + + + + } + |
