diff options
| author | doufenghu <[email protected]> | 2023-12-31 00:43:48 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2023-12-31 00:43:48 +0800 |
| commit | fdd7119689ec54c3a5446062b71e759b5fed4b9f (patch) | |
| tree | b7f873904608e8183b944105b045c0f43ac71546 /groot-bootstrap | |
| parent | 92e5b06386419a550099598eaaab3abcc4584e74 (diff) | |
[Fix][Bootstrap] 修复任务配置文件并行度不生效的问题,梳理RuntimeEnvironment加载envconfig使其边界更清晰。撰写env-config文档,描述不同方式指定任务名的优先级;描述不同level指定并行度优先级及覆盖逻辑。
Diffstat (limited to 'groot-bootstrap')
12 files changed, 194 insertions, 87 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java index d838c11..2a00474 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java @@ -32,7 +32,7 @@ public abstract class CommandArgs { protected boolean checkConfig = false; @Parameter(names = {"-i", "--variable"}, splitter = ParameterSplitter.class, - description = "user-defined parameters , such as -i data_center=bj" + + description = "user-defined parameters , such as -i data_center=bj " + "We use ',' as separator, when inside \"\", ',' are treated as normal characters instead of delimiters.") protected List<String> variables = Collections.emptyList(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java index 3510daa..dc73bd1 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java @@ -4,7 +4,7 @@ import cn.hutool.setting.yaml.YamlUtil; import com.geedgenetworks.bootstrap.exception.CommandExecuteException; import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.bootstrap.execution.ExecutionConfigKey; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; import com.geedgenetworks.bootstrap.execution.JobExecution; import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; import com.geedgenetworks.common.Constants; @@ -38,11 +38,11 @@ public class ExecuteCommand implements Command<ExecuteCommandArgs> { // if user specified job name using command line arguments, override config option if (!executeCommandArgs.getJobName().equals(Constants.DEFAULT_JOB_NAME)) { config = config.withValue( - ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_NAME), ConfigValueFactory.fromAnyRef(executeCommandArgs.getJobName())); + ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV , Constants.JOB_NAME), ConfigValueFactory.fromAnyRef(executeCommandArgs.getJobName())); } if(executeCommandArgs.getTargetType() != null) { - config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKey.ENV_TARGET_TYPE), + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java index fbf28f1..a6b2dd6 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java @@ -108,7 +108,6 @@ public class ExecuteCommandArgs extends CommandArgs { private static final List<DeployMode> DEPLOY_MODE_TYPE_LIST = new ArrayList<>(); static { - DEPLOY_MODE_TYPE_LIST.add(DeployMode.CLIENT); DEPLOY_MODE_TYPE_LIST.add(DeployMode.RUN); DEPLOY_MODE_TYPE_LIST.add(DeployMode.RUN_APPLICATION); } @@ -121,7 +120,7 @@ public class ExecuteCommandArgs extends CommandArgs { } else { throw new IllegalArgumentException( "Groot-Stream job on flink engine deploy mode only " - + "support these options: [client, run, run-application]"); + + "support these options: [run, run-application]"); } } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKey.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java index 2c7f05a..2d82bd5 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKey.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java @@ -1,13 +1,13 @@ package com.geedgenetworks.bootstrap.execution; -public class ExecutionConfigKey { - private ExecutionConfigKey() { - throw new UnsupportedOperationException("ExecutionConfigKey is a utility class and cannot be instantiated"); +public class ExecutionConfigKeyName { + private ExecutionConfigKeyName() { + throw new UnsupportedOperationException("Utility class should not be instantiated"); } public static final String TIME_CHARACTERISTIC = "execution.time-characteristic"; public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; - public static final String PARALLELISM = "execution.parallelism"; + public static final String PARALLELISM = "parallelism"; public static final String MAX_PARALLELISM = "execution.max-parallelism"; public static final String CHECKPOINT_MODE = "execution.checkpoint.mode"; public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout"; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index fac544c..bd6056f 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -1,14 +1,12 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.DeployMode; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.GrootStreamConfig; import com.geedgenetworks.core.pojo.Event; -import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -71,9 +69,9 @@ public class JobExecution { private void registerPlugin(Config appConfig) { List<Path> thirdPartyJars = new ArrayList<>(); Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV); - if(envConfig.hasPath(ExecutionConfigKey.JARS)) { + if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) { thirdPartyJars = new ArrayList<>(StartBuilder - .getThirdPartyJars(envConfig.getString(ExecutionConfigKey.JARS))); + .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS))); } thirdPartyJars.addAll(StartBuilder.getConnectorJars()); thirdPartyJars.addAll(StartBuilder.getPluginsJarDependencies()); @@ -98,9 +96,9 @@ public class JobExecution { private Config registerPlugin(Config config , List<URL> jars) { config = this.injectJarsToConfig( - config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKey.PIPELINE_JARS), jars); + config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars); return this.injectJarsToConfig( - config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKey.PIPELINE_CLASSPATHS), jars); + config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars); } @@ -178,7 +176,7 @@ public class JobExecution { postprocessingPipelines = config.getConfig(Constants.POSTPROCESSING_PIPELINES).root().unwrapped(); } - List<? extends Config> topology = jobRuntimeEnvironment.getConfig().getConfigList(Constants.APPLICATION_TOPOLOGY); + List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY); List<Node> nodes = Lists.newArrayList(); @@ -211,8 +209,7 @@ public class JobExecution { public void execute() throws JobExecuteException { - - if (!StartBuilder.getDeployMode().name().equals(DeployMode.CLIENT.name())) { + if (!jobRuntimeEnvironment.isLocalMode()) { jobRuntimeEnvironment.registerPlugin(jarPaths); } List<Node> sourceNodes = nodes @@ -228,10 +225,14 @@ public class JobExecution { } try { - log.info("Execution Plan: {}", jobRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan()); + final long jobStartTime = System.currentTimeMillis(); log.info('\n' + Constants.GROOT_LOGO); - jobRuntimeEnvironment.getStreamExecutionEnvironment() - .execute(jobRuntimeEnvironment.getConfig().getString(Constants.APPLICATION_NAME)); + log.info("Execution Plan: {}", jobRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan()); + log.info("Execution Job: {}", jobRuntimeEnvironment.getJobName()); + jobRuntimeEnvironment.getStreamExecutionEnvironment().execute(jobRuntimeEnvironment.getJobName()); + final long jobEndTime = System.currentTimeMillis(); + log.info("Job finished, execution duration: {} ms", jobEndTime-jobStartTime); + } catch (Exception e) { throw new JobExecuteException("Execute job error", e); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index baa9230..c719f46 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -22,11 +22,11 @@ import java.util.stream.Collectors; @Slf4j public class JobRuntimeEnvironment implements RuntimeEnvironment{ - private static JobRuntimeEnvironment INSTANCE = null; - private Config appConfig; - + private static volatile JobRuntimeEnvironment INSTANCE = null; + private Config envConfig; private GrootStreamConfig grootStreamConfig; private StreamExecutionEnvironment environment; + private String jobName = Constants.DEFAULT_JOB_NAME; private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; @@ -45,27 +45,42 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } @Override - public RuntimeEnvironment setConfig(Config appConfig) { - this.appConfig = appConfig; + public RuntimeEnvironment setEnvConfig(Config envConfig) { + this.envConfig = envConfig; return this; } @Override - public Config getConfig() { - return appConfig; + public Config getEnvConfig() { + return envConfig; } @Override public CheckResult checkConfig() { - return EnvironmentUtil.checkRestartStrategy(appConfig); + return EnvironmentUtil.checkRestartStrategy(envConfig); } + @Override public RuntimeEnvironment prepare() { createStreamEnvironment(); + if (envConfig.hasPath(Constants.JOB_NAME)) { + jobName = envConfig.getString(Constants.JOB_NAME); + } return this; } + public String getJobName() { + return jobName; + } + + + + public boolean isLocalMode() { + return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE) + && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget()); + } + @Override public void registerPlugin(List<URL> pluginPaths) { pluginPaths.forEach(url -> log.info("Begin register plugins: {}", url)); @@ -103,10 +118,8 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private void createStreamEnvironment() { Configuration configuration = new Configuration(); - EnvironmentUtil.initConfiguration(appConfig, configuration); - Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV); - if (envConfig.hasPath(ExecutionConfigKey.ENV_TARGET_TYPE) - && envConfig.getString(ExecutionConfigKey.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget())) { + EnvironmentUtil.initConfiguration(envConfig, configuration); + if (isLocalMode()) { configuration.setInteger(RestOptions.PORT, 8082); environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); } else { @@ -114,26 +127,25 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig())); configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig())); - environment.getConfig().enableObjectReuse(); environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); // setCheckpoint(); - EnvironmentUtil.setRestartStrategy(appConfig, environment.getConfig()); - if (envConfig.hasPath(ExecutionConfigKey.BUFFER_TIMEOUT_MILLIS)) { - long timeout = envConfig.getLong(ExecutionConfigKey.BUFFER_TIMEOUT_MILLIS); + EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig()); + if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) { + long timeout = envConfig.getLong(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS); environment.setBufferTimeout(timeout); } - if (envConfig.hasPath(ExecutionConfigKey.PARALLELISM)) { - int parallelism = envConfig.getInt(ExecutionConfigKey.PARALLELISM); + if (envConfig.hasPath(ExecutionConfigKeyName.PARALLELISM)) { + int parallelism = envConfig.getInt(ExecutionConfigKeyName.PARALLELISM); environment.setParallelism(parallelism); } } private void setTimeCharacteristic() { - if (appConfig.hasPath(ExecutionConfigKey.TIME_CHARACTERISTIC)) { - String timeType = appConfig.getString(ExecutionConfigKey.TIME_CHARACTERISTIC); + if (envConfig.hasPath(ExecutionConfigKeyName.TIME_CHARACTERISTIC)) { + String timeType = envConfig.getString(ExecutionConfigKeyName.TIME_CHARACTERISTIC); switch (timeType.toLowerCase()) { case "event-time": environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java index 999a1ea..bee1c0a 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java @@ -1,32 +1,24 @@ package com.geedgenetworks.bootstrap.execution; +import com.geedgenetworks.bootstrap.enums.TargetType; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckResult; import com.typesafe.config.Config; +import com.typesafe.config.ConfigUtil; import java.net.URL; import java.util.List; public interface RuntimeEnvironment { - - //storage application config - RuntimeEnvironment setConfig(Config config); - - Config getConfig(); + RuntimeEnvironment setEnvConfig(Config envConfig); + Config getEnvConfig(); CheckResult checkConfig(); - RuntimeEnvironment prepare(); void registerPlugin(List<URL> pluginPaths); - default void initialize(Config config) { - this.setConfig(config.getConfig(Constants.APPLICATION)).prepare(); + this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare(); } - - - - - } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java index a37075a..620fa96 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java @@ -67,7 +67,7 @@ public class StartBuilder { public static Path appRootDir() { - if (DeployMode.CLIENT == MODE || DeployMode.RUN == MODE || STARTER) { + if (DeployMode.RUN == MODE || STARTER) { try { String path = StartBuilder.class diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/JobMetricSummary.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/JobMetricSummary.java new file mode 100644 index 0000000..1c5bbc3 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/JobMetricSummary.java @@ -0,0 +1,82 @@ +package com.geedgenetworks.bootstrap.metric; + +import com.geedgenetworks.common.utils.StringFormatUtil; +import com.geedgenetworks.utils.DateUtils; +import org.apache.flink.api.common.JobExecutionResult; + +import java.time.Duration; +public final class JobMetricSummary { + private final JobExecutionResult jobExecutionResult; + private final Long jobStartTime; + private final Long jobEndTime; + + JobMetricSummary( + JobExecutionResult jobExecutionResult, + long jobStartTime, + long jobEndTime) { + this.jobExecutionResult = jobExecutionResult; + this.jobStartTime = jobStartTime; + this.jobEndTime = jobEndTime; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private JobExecutionResult jobExecutionResult; + + private long jobStartTime; + + private long jobEndTime; + + private Builder() {} + + public Builder jobExecutionResult(JobExecutionResult jobExecutionResult) { + this.jobExecutionResult = jobExecutionResult; + return this; + } + + public Builder jobStartTime(long jobStartTime) { + this.jobStartTime = jobStartTime; + return this; + } + + public Builder jobEndTime(long jobEndTime) { + this.jobEndTime = jobEndTime; + return this; + } + + public JobMetricSummary build() { + return new JobMetricSummary( + jobExecutionResult, + jobStartTime, + jobEndTime); + } + } + + @Override + public String toString() { + return StringFormatUtil.formatTable( + "Job Statistic Information", + "Start Time", + DateUtils.convertTimestampToString(jobStartTime, DateUtils.YYYY_MM_DD_HH_MM_SS), + "End Time", + DateUtils.convertTimestampToString(jobEndTime, DateUtils.YYYY_MM_DD_HH_MM_SS), + "Total Time(s)", + (jobEndTime - jobStartTime)/1000, + "Total Read Count", + jobExecutionResult + .getAllAccumulatorResults() + .get(MetricNames.SOURCE_RECEIVED_COUNT), + "Total Write Count", + jobExecutionResult.getAllAccumulatorResults().get(MetricNames.SINK_WRITE_COUNT), + "Total Read Bytes", + jobExecutionResult + .getAllAccumulatorResults() + .get(MetricNames.SOURCE_RECEIVED_BYTES), + "Total Write Bytes", + jobExecutionResult.getAllAccumulatorResults().get(MetricNames.SINK_WRITE_BYTES)); + } +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/MetricNames.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/MetricNames.java new file mode 100644 index 0000000..91f6200 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/MetricNames.java @@ -0,0 +1,19 @@ +package com.geedgenetworks.bootstrap.metric; + +public final class MetricNames { + + private MetricNames() {} + + public static final String RECEIVED_COUNT = "receivedCount"; + + public static final String RECEIVED_BATCHES = "receivedBatches"; + + public static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount"; + public static final String SOURCE_RECEIVED_BYTES = "SourceReceivedBytes"; + public static final String SOURCE_RECEIVED_QPS = "SourceReceivedQPS"; + public static final String SOURCE_RECEIVED_BYTES_PER_SECONDS = "SourceReceivedBytesPerSeconds"; + public static final String SINK_WRITE_COUNT = "SinkWriteCount"; + public static final String SINK_WRITE_BYTES = "SinkWriteBytes"; + public static final String SINK_WRITE_QPS = "SinkWriteQPS"; + public static final String SINK_WRITE_BYTES_PER_SECONDS = "SinkWriteBytesPerSeconds"; +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigFileUtils.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigFileUtils.java index 99f336f..0820559 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigFileUtils.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigFileUtils.java @@ -28,8 +28,7 @@ public class ConfigFileUtils { public static void checkConfigExist(Path configFile) { if (!configFile.toFile().exists()) { - String message = "Can't find config file: " + configFile; - throw new GrootStreamRuntimeException(CommonErrorCode.FILE_OPERATION_ERROR, message); + throw new GrootStreamRuntimeException(CommonErrorCode.FILE_OPERATION_ERROR, "Can't find config file: " + configFile); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java index 1b4a5b8..79cd8bf 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.utils; -import com.geedgenetworks.bootstrap.execution.ExecutionConfigKey; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; import com.geedgenetworks.common.config.CheckResult; import com.typesafe.config.Config; import com.typesafe.config.ConfigValue; @@ -20,9 +20,9 @@ public final class EnvironmentUtil { throw new UnsupportedOperationException("EnvironmentUtil is a utility class and cannot be instantiated"); } - public static void initConfiguration(Config appConfig, Configuration configuration) { - if (appConfig.hasPath("pipeline")) { - Config pipeline = appConfig.getConfig("pipeline"); + public static void initConfiguration(Config envConfig, Configuration configuration) { + if (envConfig.hasPath("pipeline")) { + Config pipeline = envConfig.getConfig("pipeline"); if (pipeline.hasPath("jars")) { configuration.setString(PipelineOptions.JARS.key(), pipeline.getString("jars")); } @@ -30,39 +30,42 @@ public final class EnvironmentUtil { configuration.setString( PipelineOptions.CLASSPATHS.key(), pipeline.getString("classpaths")); } + if(pipeline.hasPath("object-reuse")) { + configuration.setBoolean(PipelineOptions.OBJECT_REUSE.key(), pipeline.getBoolean("object-reuse")); + } } String prefixConf = "flink."; - if (!appConfig.isEmpty()) { - for (Map.Entry<String, ConfigValue> entryConfKey : appConfig.entrySet()) { + if (!envConfig.isEmpty()) { + for (Map.Entry<String, ConfigValue> entryConfKey : envConfig.entrySet()) { String confKey = entryConfKey.getKey().trim(); if (confKey.startsWith(prefixConf)) { configuration.setString( - confKey.replaceFirst(prefixConf, ""), entryConfKey.getValue().render()); + confKey.replaceFirst(prefixConf, ""), entryConfKey.getValue().unwrapped().toString()); } } } } - public static void setRestartStrategy(Config config, ExecutionConfig executionConfig) { + public static void setRestartStrategy(Config envConfig, ExecutionConfig executionConfig) { try { - if (config.hasPath(ExecutionConfigKey.RESTART_STRATEGY)) { - String restartStrategy = config.getString(ExecutionConfigKey.RESTART_STRATEGY); + if (envConfig.hasPath(ExecutionConfigKeyName.RESTART_STRATEGY)) { + String restartStrategy = envConfig.getString(ExecutionConfigKeyName.RESTART_STRATEGY); switch (restartStrategy.toLowerCase()) { case "no": executionConfig.setRestartStrategy(RestartStrategies.noRestart()); break; case "fixed-delay": - int attempts = config.getInt(ExecutionConfigKey.RESTART_ATTEMPTS); - long delay = config.getLong(ExecutionConfigKey.RESTART_DELAY_BETWEEN_ATTEMPTS); + int attempts = envConfig.getInt(ExecutionConfigKeyName.RESTART_ATTEMPTS); + long delay = envConfig.getLong(ExecutionConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS); executionConfig.setRestartStrategy( RestartStrategies.fixedDelayRestart(attempts, delay)); break; case "failure-rate": long failureInterval = - config.getLong(ExecutionConfigKey.RESTART_FAILURE_INTERVAL); - int rate = config.getInt(ExecutionConfigKey.RESTART_FAILURE_RATE); - long delayInterval = config.getLong(ExecutionConfigKey.RESTART_DELAY_INTERVAL); + envConfig.getLong(ExecutionConfigKeyName.RESTART_FAILURE_INTERVAL); + int rate = envConfig.getInt(ExecutionConfigKeyName.RESTART_FAILURE_RATE); + long delayInterval = envConfig.getLong(ExecutionConfigKeyName.RESTART_DELAY_INTERVAL); executionConfig.setRestartStrategy( RestartStrategies.failureRateRestart( rate, @@ -76,34 +79,34 @@ public final class EnvironmentUtil { } } } catch (Exception e) { - log.warn("set restart.strategy in config '{}' exception", config, e); + log.warn("set restart.strategy in config '{}' exception", envConfig, e); } } - public static CheckResult checkRestartStrategy(Config config) { - if (config.hasPath(ExecutionConfigKey.RESTART_STRATEGY)) { - String restartStrategy = config.getString(ExecutionConfigKey.RESTART_STRATEGY); + public static CheckResult checkRestartStrategy(Config envConfig) { + if (envConfig.hasPath(ExecutionConfigKeyName.RESTART_STRATEGY)) { + String restartStrategy = envConfig.getString(ExecutionConfigKeyName.RESTART_STRATEGY); switch (restartStrategy.toLowerCase()) { case "fixed-delay": - if (!(config.hasPath(ExecutionConfigKey.RESTART_ATTEMPTS) - && config.hasPath(ExecutionConfigKey.RESTART_DELAY_BETWEEN_ATTEMPTS))) { + if (!(envConfig.hasPath(ExecutionConfigKeyName.RESTART_ATTEMPTS) + && envConfig.hasPath(ExecutionConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS))) { return CheckResult.error( String.format( "fixed-delay restart strategy must set [%s],[%s]", - ExecutionConfigKey.RESTART_ATTEMPTS, - ExecutionConfigKey.RESTART_DELAY_BETWEEN_ATTEMPTS)); + ExecutionConfigKeyName.RESTART_ATTEMPTS, + ExecutionConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS)); } break; case "failure-rate": - if (!(config.hasPath(ExecutionConfigKey.RESTART_FAILURE_INTERVAL) - && config.hasPath(ExecutionConfigKey.RESTART_FAILURE_RATE) - && config.hasPath(ExecutionConfigKey.RESTART_DELAY_INTERVAL))) { + if (!(envConfig.hasPath(ExecutionConfigKeyName.RESTART_FAILURE_INTERVAL) + && envConfig.hasPath(ExecutionConfigKeyName.RESTART_FAILURE_RATE) + && envConfig.hasPath(ExecutionConfigKeyName.RESTART_DELAY_INTERVAL))) { return CheckResult.error( String.format( "failure-rate restart strategy must set [%s],[%s],[%s]", - ExecutionConfigKey.RESTART_FAILURE_INTERVAL, - ExecutionConfigKey.RESTART_FAILURE_RATE, - ExecutionConfigKey.RESTART_DELAY_INTERVAL)); + ExecutionConfigKeyName.RESTART_FAILURE_INTERVAL, + ExecutionConfigKeyName.RESTART_FAILURE_RATE, + ExecutionConfigKeyName.RESTART_DELAY_INTERVAL)); } break; default: |
