summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2023-12-31 00:43:48 +0800
committerdoufenghu <[email protected]>2023-12-31 00:43:48 +0800
commitfdd7119689ec54c3a5446062b71e759b5fed4b9f (patch)
treeb7f873904608e8183b944105b045c0f43ac71546 /groot-bootstrap
parent92e5b06386419a550099598eaaab3abcc4584e74 (diff)
[Fix][Bootstrap] 修复任务配置文件并行度不生效的问题,梳理RuntimeEnvironment加载envconfig使其边界更清晰。撰写env-config文档,描述不同方式指定任务名的优先级;描述不同level指定并行度优先级及覆盖逻辑。
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java6
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java3
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKey.java)8
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java25
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java52
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java18
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/JobMetricSummary.java82
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/MetricNames.java19
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigFileUtils.java3
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java61
12 files changed, 194 insertions, 87 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
index d838c11..2a00474 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
@@ -32,7 +32,7 @@ public abstract class CommandArgs {
protected boolean checkConfig = false;
@Parameter(names = {"-i", "--variable"},
splitter = ParameterSplitter.class,
- description = "user-defined parameters , such as -i data_center=bj" +
+ description = "user-defined parameters , such as -i data_center=bj " +
"We use ',' as separator, when inside \"\", ',' are treated as normal characters instead of delimiters.")
protected List<String> variables = Collections.emptyList();
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
index 3510daa..dc73bd1 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java
@@ -4,7 +4,7 @@ import cn.hutool.setting.yaml.YamlUtil;
import com.geedgenetworks.bootstrap.exception.CommandExecuteException;
import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.bootstrap.execution.ExecutionConfigKey;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
import com.geedgenetworks.bootstrap.execution.JobExecution;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
import com.geedgenetworks.common.Constants;
@@ -38,11 +38,11 @@ public class ExecuteCommand implements Command<ExecuteCommandArgs> {
// if user specified job name using command line arguments, override config option
if (!executeCommandArgs.getJobName().equals(Constants.DEFAULT_JOB_NAME)) {
config = config.withValue(
- ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_NAME), ConfigValueFactory.fromAnyRef(executeCommandArgs.getJobName()));
+ ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV , Constants.JOB_NAME), ConfigValueFactory.fromAnyRef(executeCommandArgs.getJobName()));
}
if(executeCommandArgs.getTargetType() != null) {
- config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKey.ENV_TARGET_TYPE),
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
index fbf28f1..a6b2dd6 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
@@ -108,7 +108,6 @@ public class ExecuteCommandArgs extends CommandArgs {
private static final List<DeployMode> DEPLOY_MODE_TYPE_LIST = new ArrayList<>();
static {
- DEPLOY_MODE_TYPE_LIST.add(DeployMode.CLIENT);
DEPLOY_MODE_TYPE_LIST.add(DeployMode.RUN);
DEPLOY_MODE_TYPE_LIST.add(DeployMode.RUN_APPLICATION);
}
@@ -121,7 +120,7 @@ public class ExecuteCommandArgs extends CommandArgs {
} else {
throw new IllegalArgumentException(
"Groot-Stream job on flink engine deploy mode only "
- + "support these options: [client, run, run-application]");
+ + "support these options: [run, run-application]");
}
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKey.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
index 2c7f05a..2d82bd5 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKey.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java
@@ -1,13 +1,13 @@
package com.geedgenetworks.bootstrap.execution;
-public class ExecutionConfigKey {
- private ExecutionConfigKey() {
- throw new UnsupportedOperationException("ExecutionConfigKey is a utility class and cannot be instantiated");
+public class ExecutionConfigKeyName {
+ private ExecutionConfigKeyName() {
+ throw new UnsupportedOperationException("Utility class should not be instantiated");
}
public static final String TIME_CHARACTERISTIC = "execution.time-characteristic";
public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
- public static final String PARALLELISM = "execution.parallelism";
+ public static final String PARALLELISM = "parallelism";
public static final String MAX_PARALLELISM = "execution.max-parallelism";
public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index fac544c..bd6056f 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -1,14 +1,12 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.DeployMode;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.GrootStreamConfig;
import com.geedgenetworks.core.pojo.Event;
-import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -71,9 +69,9 @@ public class JobExecution {
private void registerPlugin(Config appConfig) {
List<Path> thirdPartyJars = new ArrayList<>();
Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV);
- if(envConfig.hasPath(ExecutionConfigKey.JARS)) {
+ if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
thirdPartyJars = new ArrayList<>(StartBuilder
- .getThirdPartyJars(envConfig.getString(ExecutionConfigKey.JARS)));
+ .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS)));
}
thirdPartyJars.addAll(StartBuilder.getConnectorJars());
thirdPartyJars.addAll(StartBuilder.getPluginsJarDependencies());
@@ -98,9 +96,9 @@ public class JobExecution {
private Config registerPlugin(Config config , List<URL> jars) {
config = this.injectJarsToConfig(
- config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKey.PIPELINE_JARS), jars);
+ config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
return this.injectJarsToConfig(
- config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKey.PIPELINE_CLASSPATHS), jars);
+ config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars);
}
@@ -178,7 +176,7 @@ public class JobExecution {
postprocessingPipelines = config.getConfig(Constants.POSTPROCESSING_PIPELINES).root().unwrapped();
}
- List<? extends Config> topology = jobRuntimeEnvironment.getConfig().getConfigList(Constants.APPLICATION_TOPOLOGY);
+ List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY);
List<Node> nodes = Lists.newArrayList();
@@ -211,8 +209,7 @@ public class JobExecution {
public void execute() throws JobExecuteException {
-
- if (!StartBuilder.getDeployMode().name().equals(DeployMode.CLIENT.name())) {
+ if (!jobRuntimeEnvironment.isLocalMode()) {
jobRuntimeEnvironment.registerPlugin(jarPaths);
}
List<Node> sourceNodes = nodes
@@ -228,10 +225,14 @@ public class JobExecution {
}
try {
- log.info("Execution Plan: {}", jobRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
+ final long jobStartTime = System.currentTimeMillis();
log.info('\n' + Constants.GROOT_LOGO);
- jobRuntimeEnvironment.getStreamExecutionEnvironment()
- .execute(jobRuntimeEnvironment.getConfig().getString(Constants.APPLICATION_NAME));
+ log.info("Execution Plan: {}", jobRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
+ log.info("Execution Job: {}", jobRuntimeEnvironment.getJobName());
+ jobRuntimeEnvironment.getStreamExecutionEnvironment().execute(jobRuntimeEnvironment.getJobName());
+ final long jobEndTime = System.currentTimeMillis();
+ log.info("Job finished, execution duration: {} ms", jobEndTime-jobStartTime);
+
} catch (Exception e) {
throw new JobExecuteException("Execute job error", e);
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index baa9230..c719f46 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -22,11 +22,11 @@ import java.util.stream.Collectors;
@Slf4j
public class JobRuntimeEnvironment implements RuntimeEnvironment{
- private static JobRuntimeEnvironment INSTANCE = null;
- private Config appConfig;
-
+ private static volatile JobRuntimeEnvironment INSTANCE = null;
+ private Config envConfig;
private GrootStreamConfig grootStreamConfig;
private StreamExecutionEnvironment environment;
+ private String jobName = Constants.DEFAULT_JOB_NAME;
private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) {
this.grootStreamConfig = grootStreamConfig;
@@ -45,27 +45,42 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
@Override
- public RuntimeEnvironment setConfig(Config appConfig) {
- this.appConfig = appConfig;
+ public RuntimeEnvironment setEnvConfig(Config envConfig) {
+ this.envConfig = envConfig;
return this;
}
@Override
- public Config getConfig() {
- return appConfig;
+ public Config getEnvConfig() {
+ return envConfig;
}
@Override
public CheckResult checkConfig() {
- return EnvironmentUtil.checkRestartStrategy(appConfig);
+ return EnvironmentUtil.checkRestartStrategy(envConfig);
}
+
@Override
public RuntimeEnvironment prepare() {
createStreamEnvironment();
+ if (envConfig.hasPath(Constants.JOB_NAME)) {
+ jobName = envConfig.getString(Constants.JOB_NAME);
+ }
return this;
}
+ public String getJobName() {
+ return jobName;
+ }
+
+
+
+ public boolean isLocalMode() {
+ return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE)
+ && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget());
+ }
+
@Override
public void registerPlugin(List<URL> pluginPaths) {
pluginPaths.forEach(url -> log.info("Begin register plugins: {}", url));
@@ -103,10 +118,8 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
private void createStreamEnvironment() {
Configuration configuration = new Configuration();
- EnvironmentUtil.initConfiguration(appConfig, configuration);
- Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV);
- if (envConfig.hasPath(ExecutionConfigKey.ENV_TARGET_TYPE)
- && envConfig.getString(ExecutionConfigKey.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget())) {
+ EnvironmentUtil.initConfiguration(envConfig, configuration);
+ if (isLocalMode()) {
configuration.setInteger(RestOptions.PORT, 8082);
environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
} else {
@@ -114,26 +127,25 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig()));
configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig()));
- environment.getConfig().enableObjectReuse();
environment.getConfig().setGlobalJobParameters(configuration);
setTimeCharacteristic();
// setCheckpoint();
- EnvironmentUtil.setRestartStrategy(appConfig, environment.getConfig());
- if (envConfig.hasPath(ExecutionConfigKey.BUFFER_TIMEOUT_MILLIS)) {
- long timeout = envConfig.getLong(ExecutionConfigKey.BUFFER_TIMEOUT_MILLIS);
+ EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig());
+ if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
+ long timeout = envConfig.getLong(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS);
environment.setBufferTimeout(timeout);
}
- if (envConfig.hasPath(ExecutionConfigKey.PARALLELISM)) {
- int parallelism = envConfig.getInt(ExecutionConfigKey.PARALLELISM);
+ if (envConfig.hasPath(ExecutionConfigKeyName.PARALLELISM)) {
+ int parallelism = envConfig.getInt(ExecutionConfigKeyName.PARALLELISM);
environment.setParallelism(parallelism);
}
}
private void setTimeCharacteristic() {
- if (appConfig.hasPath(ExecutionConfigKey.TIME_CHARACTERISTIC)) {
- String timeType = appConfig.getString(ExecutionConfigKey.TIME_CHARACTERISTIC);
+ if (envConfig.hasPath(ExecutionConfigKeyName.TIME_CHARACTERISTIC)) {
+ String timeType = envConfig.getString(ExecutionConfigKeyName.TIME_CHARACTERISTIC);
switch (timeType.toLowerCase()) {
case "event-time":
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
index 999a1ea..bee1c0a 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java
@@ -1,32 +1,24 @@
package com.geedgenetworks.bootstrap.execution;
+import com.geedgenetworks.bootstrap.enums.TargetType;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckResult;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigUtil;
import java.net.URL;
import java.util.List;
public interface RuntimeEnvironment {
-
- //storage application config
- RuntimeEnvironment setConfig(Config config);
-
- Config getConfig();
+ RuntimeEnvironment setEnvConfig(Config envConfig);
+ Config getEnvConfig();
CheckResult checkConfig();
-
RuntimeEnvironment prepare();
void registerPlugin(List<URL> pluginPaths);
-
default void initialize(Config config) {
- this.setConfig(config.getConfig(Constants.APPLICATION)).prepare();
+ this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare();
}
-
-
-
-
-
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java
index a37075a..620fa96 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java
@@ -67,7 +67,7 @@ public class StartBuilder {
public static Path appRootDir() {
- if (DeployMode.CLIENT == MODE || DeployMode.RUN == MODE || STARTER) {
+ if (DeployMode.RUN == MODE || STARTER) {
try {
String path =
StartBuilder.class
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/JobMetricSummary.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/JobMetricSummary.java
new file mode 100644
index 0000000..1c5bbc3
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/JobMetricSummary.java
@@ -0,0 +1,82 @@
+package com.geedgenetworks.bootstrap.metric;
+
+import com.geedgenetworks.common.utils.StringFormatUtil;
+import com.geedgenetworks.utils.DateUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+
+import java.time.Duration;
+public final class JobMetricSummary {
+ private final JobExecutionResult jobExecutionResult;
+ private final Long jobStartTime;
+ private final Long jobEndTime;
+
+ JobMetricSummary(
+ JobExecutionResult jobExecutionResult,
+ long jobStartTime,
+ long jobEndTime) {
+ this.jobExecutionResult = jobExecutionResult;
+ this.jobStartTime = jobStartTime;
+ this.jobEndTime = jobEndTime;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private JobExecutionResult jobExecutionResult;
+
+ private long jobStartTime;
+
+ private long jobEndTime;
+
+ private Builder() {}
+
+ public Builder jobExecutionResult(JobExecutionResult jobExecutionResult) {
+ this.jobExecutionResult = jobExecutionResult;
+ return this;
+ }
+
+ public Builder jobStartTime(long jobStartTime) {
+ this.jobStartTime = jobStartTime;
+ return this;
+ }
+
+ public Builder jobEndTime(long jobEndTime) {
+ this.jobEndTime = jobEndTime;
+ return this;
+ }
+
+ public JobMetricSummary build() {
+ return new JobMetricSummary(
+ jobExecutionResult,
+ jobStartTime,
+ jobEndTime);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return StringFormatUtil.formatTable(
+ "Job Statistic Information",
+ "Start Time",
+ DateUtils.convertTimestampToString(jobStartTime, DateUtils.YYYY_MM_DD_HH_MM_SS),
+ "End Time",
+ DateUtils.convertTimestampToString(jobEndTime, DateUtils.YYYY_MM_DD_HH_MM_SS),
+ "Total Time(s)",
+ (jobEndTime - jobStartTime)/1000,
+ "Total Read Count",
+ jobExecutionResult
+ .getAllAccumulatorResults()
+ .get(MetricNames.SOURCE_RECEIVED_COUNT),
+ "Total Write Count",
+ jobExecutionResult.getAllAccumulatorResults().get(MetricNames.SINK_WRITE_COUNT),
+ "Total Read Bytes",
+ jobExecutionResult
+ .getAllAccumulatorResults()
+ .get(MetricNames.SOURCE_RECEIVED_BYTES),
+ "Total Write Bytes",
+ jobExecutionResult.getAllAccumulatorResults().get(MetricNames.SINK_WRITE_BYTES));
+ }
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/MetricNames.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/MetricNames.java
new file mode 100644
index 0000000..91f6200
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/metric/MetricNames.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.bootstrap.metric;
+
+public final class MetricNames {
+
+ private MetricNames() {}
+
+ public static final String RECEIVED_COUNT = "receivedCount";
+
+ public static final String RECEIVED_BATCHES = "receivedBatches";
+
+ public static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
+ public static final String SOURCE_RECEIVED_BYTES = "SourceReceivedBytes";
+ public static final String SOURCE_RECEIVED_QPS = "SourceReceivedQPS";
+ public static final String SOURCE_RECEIVED_BYTES_PER_SECONDS = "SourceReceivedBytesPerSeconds";
+ public static final String SINK_WRITE_COUNT = "SinkWriteCount";
+ public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
+ public static final String SINK_WRITE_QPS = "SinkWriteQPS";
+ public static final String SINK_WRITE_BYTES_PER_SECONDS = "SinkWriteBytesPerSeconds";
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigFileUtils.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigFileUtils.java
index 99f336f..0820559 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigFileUtils.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigFileUtils.java
@@ -28,8 +28,7 @@ public class ConfigFileUtils {
public static void checkConfigExist(Path configFile) {
if (!configFile.toFile().exists()) {
- String message = "Can't find config file: " + configFile;
- throw new GrootStreamRuntimeException(CommonErrorCode.FILE_OPERATION_ERROR, message);
+ throw new GrootStreamRuntimeException(CommonErrorCode.FILE_OPERATION_ERROR, "Can't find config file: " + configFile);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
index 1b4a5b8..79cd8bf 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.utils;
-import com.geedgenetworks.bootstrap.execution.ExecutionConfigKey;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
import com.geedgenetworks.common.config.CheckResult;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
@@ -20,9 +20,9 @@ public final class EnvironmentUtil {
throw new UnsupportedOperationException("EnvironmentUtil is a utility class and cannot be instantiated");
}
- public static void initConfiguration(Config appConfig, Configuration configuration) {
- if (appConfig.hasPath("pipeline")) {
- Config pipeline = appConfig.getConfig("pipeline");
+ public static void initConfiguration(Config envConfig, Configuration configuration) {
+ if (envConfig.hasPath("pipeline")) {
+ Config pipeline = envConfig.getConfig("pipeline");
if (pipeline.hasPath("jars")) {
configuration.setString(PipelineOptions.JARS.key(), pipeline.getString("jars"));
}
@@ -30,39 +30,42 @@ public final class EnvironmentUtil {
configuration.setString(
PipelineOptions.CLASSPATHS.key(), pipeline.getString("classpaths"));
}
+ if(pipeline.hasPath("object-reuse")) {
+ configuration.setBoolean(PipelineOptions.OBJECT_REUSE.key(), pipeline.getBoolean("object-reuse"));
+ }
}
String prefixConf = "flink.";
- if (!appConfig.isEmpty()) {
- for (Map.Entry<String, ConfigValue> entryConfKey : appConfig.entrySet()) {
+ if (!envConfig.isEmpty()) {
+ for (Map.Entry<String, ConfigValue> entryConfKey : envConfig.entrySet()) {
String confKey = entryConfKey.getKey().trim();
if (confKey.startsWith(prefixConf)) {
configuration.setString(
- confKey.replaceFirst(prefixConf, ""), entryConfKey.getValue().render());
+ confKey.replaceFirst(prefixConf, ""), entryConfKey.getValue().unwrapped().toString());
}
}
}
}
- public static void setRestartStrategy(Config config, ExecutionConfig executionConfig) {
+ public static void setRestartStrategy(Config envConfig, ExecutionConfig executionConfig) {
try {
- if (config.hasPath(ExecutionConfigKey.RESTART_STRATEGY)) {
- String restartStrategy = config.getString(ExecutionConfigKey.RESTART_STRATEGY);
+ if (envConfig.hasPath(ExecutionConfigKeyName.RESTART_STRATEGY)) {
+ String restartStrategy = envConfig.getString(ExecutionConfigKeyName.RESTART_STRATEGY);
switch (restartStrategy.toLowerCase()) {
case "no":
executionConfig.setRestartStrategy(RestartStrategies.noRestart());
break;
case "fixed-delay":
- int attempts = config.getInt(ExecutionConfigKey.RESTART_ATTEMPTS);
- long delay = config.getLong(ExecutionConfigKey.RESTART_DELAY_BETWEEN_ATTEMPTS);
+ int attempts = envConfig.getInt(ExecutionConfigKeyName.RESTART_ATTEMPTS);
+ long delay = envConfig.getLong(ExecutionConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS);
executionConfig.setRestartStrategy(
RestartStrategies.fixedDelayRestart(attempts, delay));
break;
case "failure-rate":
long failureInterval =
- config.getLong(ExecutionConfigKey.RESTART_FAILURE_INTERVAL);
- int rate = config.getInt(ExecutionConfigKey.RESTART_FAILURE_RATE);
- long delayInterval = config.getLong(ExecutionConfigKey.RESTART_DELAY_INTERVAL);
+ envConfig.getLong(ExecutionConfigKeyName.RESTART_FAILURE_INTERVAL);
+ int rate = envConfig.getInt(ExecutionConfigKeyName.RESTART_FAILURE_RATE);
+ long delayInterval = envConfig.getLong(ExecutionConfigKeyName.RESTART_DELAY_INTERVAL);
executionConfig.setRestartStrategy(
RestartStrategies.failureRateRestart(
rate,
@@ -76,34 +79,34 @@ public final class EnvironmentUtil {
}
}
} catch (Exception e) {
- log.warn("set restart.strategy in config '{}' exception", config, e);
+ log.warn("set restart.strategy in config '{}' exception", envConfig, e);
}
}
- public static CheckResult checkRestartStrategy(Config config) {
- if (config.hasPath(ExecutionConfigKey.RESTART_STRATEGY)) {
- String restartStrategy = config.getString(ExecutionConfigKey.RESTART_STRATEGY);
+ public static CheckResult checkRestartStrategy(Config envConfig) {
+ if (envConfig.hasPath(ExecutionConfigKeyName.RESTART_STRATEGY)) {
+ String restartStrategy = envConfig.getString(ExecutionConfigKeyName.RESTART_STRATEGY);
switch (restartStrategy.toLowerCase()) {
case "fixed-delay":
- if (!(config.hasPath(ExecutionConfigKey.RESTART_ATTEMPTS)
- && config.hasPath(ExecutionConfigKey.RESTART_DELAY_BETWEEN_ATTEMPTS))) {
+ if (!(envConfig.hasPath(ExecutionConfigKeyName.RESTART_ATTEMPTS)
+ && envConfig.hasPath(ExecutionConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS))) {
return CheckResult.error(
String.format(
"fixed-delay restart strategy must set [%s],[%s]",
- ExecutionConfigKey.RESTART_ATTEMPTS,
- ExecutionConfigKey.RESTART_DELAY_BETWEEN_ATTEMPTS));
+ ExecutionConfigKeyName.RESTART_ATTEMPTS,
+ ExecutionConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS));
}
break;
case "failure-rate":
- if (!(config.hasPath(ExecutionConfigKey.RESTART_FAILURE_INTERVAL)
- && config.hasPath(ExecutionConfigKey.RESTART_FAILURE_RATE)
- && config.hasPath(ExecutionConfigKey.RESTART_DELAY_INTERVAL))) {
+ if (!(envConfig.hasPath(ExecutionConfigKeyName.RESTART_FAILURE_INTERVAL)
+ && envConfig.hasPath(ExecutionConfigKeyName.RESTART_FAILURE_RATE)
+ && envConfig.hasPath(ExecutionConfigKeyName.RESTART_DELAY_INTERVAL))) {
return CheckResult.error(
String.format(
"failure-rate restart strategy must set [%s],[%s],[%s]",
- ExecutionConfigKey.RESTART_FAILURE_INTERVAL,
- ExecutionConfigKey.RESTART_FAILURE_RATE,
- ExecutionConfigKey.RESTART_DELAY_INTERVAL));
+ ExecutionConfigKeyName.RESTART_FAILURE_INTERVAL,
+ ExecutionConfigKeyName.RESTART_FAILURE_RATE,
+ ExecutionConfigKeyName.RESTART_DELAY_INTERVAL));
}
break;
default: