diff options
| author | doufenghu <[email protected]> | 2024-03-09 17:26:36 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-03-09 17:26:36 +0800 |
| commit | d54368632b07de402335514ac64e0abd5d49c7af (patch) | |
| tree | 20ae24135cd64e2dbe874fbc80f462d6db24a82a | |
| parent | 2b75cf9b85298e602c2ba894e16fcc5381806332 (diff) | |
[Improve][bootstrap] Improve Env configuration names.
4 files changed, 129 insertions, 15 deletions
diff --git a/docs/env-config.md b/docs/env-config.md index ac9ca27..ceb1f9e 100644 --- a/docs/env-config.md +++ b/docs/env-config.md @@ -10,12 +10,17 @@ Above three ways to specify the job name, the priority is `flink run` > `name` i An execution environment defines a default parallelism for all processors, filters, data sources, and data sinks it executes. In addition, the parallelism of a job can be specified on different levels, and the priority is `Operator Level` > `Execution Environment Level` > `Client Level` > `System Level`. Note: The parallelism of a job can be overridden by explicitly configuring the parallelism of a processor, filter, data source, or data sink in the configuration file. - - Operator Level: The parallelism of a processor, filter, data source, or data sink can be specified in the configuration file. - Execution Environment Level: The parallelism of a job can be specified in the env configuration file. - Client Level: The parallelism of a job can be specified by using the `flink run -p` command. - System Level: The parallelism of a job can be specified by using the `flink-conf.yaml` file. +### execution.buffer-timeout +The maximum interval (in milliseconds) between flushes of the output buffers. If it is not specified, the default value is `1000`. +You can also set Flink's parameter `flink.execution.buffer-timeout` directly to override the value in the configuration file. +- A positive value triggers flushing periodically by that interval +- 0 triggers flushing after every record thus minimizing latency +- -1 ms triggers flushing only when the output buffer is full thus maximizing throughput ### shade.identifier Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting sensitive information in the configuration file, this option can be ignored. 
For more details, you can refer to the documentation [config-encryption-decryption](connector/config-encryption-decryption.md) @@ -33,5 +38,15 @@ Specify a list of jar URLs via `pipeline.jars`, The jars are separated by `;` an Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are separated by `;` and will be added to the classpath of the flink cluster. ## Engine Parameter +You can use any Flink parameter directly by prefixing it with `flink.`, such as `flink.execution.buffer-timeout`, `flink.pipeline.object-reuse`, etc. More details can be found in the official [flink documentation](https://flink.apache.org/). +You can also use Groot Stream parameters; the table below lists some of them together with the corresponding Flink names. + +| Groot Stream | Flink | +|--------------------------|--------------------------------| +| execution.buffer-timeout | flink.execution.buffer-timeout | +| pipeline.object-reuse | flink.pipeline.object-reuse | +| pipeline.max-parallelism | flink.pipeline.max-parallelism | +| ... | ... | + + -Some flink parameter names corresponding use prefix `flink.`, more details please refer to the official [flink documentation](https://flink.apache.org/). such as `flink.execution.checkpointing.mode`, `flink.execution.checkpointing.timeout`, etc. 
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 62c2dc8..f39ebb4 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -115,6 +115,14 @@ <scope>${scope}</scope> </dependency> + <!-- flink state backend rocksdb api --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId> + <version>${flink.version}</version> + <scope>${scope}</scope> + </dependency> + <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java index 2d82bd5..7309710 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java @@ -4,18 +4,19 @@ public class ExecutionConfigKeyName { private ExecutionConfigKeyName() { throw new UnsupportedOperationException("Utility class should not be instantiated"); } - - public static final String TIME_CHARACTERISTIC = "execution.time-characteristic"; - public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; public static final String PARALLELISM = "parallelism"; - public static final String MAX_PARALLELISM = "execution.max-parallelism"; - public static final String CHECKPOINT_MODE = "execution.checkpoint.mode"; - public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout"; - public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri"; - public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints"; - public static final String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode"; - public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause"; - public 
static final String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error"; + public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer-timeout"; + public static final String MAX_PARALLELISM = "pipeline.max-parallelism"; + public static final String TIME_CHARACTERISTIC = "pipeline.time-characteristic"; + public static final String CHECKPOINTING_INTERVAL = "execution.checkpointing.interval"; + public static final String CHECKPOINTING_MODE = "execution.checkpointing.mode"; + public static final String CHECKPOINTING_TIMEOUT = "execution.checkpointing.timeout"; + public static final String CHECKPOINTING_DATA_URI = "execution.checkpointing.data-uri"; + public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.checkpointing.max-concurrent-checkpoints"; + public static final String CHECKPOINTING_CLEANUP = "execution.checkpointing.cleanup"; + public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpointing.min-pause"; + public static final String TOLERABLE_FAILED_CHECKPOINTS = "execution.checkpointing.tolerable-failed-checkpoints"; + public static final String STATE_BACKEND = "state.backend"; public static final String RESTART_STRATEGY = "execution.restart.strategy"; public static final String RESTART_ATTEMPTS = "execution.restart.attempts"; public static final String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts"; @@ -24,7 +25,7 @@ public class ExecutionConfigKeyName { public static final String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval"; public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention"; public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention"; - public static final String STATE_BACKEND = "execution.state.backend"; + public static final String PLANNER = "execution.planner"; //third-party packages can be loaded via `jars` public static final String JARS = "jars"; diff --git 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index c719f46..2e29ca9 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -12,8 +12,15 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.TernaryBoolean; + import java.net.URL; import java.util.ArrayList; import java.util.List; @@ -129,8 +136,9 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig())); environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); - // setCheckpoint(); + setCheckpoint(); EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig()); + if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) { long timeout = envConfig.getLong(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS); environment.setBufferTimeout(timeout); @@ -141,6 +149,11 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ environment.setParallelism(parallelism); } + if 
(envConfig.hasPath(ExecutionConfigKeyName.MAX_PARALLELISM)) { + int maxParallelism = envConfig.getInt(ExecutionConfigKeyName.MAX_PARALLELISM); + environment.setMaxParallelism(maxParallelism); + } + } private void setTimeCharacteristic() { @@ -165,6 +178,83 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } } + private void setCheckpoint() { + long interval = 0; + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) { + interval = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL); + } + + if (interval > 0) { + CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); + environment.enableCheckpointing(interval); + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_MODE)) { + String mode = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_MODE); + switch (mode.toLowerCase()) { + case "exactly-once": + checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + break; + case "at-least-once": + checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); + break; + default: + log.warn( + "set checkpointing.mode failed, unknown checkpointing.mode [{}],only support exactly-once,at-least-once", + mode); + break; + } + } + + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT)) { + long timeout = envConfig.getLong(ExecutionConfigKeyName.CHECKPOINTING_TIMEOUT); + checkpointConfig.setCheckpointTimeout(timeout); + } + + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI)) { + String uri = envConfig.getString(ExecutionConfigKeyName.CHECKPOINTING_DATA_URI); + StateBackend fsStateBackend = new FsStateBackend(uri); + if (envConfig.hasPath(ExecutionConfigKeyName.STATE_BACKEND)) { + String stateBackend = envConfig.getString(ExecutionConfigKeyName.STATE_BACKEND); + if ("rocksdb".equalsIgnoreCase(stateBackend)) { + StateBackend rocksDBStateBackend = + new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE); + 
environment.setStateBackend(rocksDBStateBackend); + } + } else { + environment.setStateBackend(fsStateBackend); + } + } + + if (envConfig.hasPath(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) { + int max = envConfig.getInt(ExecutionConfigKeyName.MAX_CONCURRENT_CHECKPOINTS); + checkpointConfig.setMaxConcurrentCheckpoints(max); + } + + if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP)) { + boolean cleanup = envConfig.getBoolean(ExecutionConfigKeyName.CHECKPOINTING_CLEANUP); + if (cleanup) { + checkpointConfig.enableExternalizedCheckpoints( + CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); + } else { + checkpointConfig.enableExternalizedCheckpoints( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + } + } + + if (envConfig.hasPath(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) { + long minPause = envConfig.getLong(ExecutionConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS); + checkpointConfig.setMinPauseBetweenCheckpoints(minPause); + } + + if (envConfig.hasPath(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS)) { + int failNum = envConfig.getInt(ExecutionConfigKeyName.TOLERABLE_FAILED_CHECKPOINTS); + checkpointConfig.setTolerableCheckpointFailureNumber(failNum); + } + } + + + + } + |
