summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-09-04 09:50:32 +0800
committerwangkuan <[email protected]>2024-09-04 09:50:32 +0800
commit8217c23fa564ad2051427a12f4eabf3fea75aafc (patch)
tree761f6b2dbe28c6a4e6e1392465aecdc869cf1cb1 /groot-bootstrap
parente3efdcac80dc1ca8fb0bdd08f69318f745f9bf7c (diff)
[improve][bootstrap]优化代码,修改splitsset位置,单元测试适配properties配置变更
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java24
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java9
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java9
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java20
4 files changed, 36 insertions, 26 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 6a23a0b..f6e19eb 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -6,7 +6,14 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.config.SplitConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.pojo.SplitConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -38,6 +45,7 @@ public class JobExecution {
private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
private final List<Node> nodes;
private final List<URL> jarPaths;
+ private final Set<String> splitSet = new HashSet<>();
public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
try {
@@ -201,6 +209,12 @@ public class JobExecution {
} else if (filters.containsKey(node.getName())) {
node.setType(ProcessorType.FILTER);
} else if (splits.containsKey(node.getName())) {
+ splits.forEach((key, value) -> {
+ SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+ for(RuleContext ruleContext:splitConfig.getRules()) {
+ splitSet.add(ruleContext.getName());
+ }
+ });
node.setType(ProcessorType.SPLIT);
} else if (preprocessingPipelines.containsKey(node.getName())) {
node.setType(ProcessorType.PREPROCESSING);
@@ -254,7 +268,7 @@ public class JobExecution {
throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ if (splitSet.contains(node.getName())) {
dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
}), node);
} else {
@@ -264,28 +278,28 @@ public class JobExecution {
dataStream = splitExecutor.execute(dataStream, node);
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ if (splitSet.contains(node.getName())) {
dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
}), node);
} else {
dataStream = preprocessingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ if (splitSet.contains(node.getName())) {
dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
}), node);
} else {
dataStream = processingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ if (splitSet.contains(node.getName())) {
dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
}), node);
} else {
dataStream = postprocessingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ if (splitSet.contains(node.getName())) {
dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
}), node);
} else {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 4c6c2d2..a4289ff 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -33,7 +33,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
private GrootStreamConfig grootStreamConfig;
private StreamExecutionEnvironment environment;
private String jobName = Constants.DEFAULT_JOB_NAME;
- private Set<String> splitSet = new HashSet<>();
private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) {
this.grootStreamConfig = grootStreamConfig;
@@ -200,14 +199,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
}
- public Set<String> getSplitSet() {
- return splitSet;
- }
-
- public void setSplitSet(Set<String> splitSet) {
- this.splitSet = splitSet;
- }
-
private void setCheckpoint() {
long interval = 0;
if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
index 3513a67..e549087 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -37,9 +37,9 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.SPLITS)) {
- Config routes = operatorConfig.getConfig(Constants.SPLITS);
- routes.root().unwrapped().forEach((key, value) -> {
- CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key),
+ Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS);
+ splitsConfig.root().unwrapped().forEach((key, value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(splitsConfig.getConfig(key),
SplitConfigOptions.TYPE.key());
if (!result.isSuccess()) {
throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
@@ -75,9 +75,6 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
if (node.getParallelism() > 0) {
splitConfig.setParallelism(node.getParallelism());
}
- for(RuleContext ruleContext:splitConfig.getRules()) {
- jobRuntimeEnvironment.getSplitSet().add(ruleContext.getName());
- }
try {
dataStream =
split.splitFunction(
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
index 7e995cf..7b9544a 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
@@ -7,8 +7,10 @@ import com.geedgenetworks.bootstrap.execution.*;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.udf.RuleContext;
import com.geedgenetworks.common.utils.ReflectionUtils;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -42,7 +44,7 @@ public class JobExecutionTest {
private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
-
+ private final Set<String> splitSet = new HashSet<>();
private final List<Node> nodes;
private BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
@@ -219,6 +221,12 @@ public class JobExecutionTest {
} else if (sinks.containsKey(node.getName())) {
node.setType(ProcessorType.SINK);
} else if (splits.containsKey(node.getName())) {
+ splits.forEach((key, value) -> {
+ SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+ for(RuleContext ruleContext:splitConfig.getRules()) {
+ splitSet.add(ruleContext.getName());
+ }
+ });
node.setType(ProcessorType.SPLIT);
} else if (filters.containsKey(node.getName())) {
node.setType(ProcessorType.FILTER);
@@ -262,7 +270,7 @@ public class JobExecutionTest {
throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ if (splitSet.contains(node.getName())) {
dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
}), node);
} else {
@@ -272,28 +280,28 @@ public class JobExecutionTest {
dataStream = splitExecutor.execute(dataStream, node);
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ if (splitSet.contains(node.getName())) {
dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
}), node);
} else {
dataStream = preprocessingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ if (splitSet.contains(node.getName())) {
dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
}), node);
} else {
dataStream = processingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ if (splitSet.contains(node.getName())) {
dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
}), node);
} else {
dataStream = postprocessingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ if (splitSet.contains(node.getName())) {
dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
}), node);
} else {