diff options
| author | doufenghu <[email protected]> | 2024-11-14 09:57:14 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-11-14 09:57:14 +0800 |
| commit | 52f43de2ef7614782570c8b5052cbc4fd99406d9 (patch) | |
| tree | 344b6fd8ff6392364fec3d82b6417f852db0cb52 /groot-bootstrap | |
| parent | df18fbe845df119e884e2e8f281bbf019d96c7e7 (diff) | |
[Improve][bootstrap] Rename OperatorType to JobStage. A JobStage represents one stage of a job's stream-processing pipeline, and each stage can be configured with multiple types of operators.
Diffstat (limited to 'groot-bootstrap')
| -rw-r--r-- | groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java) | 8 | ||||
| -rw-r--r-- | groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java | 33 | ||||
| -rw-r--r-- | groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java | 4 | ||||
| -rw-r--r-- | groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java | 4 | ||||
| -rw-r--r-- | groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java | 4 |
5 files changed, 27 insertions, 26 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java index a32c844..dcb3e56 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.enums; -public enum OperatorType { +public enum JobStage { SOURCE("source"), FILTER("filter"), SPLIT("split"), @@ -13,10 +13,10 @@ public enum OperatorType { public String getType() { return type; } - OperatorType(String type) {this.type = type;} + JobStage(String type) {this.type = type;} - public static OperatorType fromType(String type) { - for (OperatorType stage : values()) { + public static JobStage fromType(String type) { + for (JobStage stage : values()) { if (stage.type.equalsIgnoreCase(type)) { return stage; } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index 3c55944..b7f8b97 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.OperatorType; +import com.geedgenetworks.bootstrap.enums.JobStage; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.config.Constants; @@ -147,6 +147,7 @@ public class JobExecution { Map<String, Object> processingPipelines = Maps.newHashMap(); Map<String, Object> postprocessingPipelines = Maps.newHashMap(); + if (config.hasPath(Constants.SOURCES)) { sources = 
config.getConfig(Constants.SOURCES).root().unwrapped(); } @@ -180,21 +181,21 @@ public class JobExecution { for (JobTopologyNode jobTopologyNode : jobTopologyNodes) { if (sources.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.SOURCE); + jobTopologyNode.setType(JobStage.SOURCE); } else if (sinks.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.SINK); + jobTopologyNode.setType(JobStage.SINK); } else if (filters.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.FILTER); + jobTopologyNode.setType(JobStage.FILTER); } else if (splits.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.SPLIT); + jobTopologyNode.setType(JobStage.SPLIT); } else if (preprocessingPipelines.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.PREPROCESSING); + jobTopologyNode.setType(JobStage.PREPROCESSING); } else if (processingPipelines.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.PROCESSING); + jobTopologyNode.setType(JobStage.PROCESSING); } else if (postprocessingPipelines.containsKey(jobTopologyNode.getName())) { - jobTopologyNode.setType(OperatorType.POSTPROCESSING); + jobTopologyNode.setType(JobStage.POSTPROCESSING); } else { - throw new JobExecuteException("unsupported process type " + jobTopologyNode.getName()); + throw new JobExecuteException("Unsupported operator type " + jobTopologyNode.getName()); } } @@ -208,7 +209,7 @@ public class JobExecution { jobRuntimeEnvironment.registerPlugin(jarPaths); } List<JobTopologyNode> sourceJobTopologyNodes = jobTopologyNodes - .stream().filter(v -> v.getType().name().equals(OperatorType.SOURCE.name())).collect(Collectors.toList()); + .stream().filter(v -> v.getType().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList()); DataStream<Event> dataStream = null; @@ -238,14 +239,14 @@ public class JobExecution { JobTopologyNode jobTopologyNode = 
getNode(downstreamNodeName).orElseGet(() -> { throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); - if (jobTopologyNode.getType().name().equals(OperatorType.FILTER.name())) { + if (jobTopologyNode.getType().name().equals(JobStage.FILTER.name())) { if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream) .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {}), jobTopologyNode); } else { dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (jobTopologyNode.getType().name().equals(OperatorType.SPLIT.name())) { + } else if (jobTopologyNode.getType().name().equals(JobStage.SPLIT.name())) { if (jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size()) { for (int i = 0; i < jobTopologyNode.getDownstream().size(); i++) { nodeNameWithSplitTags.put(jobTopologyNode.getDownstream().get(i), jobTopologyNode.getTags().get(i)); @@ -255,28 +256,28 @@ public class JobExecution { throw new JobExecuteException("split node downstream size not equal tags size"); } dataStream = processorExecutor.execute(dataStream, jobTopologyNode); - } else if (jobTopologyNode.getType().name().equals(OperatorType.PREPROCESSING.name())) { + } else if (jobTopologyNode.getType().name().equals(JobStage.PREPROCESSING.name())) { if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())){ }), jobTopologyNode); } else { dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (jobTopologyNode.getType().name().equals(OperatorType.PROCESSING.name())) { + } else if (jobTopologyNode.getType().name().equals(JobStage.PROCESSING.name())) { if 
(nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { }), jobTopologyNode); } else { dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (jobTopologyNode.getType().name().equals(OperatorType.POSTPROCESSING.name())) { + } else if (jobTopologyNode.getType().name().equals(JobStage.POSTPROCESSING.name())) { if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { }), jobTopologyNode); } else { dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (jobTopologyNode.getType().name().equals(OperatorType.SINK.name())) { + } else if (jobTopologyNode.getType().name().equals(JobStage.SINK.name())) { if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { }), jobTopologyNode); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java index dcc15e9..ab2aec3 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.execution; -import com.geedgenetworks.bootstrap.enums.OperatorType; +import com.geedgenetworks.bootstrap.enums.JobStage; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -19,7 +19,7 @@ 
import java.util.List; @EqualsAndHashCode public class JobTopologyNode implements Serializable { private String name; - private OperatorType type; + private JobStage type; private int parallelism; private List<String> downstream = Collections.emptyList(); private List<String> tags = Collections.emptyList(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java index b40a88c..d402280 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; -import com.geedgenetworks.bootstrap.enums.OperatorType; +import com.geedgenetworks.bootstrap.enums.JobStage; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.config.Constants; @@ -31,7 +31,7 @@ import java.util.Map; */ @Slf4j public class SinkExecutor extends AbstractExecutor<JobRuntimeEnvironment, Config> { - private static final String PROCESSOR_TYPE = OperatorType.SINK.getType(); + private static final String PROCESSOR_TYPE = JobStage.SINK.getType(); private Map<String, SinkConfig> operators; public SinkExecutor(JobRuntimeEnvironment environment, Config jobConfig) { super(environment, jobConfig); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index 57438e9..fcb94e3 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -1,7 +1,7 @@ package 
com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.OperatorType; +import com.geedgenetworks.bootstrap.enums.JobStage; import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; @@ -36,7 +36,7 @@ import java.util.Map; */ @Slf4j public class SourceExecutor extends AbstractExecutor<JobRuntimeEnvironment, Config> { - private static final String PROCESSOR_TYPE = OperatorType.SOURCE.getType(); + private static final String PROCESSOR_TYPE = JobStage.SOURCE.getType(); private Map<String, SourceConfig> operators; public SourceExecutor(JobRuntimeEnvironment environment, Config jobConfig) { super(environment, jobConfig); |
