From 52f43de2ef7614782570c8b5052cbc4fd99406d9 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Thu, 14 Nov 2024 09:57:14 +0800 Subject: [Improve][bootstrap] OperatorType has been renamed to JobStage, defined as a job stream processing stage. Each stage can be configured with multiple types of operators. --- .../geedgenetworks/bootstrap/enums/JobStage.java | 32 +++++++++++++++++++++ .../bootstrap/enums/OperatorType.java | 32 --------------------- .../bootstrap/execution/JobExecution.java | 33 +++++++++++----------- .../bootstrap/execution/JobTopologyNode.java | 4 +-- .../bootstrap/execution/SinkExecutor.java | 4 +-- .../bootstrap/execution/SourceExecutor.java | 4 +-- 6 files changed, 55 insertions(+), 54 deletions(-) create mode 100644 groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java delete mode 100644 groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java (limited to 'groot-bootstrap') diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java new file mode 100644 index 0000000..dcb3e56 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java @@ -0,0 +1,32 @@ +package com.geedgenetworks.bootstrap.enums; + +public enum JobStage { + SOURCE("source"), + FILTER("filter"), + SPLIT("split"), + PREPROCESSING("preprocessing"), + PROCESSING("processing"), + POSTPROCESSING("postprocessing"), + SINK("sink"); + + private final String type; + public String getType() { + return type; + } + JobStage(String type) {this.type = type;} + + public static JobStage fromType(String type) { + for (JobStage stage : values()) { + if (stage.type.equalsIgnoreCase(type)) { + return stage; + } + } + throw new IllegalArgumentException("Unknown type: " + type); + } + + @Override + public String toString() { + return type; + } + +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java deleted file mode 100644 index a32c844..0000000 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.geedgenetworks.bootstrap.enums; - -public enum OperatorType { - SOURCE("source"), - FILTER("filter"), - SPLIT("split"), - PREPROCESSING("preprocessing"), - PROCESSING("processing"), - POSTPROCESSING("postprocessing"), - SINK("sink"); - - private final String type; - public String getType() { - return type; - } - OperatorType(String type) {this.type = type;} - - public static OperatorType fromType(String type) { - for (OperatorType stage : values()) { - if (stage.type.equalsIgnoreCase(type)) { - return stage; - } - } - throw new IllegalArgumentException("Unknown type: " + type); - } - - @Override - public String toString() { - return type; - } - -} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index 3c55944..b7f8b97 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.OperatorType; +import com.geedgenetworks.bootstrap.enums.JobStage; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.config.Constants; @@ -147,6 +147,7 @@ public class JobExecution { Map processingPipelines = Maps.newHashMap(); Map postprocessingPipelines = Maps.newHashMap(); + if (config.hasPath(Constants.SOURCES)) { sources = config.getConfig(Constants.SOURCES).root().unwrapped(); } @@ -180,21 +181,21 @@ public class JobExecution { for (JobTopologyNode jobTopologyNode : jobTopologyNodes) { if (sources.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.SOURCE); + jobTopologyNode.setType(JobStage.SOURCE); } else if (sinks.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.SINK); + jobTopologyNode.setType(JobStage.SINK); } else if (filters.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.FILTER); + jobTopologyNode.setType(JobStage.FILTER); } else if (splits.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.SPLIT); + jobTopologyNode.setType(JobStage.SPLIT); } else if (preprocessingPipelines.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.PREPROCESSING); + jobTopologyNode.setType(JobStage.PREPROCESSING); } else if (processingPipelines.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.PROCESSING); + jobTopologyNode.setType(JobStage.PROCESSING); } else if (postprocessingPipelines.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.POSTPROCESSING); + jobTopologyNode.setType(JobStage.POSTPROCESSING); } else { - throw new JobExecuteException("unsupported process type " + jobTopologyNode.getName()); + throw new JobExecuteException("Unsupported operator type " + jobTopologyNode.getName()); } } @@ -208,7 +209,7 @@ public class JobExecution { jobRuntimeEnvironment.registerPlugin(jarPaths); } List sourceJobTopologyNodes = jobTopologyNodes - .stream().filter(v -> v.getType().name().equals(OperatorType.SOURCE.name())).collect(Collectors.toList()); + .stream().filter(v -> v.getType().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList()); DataStream dataStream = null; @@ -238,14 +239,14 @@ public class JobExecution { JobTopologyNode jobTopologyNode = getNode(downstreamNodeName).orElseGet(() -> { throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); - if (jobTopologyNode.getType().name().equals(OperatorType.FILTER.name())) { + if (jobTopologyNode.getType().name().equals(JobStage.FILTER.name())) { if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { dataStream = processorExecutor.execute(((SingleOutputStreamOperator) dataStream) .getSideOutput(new OutputTag(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {}), jobTopologyNode); } else { dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (jobTopologyNode.getType().name().equals(OperatorType.SPLIT.name())) { + } else if (jobTopologyNode.getType().name().equals(JobStage.SPLIT.name())) { if (jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size()) { for (int i = 0; i < jobTopologyNode.getDownstream().size(); i++) { nodeNameWithSplitTags.put(jobTopologyNode.getDownstream().get(i), jobTopologyNode.getTags().get(i)); @@ -255,28 +256,28 @@ public class JobExecution { throw new JobExecuteException("split node downstream size not equal tags size"); } dataStream = processorExecutor.execute(dataStream, jobTopologyNode); - } else if (jobTopologyNode.getType().name().equals(OperatorType.PREPROCESSING.name())) { + } else if (jobTopologyNode.getType().name().equals(JobStage.PREPROCESSING.name())) { if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { dataStream = processorExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(nodeNameWithSplitTags.get(jobTopologyNode.getName())){ }), jobTopologyNode); } else { dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (jobTopologyNode.getType().name().equals(OperatorType.PROCESSING.name())) { + } else if (jobTopologyNode.getType().name().equals(JobStage.PROCESSING.name())) { if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { dataStream = processorExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { }), jobTopologyNode); } else { dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (jobTopologyNode.getType().name().equals(OperatorType.POSTPROCESSING.name())) { + } else if (jobTopologyNode.getType().name().equals(JobStage.POSTPROCESSING.name())) { if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { dataStream = processorExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { }), jobTopologyNode); } else { dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (jobTopologyNode.getType().name().equals(OperatorType.SINK.name())) { + } else if (jobTopologyNode.getType().name().equals(JobStage.SINK.name())) { if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { dataStream = sinkExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { }), jobTopologyNode); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java index dcc15e9..ab2aec3 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.execution; -import com.geedgenetworks.bootstrap.enums.OperatorType; +import com.geedgenetworks.bootstrap.enums.JobStage; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -19,7 +19,7 @@ import java.util.List; @EqualsAndHashCode public class JobTopologyNode implements Serializable { private String name; - private OperatorType type; + private JobStage type; private int parallelism; private List downstream = Collections.emptyList(); private List tags = Collections.emptyList(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java index b40a88c..d402280 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; -import com.geedgenetworks.bootstrap.enums.OperatorType; +import com.geedgenetworks.bootstrap.enums.JobStage; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.config.Constants; @@ -31,7 +31,7 @@ import java.util.Map; */ @Slf4j public class SinkExecutor extends AbstractExecutor { - private static final String PROCESSOR_TYPE = OperatorType.SINK.getType(); + private static final String PROCESSOR_TYPE = JobStage.SINK.getType(); private Map operators; public SinkExecutor(JobRuntimeEnvironment environment, Config jobConfig) { super(environment, jobConfig); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index 57438e9..fcb94e3 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.OperatorType; +import com.geedgenetworks.bootstrap.enums.JobStage; import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; @@ -36,7 +36,7 @@ import java.util.Map; */ @Slf4j public class SourceExecutor extends AbstractExecutor { - private static final String PROCESSOR_TYPE = OperatorType.SOURCE.getType(); + private static final String PROCESSOR_TYPE = JobStage.SOURCE.getType(); private Map operators; public SourceExecutor(JobRuntimeEnvironment environment, Config jobConfig) { super(environment, jobConfig); -- cgit v1.2.3