summary | refs | log | tree | commit | diff
path: root/groot-bootstrap
diff options
context:
space:
mode:
author doufenghu <[email protected]> 2024-11-16 00:26:30 +0800
committer doufenghu <[email protected]> 2024-11-16 00:26:30 +0800
commit bc8fe110e1037e75012c4fb655fff888d4356bf4 (patch)
tree a4352ac83b8a0d95f1a045d90997fc0a6c06cc05 /groot-bootstrap
parent 7d6c1eb13837931b4b526f05adb550a58fec1aea (diff)
[Improve][core] Preprocessing 和 postprocessing 标识已过期,后续任务将被移除。修复了 Split side output 下游节点存在其他边无法正确构建拓扑的问题。
Diffstat (limited to 'groot-bootstrap')
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java 2
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java 143
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java 16
3 files changed, 94 insertions, 67 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
index dcb3e56..88505ee 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
@@ -4,8 +4,10 @@ public enum JobStage {
SOURCE("source"),
FILTER("filter"),
SPLIT("split"),
+ @Deprecated
PREPROCESSING("preprocessing"),
PROCESSING("processing"),
+ @Deprecated
POSTPROCESSING("postprocessing"),
SINK("sink");
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index b7f8b97..2cdfbb3 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -25,6 +25,9 @@ import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+
@Slf4j
public class JobExecution {
@@ -34,7 +37,6 @@ public class JobExecution {
private final Executor<DataStream<Event>> processorExecutor;
private final List<JobTopologyNode> jobTopologyNodes;
private final List<URL> jarPaths;
- private final Map<String,String> nodeNameWithSplitTags = new HashMap<>();
public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
try {
@@ -147,7 +149,6 @@ public class JobExecution {
Map<String, Object> processingPipelines = Maps.newHashMap();
Map<String, Object> postprocessingPipelines = Maps.newHashMap();
-
if (config.hasPath(Constants.SOURCES)) {
sources = config.getConfig(Constants.SOURCES).root().unwrapped();
}
@@ -180,20 +181,23 @@ public class JobExecution {
});
for (JobTopologyNode jobTopologyNode : jobTopologyNodes) {
+ checkNotNull(jobTopologyNode.getName(), "Node name is null");
if (sources.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.SOURCE);
+ jobTopologyNode.setStage(JobStage.SOURCE);
} else if (sinks.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.SINK);
+ jobTopologyNode.setStage(JobStage.SINK);
} else if (filters.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.FILTER);
+ jobTopologyNode.setStage(JobStage.FILTER);
} else if (splits.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.SPLIT);
+ checkArgument( jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size(),
+ "split node downstream size not equal tags size");
+ jobTopologyNode.setStage(JobStage.SPLIT);
} else if (preprocessingPipelines.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.PREPROCESSING);
+ jobTopologyNode.setStage(JobStage.PREPROCESSING);
} else if (processingPipelines.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.PROCESSING);
+ jobTopologyNode.setStage(JobStage.PROCESSING);
} else if (postprocessingPipelines.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(JobStage.POSTPROCESSING);
+ jobTopologyNode.setStage(JobStage.POSTPROCESSING);
} else {
throw new JobExecuteException("Unsupported operator type " + jobTopologyNode.getName());
}
@@ -205,17 +209,18 @@ public class JobExecution {
public void execute() throws JobExecuteException {
+
if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) {
jobRuntimeEnvironment.registerPlugin(jarPaths);
}
- List<JobTopologyNode> sourceJobTopologyNodes = jobTopologyNodes
- .stream().filter(v -> v.getType().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList());
+ List<JobTopologyNode> sourceNodes = jobTopologyNodes
+ .stream().filter(v -> v.getStage().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList());
DataStream<Event> dataStream = null;
- for (JobTopologyNode sourceJobTopologyNode : sourceJobTopologyNodes) {
- dataStream = sourceExecutor.execute(dataStream, sourceJobTopologyNode);
- for (String nodeName : sourceJobTopologyNode.getDownstream()) {
+ for (JobTopologyNode sourceNode : sourceNodes) {
+ dataStream = sourceExecutor.execute(dataStream, sourceNode);
+ for (String nodeName : sourceNode.getDownstream()) {
buildJobGraph(dataStream, nodeName);
}
}
@@ -235,67 +240,73 @@ public class JobExecution {
}
- private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
- JobTopologyNode jobTopologyNode = getNode(downstreamNodeName).orElseGet(() -> {
- throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
- });
- if (jobTopologyNode.getType().name().equals(JobStage.FILTER.name())) {
- if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
- dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream)
- .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {}), jobTopologyNode);
- } else {
- dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- }
- } else if (jobTopologyNode.getType().name().equals(JobStage.SPLIT.name())) {
- if (jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size()) {
- for (int i = 0; i < jobTopologyNode.getDownstream().size(); i++) {
- nodeNameWithSplitTags.put(jobTopologyNode.getDownstream().get(i), jobTopologyNode.getTags().get(i));
- }
- }
- else {
- throw new JobExecuteException("split node downstream size not equal tags size");
- }
- dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- } else if (jobTopologyNode.getType().name().equals(JobStage.PREPROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
- dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())){
- }), jobTopologyNode);
- } else {
- dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- }
- } else if (jobTopologyNode.getType().name().equals(JobStage.PROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
- dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
- }), jobTopologyNode);
- } else {
- dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- }
- } else if (jobTopologyNode.getType().name().equals(JobStage.POSTPROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
- dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
- }), jobTopologyNode);
- } else {
- dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- }
- } else if (jobTopologyNode.getType().name().equals(JobStage.SINK.name())) {
- if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
- dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
- }), jobTopologyNode);
- } else {
- dataStream = sinkExecutor.execute(dataStream, jobTopologyNode);
- }
+ private void buildJobGraph(DataStream<Event> dataStream, String nodeName) {
+ JobTopologyNode currentNode = getNode(nodeName).orElseThrow(() ->
+ new JobExecuteException("Node not found: " + nodeName));
+ switch (currentNode.getStage()) {
+ case SPLIT:
+ case FILTER:
+ case PREPROCESSING:
+ case PROCESSING:
+ case POSTPROCESSING:
+ dataStream = executeProcessorNode(dataStream, currentNode);
+ break;
+ case SINK:
+ dataStream = executeSinkNode(dataStream, currentNode);
+ break;
+ default:
+ throw new JobExecuteException("Unsupported Job stage: " + currentNode.getStage());
+ }
+ // Recursively build job graph for downstream nodes
+ for (String downstreamNodeName : currentNode.getDownstream()) {
+ buildJobGraph(dataStream, downstreamNodeName);
+ }
+
+ }
+
+ private DataStream<Event> executeProcessorNode(DataStream<Event> dataStream, JobTopologyNode currentNode) {
+
+ JobTopologyNode inputNode = getInputNode(dataStream);
+
+ if (isSplitAndHasTag(inputNode, currentNode)) {
+ return executeWithSideOutput(dataStream, inputNode, currentNode, processorExecutor);
} else {
- throw new JobExecuteException("unsupported process type " + jobTopologyNode.getType().name());
+ return processorExecutor.execute(dataStream, currentNode);
}
+ }
+ private DataStream<Event> executeSinkNode(DataStream<Event> dataStream, JobTopologyNode currentNode) {
+ JobTopologyNode inputNode = getInputNode(dataStream);
- for (String nodeName : jobTopologyNode.getDownstream()) {
- buildJobGraph(dataStream, nodeName);
+ if (isSplitAndHasTag(inputNode, currentNode)) {
+ return executeWithSideOutput(dataStream, inputNode, currentNode, sinkExecutor);
+ } else {
+ return sinkExecutor.execute(dataStream, currentNode);
}
+ }
+ // Helper method to get the input node based on the current data stream
+ private JobTopologyNode getInputNode(DataStream<Event> dataStream) {
+ String inputName = dataStream.getTransformation().getName();
+ return getNode(inputName).orElseThrow(() -> new JobExecuteException("Node not found: " + inputName));
+ }
+
+ // Helper method to check if input node is SPLIT and has a tag for downstream
+ private boolean isSplitAndHasTag(JobTopologyNode inputNode, JobTopologyNode currentNode) {
+ return inputNode.getStage().equals(JobStage.SPLIT)
+ && inputNode.getTagForDownstream(currentNode.getName()) != null;
+ }
+ // Helper method to execute with side output
+ private DataStream<Event> executeWithSideOutput(DataStream<Event> dataStream, JobTopologyNode inputNode,
+ JobTopologyNode currentNode, Executor<DataStream<Event>> executor) {
+ OutputTag<Event> outputTag = new OutputTag<Event>(inputNode.getTagForDownstream(currentNode.getName())) {};
+ SingleOutputStreamOperator<Event> singleOutputStream = (SingleOutputStreamOperator<Event>) dataStream;
+ DataStream<Event> sideOutput = singleOutputStream.getSideOutput(outputTag);
+ return executor.execute(sideOutput, currentNode);
}
+ // Helper method to get the node based on the name
private Optional<JobTopologyNode> getNode(String name) {
return jobTopologyNodes.stream().filter(v -> v.getName().equals(name)).findFirst();
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
index ab2aec3..27881a2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.JobStage;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -8,7 +9,9 @@ import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Represents an operator node in the execution graph.
@@ -19,9 +22,20 @@ import java.util.List;
@EqualsAndHashCode
public class JobTopologyNode implements Serializable {
private String name;
- private JobStage type;
+ private JobStage stage;
private int parallelism;
private List<String> downstream = Collections.emptyList();
private List<String> tags = Collections.emptyList();
+ public String getTagForDownstream(String downstreamNode) {
+ int index = downstream.indexOf(downstreamNode);
+ if (index < 0) {
+ return null;
+ }
+ return tags.get(index);
+ }
+
+
+
+
}