diff options
Diffstat (limited to 'groot-bootstrap')
3 files changed, 94 insertions, 67 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java index dcb3e56..88505ee 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java @@ -4,8 +4,10 @@ public enum JobStage { SOURCE("source"), FILTER("filter"), SPLIT("split"), + @Deprecated PREPROCESSING("preprocessing"), PROCESSING("processing"), + @Deprecated POSTPROCESSING("postprocessing"), SINK("sink"); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index b7f8b97..2cdfbb3 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -25,6 +25,9 @@ import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkArgument; +import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull; + @Slf4j public class JobExecution { @@ -34,7 +37,6 @@ public class JobExecution { private final Executor<DataStream<Event>> processorExecutor; private final List<JobTopologyNode> jobTopologyNodes; private final List<URL> jarPaths; - private final Map<String,String> nodeNameWithSplitTags = new HashMap<>(); public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) { try { @@ -147,7 +149,6 @@ public class JobExecution { Map<String, Object> processingPipelines = Maps.newHashMap(); Map<String, Object> postprocessingPipelines = Maps.newHashMap(); - if (config.hasPath(Constants.SOURCES)) { sources = config.getConfig(Constants.SOURCES).root().unwrapped(); } @@ -180,20 +181,23 @@ public class JobExecution { }); for (JobTopologyNode jobTopologyNode : jobTopologyNodes) { + checkNotNull(jobTopologyNode.getName(), "Node name is null"); if (sources.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(JobStage.SOURCE); + jobTopologyNode.setStage(JobStage.SOURCE); } else if (sinks.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(JobStage.SINK); + jobTopologyNode.setStage(JobStage.SINK); } else if (filters.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(JobStage.FILTER); + jobTopologyNode.setStage(JobStage.FILTER); } else if (splits.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(JobStage.SPLIT); + checkArgument( jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size(), + "split node downstream size not equal tags size"); + jobTopologyNode.setStage(JobStage.SPLIT); } else if (preprocessingPipelines.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(JobStage.PREPROCESSING); + jobTopologyNode.setStage(JobStage.PREPROCESSING); } else if (processingPipelines.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(JobStage.PROCESSING); + jobTopologyNode.setStage(JobStage.PROCESSING); } else if (postprocessingPipelines.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(JobStage.POSTPROCESSING); + jobTopologyNode.setStage(JobStage.POSTPROCESSING); } else { throw new JobExecuteException("Unsupported operator type " + jobTopologyNode.getName()); } @@ -205,17 +209,18 @@ public class JobExecution { public void execute() throws JobExecuteException { + if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) { jobRuntimeEnvironment.registerPlugin(jarPaths); } - List<JobTopologyNode> sourceJobTopologyNodes = jobTopologyNodes - .stream().filter(v -> v.getType().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList()); + List<JobTopologyNode> sourceNodes = jobTopologyNodes + .stream().filter(v -> v.getStage().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList()); DataStream<Event> dataStream = null; - for (JobTopologyNode sourceJobTopologyNode : sourceJobTopologyNodes) { - dataStream = sourceExecutor.execute(dataStream, sourceJobTopologyNode); - for (String nodeName : sourceJobTopologyNode.getDownstream()) { + for (JobTopologyNode sourceNode : sourceNodes) { + dataStream = sourceExecutor.execute(dataStream, sourceNode); + for (String nodeName : sourceNode.getDownstream()) { buildJobGraph(dataStream, nodeName); } } @@ -235,67 +240,73 @@ public class JobExecution { } - private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) { - JobTopologyNode jobTopologyNode = getNode(downstreamNodeName).orElseGet(() -> { - throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); - }); - if (jobTopologyNode.getType().name().equals(JobStage.FILTER.name())) { - if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { - dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream) - .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {}), jobTopologyNode); - } else { - dataStream = processorExecutor.execute(dataStream, jobTopologyNode); - } - } else if (jobTopologyNode.getType().name().equals(JobStage.SPLIT.name())) { - if (jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size()) { - for (int i = 0; i < jobTopologyNode.getDownstream().size(); i++) { - nodeNameWithSplitTags.put(jobTopologyNode.getDownstream().get(i), jobTopologyNode.getTags().get(i)); - } - } - else { - throw new JobExecuteException("split node downstream size not equal tags size"); - } - dataStream = processorExecutor.execute(dataStream, jobTopologyNode); - } else if (jobTopologyNode.getType().name().equals(JobStage.PREPROCESSING.name())) { - if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { - dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())){ - }), jobTopologyNode); - } else { - dataStream = processorExecutor.execute(dataStream, jobTopologyNode); - } - } else if (jobTopologyNode.getType().name().equals(JobStage.PROCESSING.name())) { - if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { - dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { - }), jobTopologyNode); - } else { - dataStream = processorExecutor.execute(dataStream, jobTopologyNode); - } - } else if (jobTopologyNode.getType().name().equals(JobStage.POSTPROCESSING.name())) { - if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { - dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { - }), jobTopologyNode); - } else { - dataStream = processorExecutor.execute(dataStream, jobTopologyNode); - } - } else if (jobTopologyNode.getType().name().equals(JobStage.SINK.name())) { - if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { - dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { - }), jobTopologyNode); - } else { - dataStream = sinkExecutor.execute(dataStream, jobTopologyNode); - } + private void buildJobGraph(DataStream<Event> dataStream, String nodeName) { + JobTopologyNode currentNode = getNode(nodeName).orElseThrow(() -> + new JobExecuteException("Node not found: " + nodeName)); + switch (currentNode.getStage()) { + case SPLIT: + case FILTER: + case PREPROCESSING: + case PROCESSING: + case POSTPROCESSING: + dataStream = executeProcessorNode(dataStream, currentNode); + break; + case SINK: + dataStream = executeSinkNode(dataStream, currentNode); + break; + default: + throw new JobExecuteException("Unsupported Job stage: " + currentNode.getStage()); + } + // Recursively build job graph for downstream nodes + for (String downstreamNodeName : currentNode.getDownstream()) { + buildJobGraph(dataStream, downstreamNodeName); + } + + } + + private DataStream<Event> executeProcessorNode(DataStream<Event> dataStream, JobTopologyNode currentNode) { + + JobTopologyNode inputNode = getInputNode(dataStream); + + if (isSplitAndHasTag(inputNode, currentNode)) { + return executeWithSideOutput(dataStream, inputNode, currentNode, processorExecutor); } else { - throw new JobExecuteException("unsupported process type " + jobTopologyNode.getType().name()); + return processorExecutor.execute(dataStream, currentNode); } + } + private DataStream<Event> executeSinkNode(DataStream<Event> dataStream, JobTopologyNode currentNode) { + JobTopologyNode inputNode = getInputNode(dataStream); - for (String nodeName : jobTopologyNode.getDownstream()) { - buildJobGraph(dataStream, nodeName); + if (isSplitAndHasTag(inputNode, currentNode)) { + return executeWithSideOutput(dataStream, inputNode, currentNode, sinkExecutor); + } else { + return sinkExecutor.execute(dataStream, currentNode); } + } + // Helper method to get the input node based on the current data stream + private JobTopologyNode getInputNode(DataStream<Event> dataStream) { + String inputName = dataStream.getTransformation().getName(); + return getNode(inputName).orElseThrow(() -> new JobExecuteException("Node not found: " + inputName)); + } + + // Helper method to check if input node is SPLIT and has a tag for downstream + private boolean isSplitAndHasTag(JobTopologyNode inputNode, JobTopologyNode currentNode) { + return inputNode.getStage().equals(JobStage.SPLIT) + && inputNode.getTagForDownstream(currentNode.getName()) != null; + } + // Helper method to execute with side output + private DataStream<Event> executeWithSideOutput(DataStream<Event> dataStream, JobTopologyNode inputNode, + JobTopologyNode currentNode, Executor<DataStream<Event>> executor) { + OutputTag<Event> outputTag = new OutputTag<Event>(inputNode.getTagForDownstream(currentNode.getName())) {}; + SingleOutputStreamOperator<Event> singleOutputStream = (SingleOutputStreamOperator<Event>) dataStream; + DataStream<Event> sideOutput = singleOutputStream.getSideOutput(outputTag); + return executor.execute(sideOutput, currentNode); } + // Helper method to get the node based on the name private Optional<JobTopologyNode> getNode(String name) { return jobTopologyNodes.stream().filter(v -> v.getName().equals(name)).findFirst(); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java index ab2aec3..27881a2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java @@ -1,6 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.bootstrap.enums.JobStage; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -8,7 +9,9 @@ import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Represents an operator node in the execution graph. @@ -19,9 +22,20 @@ import java.util.List; @EqualsAndHashCode public class JobTopologyNode implements Serializable { private String name; - private JobStage type; + private JobStage stage; private int parallelism; private List<String> downstream = Collections.emptyList(); private List<String> tags = Collections.emptyList(); + public String getTagForDownstream(String downstreamNode) { + int index = downstream.indexOf(downstreamNode); + if (index < 0) { + return null; + } + return tags.get(index); + } + + + + } |
