diff options
| author | doufenghu <[email protected]> | 2024-09-01 23:49:48 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-09-01 23:49:48 +0800 |
| commit | a55399cb95c6408233e84540db482ae5e6131746 (patch) | |
| tree | de9b913e817190830ea92122abf779b91b238114 /groot-bootstrap | |
| parent | 2947507b84076e5c6a2accc7b142bd27594ba3d8 (diff) | |
[Improve][bootstrap] Improve job-level user-defined variables, move the path from application/properties to application/env/properties, and add support for defining variables via the runtime CLI. (GAL-651)
Diffstat (limited to 'groot-bootstrap')
6 files changed, 58 insertions, 52 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java index f83183c..6ee5151 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java @@ -31,10 +31,10 @@ public abstract class CommandArgs { description = "Whether check config") protected boolean checkConfig = false; - + // user-defined parameters @Parameter(names = {"-i", "--variable"}, splitter = ParameterSplitter.class, - description = "user-defined parameters , such as -i data_center=bj " + description = "Job level user-defined parameters, such as -i traffic_file_bucket=traffic_file_bucket, or -i scheduler.knowledge_base.update.interval.minutes=1. " + "We use ',' as separator, when inside \"\", ',' are treated as normal characters instead of delimiters.") protected List<String> variables = Collections.emptyList(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java index 79a27e1..c3538b0 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java @@ -26,27 +26,43 @@ public class ExecuteCommand implements Command<ExecuteCommandArgs> { @Override public void execute() throws CommandExecuteException, ConfigCheckException { + // Groot Stream Global Config for all processing jobs GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); // check config file exist checkConfigExist(configFile); - Config config = ConfigBuilder.of(configFile); + Config jobConfig = ConfigBuilder.of(configFile); + // if user specified job name using command line arguments, override config option if (!executeCommandArgs.getJobName().equals(Constants.DEFAULT_JOB_NAME)) { - config = config.withValue( + jobConfig = jobConfig.withValue( ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV , Constants.JOB_NAME), ConfigValueFactory.fromAnyRef(executeCommandArgs.getJobName())); } + // if user specified target type using command line arguments, override config option if(executeCommandArgs.getTargetType() != null) { - config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); } - JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + // if user specified variables using command line arguments, override config option + if(!executeCommandArgs.getVariables().isEmpty()) { + for (String variable : executeCommandArgs.getVariables()) { + String[] keyValue = variable.split("="); + if (keyValue.length != 2) { + throw new CommandExecuteException("Invalid variable format: " + variable); + } + jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, Constants.PROPERTIES, keyValue[0]), + ConfigValueFactory.fromAnyRef(keyValue[1])); + } + } + + + JobExecution jobExecution = new JobExecution(jobConfig, grootStreamConfig); try { jobExecution.execute(); } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); + throw new JobExecuteException("Job executed failed", e); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java index 0c00a61..61ced82 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java @@ -20,14 +20,14 @@ public class ExecuteCommandArgs extends CommandArgs { @Parameter(names={"-e", "--deploy-mode"}, converter = DeployModeConverter.class, - description = "deploy mode, only support [run] ") + description = "Job deploy mode, only support [run] ") private DeployMode deployMode = DeployMode.RUN; @Parameter( names = {"--target"}, converter = TargetTypeConverter.class, description = - "job submitted target type, support [local, remote, yarn-session, yarn-per-job]") + "Job submitted target type, support [local, remote, yarn-session, yarn-per-job]") private TargetType targetType; @Override @@ -55,7 +55,7 @@ public class ExecuteCommandArgs extends CommandArgs { @Override public String toString() { - return "FlinkCommandArgs{" + return "CommandArgs{" + "deployMode=" + deployMode + ", targetType=" @@ -74,7 +74,6 @@ public class ExecuteCommandArgs extends CommandArgs { } - private void userParamsToSysEnv() { if (!this.variables.isEmpty()) { variables.stream() @@ -111,7 +110,7 @@ public class ExecuteCommandArgs extends CommandArgs { return targetType; } else { throw new IllegalArgumentException( - "Groot-Stream job on flink engine submitted target type only " + "GrootStream job submitted target type only " + "support these options: [local, remote, yarn-session, yarn-per-job]"); } } @@ -132,7 +131,7 @@ public class ExecuteCommandArgs extends CommandArgs { return deployMode; } else { throw new IllegalArgumentException( - "Groot-Stream job on flink engine deploy mode only " + "GrootStream job deploy mode only " + "support these options: [run, run-application]"); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index 22b23a7..6a23a0b 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -39,26 +39,26 @@ public class JobExecution { private final List<Node> nodes; private final List<URL> jarPaths; - public JobExecution(Config config, GrootStreamConfig grootStreamConfig) { + public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) { try { jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir() .resolve(GrootStreamRunner.APP_JAR_NAME).toString()) .toURI().toURL())); } catch (MalformedURLException e) { - throw new JobExecuteException("load groot stream bootstrap jar error.", e); + throw new JobExecuteException("Load GrootStream Bootstrap jar failed.", e); } - registerPlugin(config.getConfig(Constants.APPLICATION)); + registerPlugin(jobConfig.getConfig(Constants.APPLICATION)); - this.sourceExecutor = new SourceExecutor(jarPaths, config); - this.sinkExecutor = new SinkExecutor(jarPaths, config); - this.filterExecutor = new FilterExecutor(jarPaths, config); - this.splitExecutor = new SplitExecutor(jarPaths, config); - this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config); - this.processingExecutor = new ProcessingExecutor(jarPaths, config); - this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config); + this.sourceExecutor = new SourceExecutor(jarPaths, jobConfig); + this.sinkExecutor = new SinkExecutor(jarPaths, jobConfig); + this.filterExecutor = new FilterExecutor(jarPaths, jobConfig); + this.splitExecutor = new SplitExecutor(jarPaths, jobConfig); + this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, jobConfig); + this.processingExecutor = new ProcessingExecutor(jarPaths, jobConfig); + this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, jobConfig); this.jobRuntimeEnvironment = - JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig); + JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig); this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); @@ -67,7 +67,7 @@ public class JobExecution { this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.nodes = buildJobNode(config); + this.nodes = buildJobNode(jobConfig); } @@ -126,7 +126,7 @@ public class JobExecution { return new URL(uri); } catch (MalformedURLException e) { throw new RuntimeException( - "the uri of jar illegal:" + uri, e); + "The uri of jar illegal:" + uri, e); } }) .collect(Collectors.toSet()); @@ -251,7 +251,7 @@ public class JobExecution { private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) { Node node = getNode(downstreamNodeName).orElseGet(() -> { - throw new JobExecuteException("can't find downstream node " + downstreamNodeName); + throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 07fde4d..4c6c2d2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -35,16 +35,16 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private String jobName = Constants.DEFAULT_JOB_NAME; private Set<String> splitSet = new HashSet<>(); - private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { + private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; - this.initialize(config); + this.initialize(jobConfig); } - public static JobRuntimeEnvironment getInstance(Config config, GrootStreamConfig grootStreamConfig) { + public static JobRuntimeEnvironment getInstance(Config jobConfig, GrootStreamConfig grootStreamConfig) { if (INSTANCE == null) { synchronized (JobRuntimeEnvironment.class) { if (INSTANCE == null) { - INSTANCE = new JobRuntimeEnvironment(config, grootStreamConfig); + INSTANCE = new JobRuntimeEnvironment(jobConfig, grootStreamConfig); } } } @@ -63,14 +63,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } @Override - public RuntimeEnvironment loadJobProperties(Config jobProperties) { - this.grootStreamConfig.getCommonConfig().getPropertiesConfig().putAll(jobProperties.root().unwrapped().entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())))); - return this; - } - - @Override public CheckResult checkConfig() { return EnvironmentUtil.checkRestartStrategy(envConfig); } @@ -78,10 +70,19 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ @Override public RuntimeEnvironment prepare() { - createStreamEnvironment(); + if (envConfig.hasPath(Constants.JOB_NAME)) { jobName = envConfig.getString(Constants.JOB_NAME); } + // Job-level user-defined variables override the grootStreamConfig + if (envConfig.hasPath(Constants.PROPERTIES)) { + envConfig.getConfig(Constants.PROPERTIES).root().unwrapped().entrySet().forEach(entry -> { + this.grootStreamConfig.getCommonConfig().getPropertiesConfig().put(entry.getKey(), String.valueOf(entry.getValue())); + }); + } + + createStreamEnvironment(); + return this; } @@ -89,8 +90,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ return jobName; } - - public boolean isLocalMode() { return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE) && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget()); @@ -131,7 +130,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } private void createStreamEnvironment() { - Configuration configuration = new Configuration(); EnvironmentUtil.initConfiguration(envConfig, configuration); if (isLocalMode()) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java index 710e7f6..b177e40 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java @@ -1,7 +1,4 @@ package com.geedgenetworks.bootstrap.execution; - - -import com.geedgenetworks.bootstrap.enums.TargetType; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckResult; import com.typesafe.config.Config; @@ -12,16 +9,12 @@ import java.util.List; public interface RuntimeEnvironment { RuntimeEnvironment setEnvConfig(Config envConfig); + //Prepare runtime environment for job execution + RuntimeEnvironment prepare(); Config getEnvConfig(); - RuntimeEnvironment loadJobProperties(Config jobProperties); CheckResult checkConfig(); - RuntimeEnvironment prepare(); - void registerPlugin(List<URL> pluginPaths); default void initialize(Config config) { - if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) { - this.loadJobProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES))); - } this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare(); } } |
