summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-11-14 09:57:14 +0800
committerdoufenghu <[email protected]>2024-11-14 09:57:14 +0800
commit52f43de2ef7614782570c8b5052cbc4fd99406d9 (patch)
tree344b6fd8ff6392364fec3d82b6417f852db0cb52
parentdf18fbe845df119e884e2e8f281bbf019d96c7e7 (diff)
[Improve][bootstrap] OperatorType has been renamed to JobStage, defined as a job stream processing stage. Each stage can be configured with multiple types of operators.
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java)8
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java33
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java4
5 files changed, 27 insertions, 26 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
index a32c844..dcb3e56 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/JobStage.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.enums;
-public enum OperatorType {
+public enum JobStage {
SOURCE("source"),
FILTER("filter"),
SPLIT("split"),
@@ -13,10 +13,10 @@ public enum OperatorType {
public String getType() {
return type;
}
- OperatorType(String type) {this.type = type;}
+ JobStage(String type) {this.type = type;}
- public static OperatorType fromType(String type) {
- for (OperatorType stage : values()) {
+ public static JobStage fromType(String type) {
+ for (JobStage stage : values()) {
if (stage.type.equalsIgnoreCase(type)) {
return stage;
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 3c55944..b7f8b97 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.OperatorType;
+import com.geedgenetworks.bootstrap.enums.JobStage;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.config.Constants;
@@ -147,6 +147,7 @@ public class JobExecution {
Map<String, Object> processingPipelines = Maps.newHashMap();
Map<String, Object> postprocessingPipelines = Maps.newHashMap();
+
if (config.hasPath(Constants.SOURCES)) {
sources = config.getConfig(Constants.SOURCES).root().unwrapped();
}
@@ -180,21 +181,21 @@ public class JobExecution {
for (JobTopologyNode jobTopologyNode : jobTopologyNodes) {
if (sources.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(OperatorType.SOURCE);
+ jobTopologyNode.setType(JobStage.SOURCE);
} else if (sinks.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(OperatorType.SINK);
+ jobTopologyNode.setType(JobStage.SINK);
} else if (filters.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(OperatorType.FILTER);
+ jobTopologyNode.setType(JobStage.FILTER);
} else if (splits.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(OperatorType.SPLIT);
+ jobTopologyNode.setType(JobStage.SPLIT);
} else if (preprocessingPipelines.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(OperatorType.PREPROCESSING);
+ jobTopologyNode.setType(JobStage.PREPROCESSING);
} else if (processingPipelines.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(OperatorType.PROCESSING);
+ jobTopologyNode.setType(JobStage.PROCESSING);
} else if (postprocessingPipelines.containsKey(jobTopologyNode.getName())) {
- jobTopologyNode.setType(OperatorType.POSTPROCESSING);
+ jobTopologyNode.setType(JobStage.POSTPROCESSING);
} else {
- throw new JobExecuteException("unsupported process type " + jobTopologyNode.getName());
+ throw new JobExecuteException("Unsupported operator type " + jobTopologyNode.getName());
}
}
@@ -208,7 +209,7 @@ public class JobExecution {
jobRuntimeEnvironment.registerPlugin(jarPaths);
}
List<JobTopologyNode> sourceJobTopologyNodes = jobTopologyNodes
- .stream().filter(v -> v.getType().name().equals(OperatorType.SOURCE.name())).collect(Collectors.toList());
+ .stream().filter(v -> v.getType().name().equals(JobStage.SOURCE.name())).collect(Collectors.toList());
DataStream<Event> dataStream = null;
@@ -238,14 +239,14 @@ public class JobExecution {
JobTopologyNode jobTopologyNode = getNode(downstreamNodeName).orElseGet(() -> {
throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
});
- if (jobTopologyNode.getType().name().equals(OperatorType.FILTER.name())) {
+ if (jobTopologyNode.getType().name().equals(JobStage.FILTER.name())) {
if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream)
.getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {}), jobTopologyNode);
} else {
dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
}
- } else if (jobTopologyNode.getType().name().equals(OperatorType.SPLIT.name())) {
+ } else if (jobTopologyNode.getType().name().equals(JobStage.SPLIT.name())) {
if (jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size()) {
for (int i = 0; i < jobTopologyNode.getDownstream().size(); i++) {
nodeNameWithSplitTags.put(jobTopologyNode.getDownstream().get(i), jobTopologyNode.getTags().get(i));
@@ -255,28 +256,28 @@ public class JobExecution {
throw new JobExecuteException("split node downstream size not equal tags size");
}
dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
- } else if (jobTopologyNode.getType().name().equals(OperatorType.PREPROCESSING.name())) {
+ } else if (jobTopologyNode.getType().name().equals(JobStage.PREPROCESSING.name())) {
if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())){
}), jobTopologyNode);
} else {
dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
}
- } else if (jobTopologyNode.getType().name().equals(OperatorType.PROCESSING.name())) {
+ } else if (jobTopologyNode.getType().name().equals(JobStage.PROCESSING.name())) {
if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
}), jobTopologyNode);
} else {
dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
}
- } else if (jobTopologyNode.getType().name().equals(OperatorType.POSTPROCESSING.name())) {
+ } else if (jobTopologyNode.getType().name().equals(JobStage.POSTPROCESSING.name())) {
if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
}), jobTopologyNode);
} else {
dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
}
- } else if (jobTopologyNode.getType().name().equals(OperatorType.SINK.name())) {
+ } else if (jobTopologyNode.getType().name().equals(JobStage.SINK.name())) {
if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
}), jobTopologyNode);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
index dcc15e9..ab2aec3 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.OperatorType;
+import com.geedgenetworks.bootstrap.enums.JobStage;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -19,7 +19,7 @@ import java.util.List;
@EqualsAndHashCode
public class JobTopologyNode implements Serializable {
private String name;
- private OperatorType type;
+ private JobStage type;
private int parallelism;
private List<String> downstream = Collections.emptyList();
private List<String> tags = Collections.emptyList();
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index b40a88c..d402280 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
-import com.geedgenetworks.bootstrap.enums.OperatorType;
+import com.geedgenetworks.bootstrap.enums.JobStage;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.config.Constants;
@@ -31,7 +31,7 @@ import java.util.Map;
*/
@Slf4j
public class SinkExecutor extends AbstractExecutor<JobRuntimeEnvironment, Config> {
- private static final String PROCESSOR_TYPE = OperatorType.SINK.getType();
+ private static final String PROCESSOR_TYPE = JobStage.SINK.getType();
private Map<String, SinkConfig> operators;
public SinkExecutor(JobRuntimeEnvironment environment, Config jobConfig) {
super(environment, jobConfig);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index 57438e9..fcb94e3 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.OperatorType;
+import com.geedgenetworks.bootstrap.enums.JobStage;
import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
@@ -36,7 +36,7 @@ import java.util.Map;
*/
@Slf4j
public class SourceExecutor extends AbstractExecutor<JobRuntimeEnvironment, Config> {
- private static final String PROCESSOR_TYPE = OperatorType.SOURCE.getType();
+ private static final String PROCESSOR_TYPE = JobStage.SOURCE.getType();
private Map<String, SourceConfig> operators;
public SourceExecutor(JobRuntimeEnvironment environment, Config jobConfig) {
super(environment, jobConfig);