summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-09-01 23:49:48 +0800
committerdoufenghu <[email protected]>2024-09-01 23:49:48 +0800
commita55399cb95c6408233e84540db482ae5e6131746 (patch)
treede9b913e817190830ea92122abf779b91b238114 /groot-bootstrap
parent2947507b84076e5c6a2accc7b142bd27594ba3d8 (diff)
[Improve][bootstrap] Improve job-level user-defined variables, move the path from application/properties to application/env/properties, and add support for defining variables via the runtime CLI. (GAL-651)
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java26
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java11
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java28
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java30
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java11
6 files changed, 58 insertions, 52 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
index f83183c..6ee5151 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
@@ -31,10 +31,10 @@ public abstract class CommandArgs {
description = "Whether check config")
protected boolean checkConfig = false;
-
+ // user-defined parameters
@Parameter(names = {"-i", "--variable"},
splitter = ParameterSplitter.class,
- description = "user-defined parameters , such as -i data_center=bj "
+ description = "Job level user-defined parameters, such as -i traffic_file_bucket=traffic_file_bucket, or -i scheduler.knowledge_base.update.interval.minutes=1. "
+ "We use ',' as separator, when inside \"\", ',' are treated as normal characters instead of delimiters.")
protected List<String> variables = Collections.emptyList();
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
index 79a27e1..c3538b0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
@@ -26,27 +26,43 @@ public class ExecuteCommand implements Command<ExecuteCommandArgs> {
@Override
public void execute() throws CommandExecuteException, ConfigCheckException {
+ // Groot Stream Global Config for all processing jobs
GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
// check config file exist
checkConfigExist(configFile);
- Config config = ConfigBuilder.of(configFile);
+ Config jobConfig = ConfigBuilder.of(configFile);
+
// if user specified job name using command line arguments, override config option
if (!executeCommandArgs.getJobName().equals(Constants.DEFAULT_JOB_NAME)) {
- config = config.withValue(
+ jobConfig = jobConfig.withValue(
ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV , Constants.JOB_NAME), ConfigValueFactory.fromAnyRef(executeCommandArgs.getJobName()));
}
+ // if user specified target type using command line arguments, override config option
if(executeCommandArgs.getTargetType() != null) {
- config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
}
- JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ // if user specified variables using command line arguments, override config option
+ if(!executeCommandArgs.getVariables().isEmpty()) {
+ for (String variable : executeCommandArgs.getVariables()) {
+ String[] keyValue = variable.split("=");
+ if (keyValue.length != 2) {
+ throw new CommandExecuteException("Invalid variable format: " + variable);
+ }
+ jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, Constants.PROPERTIES, keyValue[0]),
+ ConfigValueFactory.fromAnyRef(keyValue[1]));
+ }
+ }
+
+
+ JobExecution jobExecution = new JobExecution(jobConfig, grootStreamConfig);
try {
jobExecution.execute();
} catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
+ throw new JobExecuteException("Job executed failed", e);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
index 0c00a61..61ced82 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
@@ -20,14 +20,14 @@ public class ExecuteCommandArgs extends CommandArgs {
@Parameter(names={"-e", "--deploy-mode"},
converter = DeployModeConverter.class,
- description = "deploy mode, only support [run] ")
+ description = "Job deploy mode, only support [run] ")
private DeployMode deployMode = DeployMode.RUN;
@Parameter(
names = {"--target"},
converter = TargetTypeConverter.class,
description =
- "job submitted target type, support [local, remote, yarn-session, yarn-per-job]")
+ "Job submitted target type, support [local, remote, yarn-session, yarn-per-job]")
private TargetType targetType;
@Override
@@ -55,7 +55,7 @@ public class ExecuteCommandArgs extends CommandArgs {
@Override
public String toString() {
- return "FlinkCommandArgs{"
+ return "CommandArgs{"
+ "deployMode="
+ deployMode
+ ", targetType="
@@ -74,7 +74,6 @@ public class ExecuteCommandArgs extends CommandArgs {
}
-
private void userParamsToSysEnv() {
if (!this.variables.isEmpty()) {
variables.stream()
@@ -111,7 +110,7 @@ public class ExecuteCommandArgs extends CommandArgs {
return targetType;
} else {
throw new IllegalArgumentException(
- "Groot-Stream job on flink engine submitted target type only "
+ "GrootStream job submitted target type only "
+ "support these options: [local, remote, yarn-session, yarn-per-job]");
}
}
@@ -132,7 +131,7 @@ public class ExecuteCommandArgs extends CommandArgs {
return deployMode;
} else {
throw new IllegalArgumentException(
- "Groot-Stream job on flink engine deploy mode only "
+ "GrootStream job deploy mode only "
+ "support these options: [run, run-application]");
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 22b23a7..6a23a0b 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -39,26 +39,26 @@ public class JobExecution {
private final List<Node> nodes;
private final List<URL> jarPaths;
- public JobExecution(Config config, GrootStreamConfig grootStreamConfig) {
+ public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
try {
jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
.resolve(GrootStreamRunner.APP_JAR_NAME).toString())
.toURI().toURL()));
} catch (MalformedURLException e) {
- throw new JobExecuteException("load groot stream bootstrap jar error.", e);
+ throw new JobExecuteException("Load GrootStream Bootstrap jar failed.", e);
}
- registerPlugin(config.getConfig(Constants.APPLICATION));
+ registerPlugin(jobConfig.getConfig(Constants.APPLICATION));
- this.sourceExecutor = new SourceExecutor(jarPaths, config);
- this.sinkExecutor = new SinkExecutor(jarPaths, config);
- this.filterExecutor = new FilterExecutor(jarPaths, config);
- this.splitExecutor = new SplitExecutor(jarPaths, config);
- this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
- this.processingExecutor = new ProcessingExecutor(jarPaths, config);
- this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
+ this.sourceExecutor = new SourceExecutor(jarPaths, jobConfig);
+ this.sinkExecutor = new SinkExecutor(jarPaths, jobConfig);
+ this.filterExecutor = new FilterExecutor(jarPaths, jobConfig);
+ this.splitExecutor = new SplitExecutor(jarPaths, jobConfig);
+ this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, jobConfig);
+ this.processingExecutor = new ProcessingExecutor(jarPaths, jobConfig);
+ this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, jobConfig);
this.jobRuntimeEnvironment =
- JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
+ JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig);
this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
@@ -67,7 +67,7 @@ public class JobExecution {
this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.nodes = buildJobNode(config);
+ this.nodes = buildJobNode(jobConfig);
}
@@ -126,7 +126,7 @@ public class JobExecution {
return new URL(uri);
} catch (MalformedURLException e) {
throw new RuntimeException(
- "the uri of jar illegal:" + uri, e);
+ "The uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toSet());
@@ -251,7 +251,7 @@ public class JobExecution {
private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
Node node = getNode(downstreamNodeName).orElseGet(() -> {
- throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
+ throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 07fde4d..4c6c2d2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -35,16 +35,16 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
private String jobName = Constants.DEFAULT_JOB_NAME;
private Set<String> splitSet = new HashSet<>();
- private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) {
+ private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) {
this.grootStreamConfig = grootStreamConfig;
- this.initialize(config);
+ this.initialize(jobConfig);
}
- public static JobRuntimeEnvironment getInstance(Config config, GrootStreamConfig grootStreamConfig) {
+ public static JobRuntimeEnvironment getInstance(Config jobConfig, GrootStreamConfig grootStreamConfig) {
if (INSTANCE == null) {
synchronized (JobRuntimeEnvironment.class) {
if (INSTANCE == null) {
- INSTANCE = new JobRuntimeEnvironment(config, grootStreamConfig);
+ INSTANCE = new JobRuntimeEnvironment(jobConfig, grootStreamConfig);
}
}
}
@@ -63,14 +63,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
@Override
- public RuntimeEnvironment loadJobProperties(Config jobProperties) {
- this.grootStreamConfig.getCommonConfig().getPropertiesConfig().putAll(jobProperties.root().unwrapped().entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue()))));
- return this;
- }
-
- @Override
public CheckResult checkConfig() {
return EnvironmentUtil.checkRestartStrategy(envConfig);
}
@@ -78,10 +70,19 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
@Override
public RuntimeEnvironment prepare() {
- createStreamEnvironment();
+
if (envConfig.hasPath(Constants.JOB_NAME)) {
jobName = envConfig.getString(Constants.JOB_NAME);
}
+ // Job-level user-defined variables override the grootStreamConfig
+ if (envConfig.hasPath(Constants.PROPERTIES)) {
+ envConfig.getConfig(Constants.PROPERTIES).root().unwrapped().entrySet().forEach(entry -> {
+ this.grootStreamConfig.getCommonConfig().getPropertiesConfig().put(entry.getKey(), String.valueOf(entry.getValue()));
+ });
+ }
+
+ createStreamEnvironment();
+
return this;
}
@@ -89,8 +90,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
return jobName;
}
-
-
public boolean isLocalMode() {
return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE)
&& envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget());
@@ -131,7 +130,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
private void createStreamEnvironment() {
-
Configuration configuration = new Configuration();
EnvironmentUtil.initConfiguration(envConfig, configuration);
if (isLocalMode()) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
index 710e7f6..b177e40 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
@@ -1,7 +1,4 @@
package com.geedgenetworks.bootstrap.execution;
-
-
-import com.geedgenetworks.bootstrap.enums.TargetType;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckResult;
import com.typesafe.config.Config;
@@ -12,16 +9,12 @@ import java.util.List;
public interface RuntimeEnvironment {
RuntimeEnvironment setEnvConfig(Config envConfig);
+ //Prepare runtime environment for job execution
+ RuntimeEnvironment prepare();
Config getEnvConfig();
- RuntimeEnvironment loadJobProperties(Config jobProperties);
CheckResult checkConfig();
- RuntimeEnvironment prepare();
-
void registerPlugin(List<URL> pluginPaths);
default void initialize(Config config) {
- if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) {
- this.loadJobProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES)));
- }
this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare();
}
}