summaryrefslogtreecommitdiff
path: root/groot-bootstrap/src
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-11-10 22:43:16 +0800
committerdoufenghu <[email protected]>2024-11-10 22:43:16 +0800
commit73a5f46181af3c9e596e8b08dc27f63339b04c53 (patch)
tree93bb7a830deb742211ec7cb8d8416002b4a5e54e /groot-bootstrap/src
parent16769de2e5ba334a5cfaacd8a53db2989264d022 (diff)
[Feature][SPI] SPI/Common module 依赖库梳理,xxExecutor删除不必要的参数传递。
Diffstat (limited to 'groot-bootstrap/src')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java19
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java32
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java6
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java18
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java24
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java142
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java)8
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java14
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java14
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java28
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java26
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java32
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java20
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java4
15 files changed, 201 insertions, 188 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
deleted file mode 100644
index 6f33cae..0000000
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.geedgenetworks.bootstrap.enums;
-
-public enum ProcessorType {
- SOURCE("source"),
- FILTER("filter"),
- SPLIT("split"),
- PREPROCESSING("preprocessing"),
- PROCESSING("processing"),
- POSTPROCESSING("postprocessing"),
- SINK("sink");
-
- private final String type;
-
- ProcessorType(String type) {this.type = type;}
-
- public String getType() {
- return type;
- }
-}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java
new file mode 100644
index 0000000..8b4e154
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java
@@ -0,0 +1,32 @@
+package com.geedgenetworks.bootstrap.enums;
+
+public enum StageType {
+ SOURCE("source"),
+ FILTER("filter"),
+ SPLIT("split"),
+ PREPROCESSING("preprocessing"),
+ PROCESSING("processing"),
+ POSTPROCESSING("postprocessing"),
+ SINK("sink");
+
+ private final String type;
+ public String getType() {
+ return type;
+ }
+ StageType(String type) {this.type = type;}
+
+ public static StageType fromType(String type) {
+ for (StageType stage : values()) {
+ if (stage.type.equalsIgnoreCase(type)) {
+ return stage;
+ }
+ }
+ throw new IllegalArgumentException("Unknown type: " + type);
+ }
+
+ @Override
+ public String toString() {
+ return type;
+ }
+
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index e0828a0..fe440f7 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -16,9 +16,9 @@ public abstract class AbstractExecutor<K, V>
protected final Config operatorConfig;
protected final Map<K,V> operatorMap;
- protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
+ protected AbstractExecutor(Config operatorConfig) {
this.operatorConfig = operatorConfig;
- this.operatorMap = initialize(jarPaths, operatorConfig);
+ this.operatorMap = initialize(operatorConfig);
}
@Override
@@ -27,7 +27,7 @@ public abstract class AbstractExecutor<K, V>
}
- protected abstract Map<K, V> initialize(List<URL> jarPaths, Config operatorConfig);
+ protected abstract Map<K, V> initialize(Config operatorConfig);
protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
(classLoader, url) -> {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index ec748cc..a45380e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -19,26 +19,24 @@ import java.util.ServiceLoader;
public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> {
- protected AbstractProcessorExecutor(List<URL> jarPaths, Config operatorConfig) {
- super(jarPaths, operatorConfig);
+ protected AbstractProcessorExecutor(Config operatorConfig) {
+ super(operatorConfig);
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
- ProcessorConfig processorConfig = operatorMap.get(node.getName());
+ ProcessorConfig processorConfig = operatorMap.get(operatorNode.getName());
boolean found = false; // 标志变量
ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
for (Processor processor : processors) {
if(processor.type().equals(processorConfig.getType())){
found = true;
- if (node.getParallelism() > 0) {
- processorConfig.setParallelism(node.getParallelism());
+ if (operatorNode.getParallelism() > 0) {
+ processorConfig.setParallelism(operatorNode.getParallelism());
}
try {
-
- dataStream = processor.processorFunction(
- dataStream, processorConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+ input = processor.process(jobRuntimeEnvironment.getStreamExecutionEnvironment(), input, processorConfig);
} catch (Exception e) {
throw new JobExecuteException("Create orderby pipeline instance failed!", e);
}
@@ -48,7 +46,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
if (!found) {
throw new JobExecuteException("No matching processor found for type: " + processorConfig.getType());
}
- return dataStream;
+ return input;
}
protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
index d57d6bf..e43c949 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
@@ -4,7 +4,7 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException;
public interface Executor<T, ENV extends RuntimeEnvironment> {
- T execute(T dataStream, Node edge) throws JobExecuteException;
+ T execute(T dataStream, OperatorNode edge) throws JobExecuteException;
void setRuntimeEnvironment(ENV runtimeEnvironment);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 1ea19f8..d70420e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.config.CheckConfigUtil;
@@ -27,14 +27,14 @@ import java.util.ServiceLoader;
*/
@Slf4j
public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
- private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType();
+ private static final String PROCESSOR_TYPE = StageType.FILTER.getType();
- public FilterExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public FilterExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, FilterConfig> initialize(Config operatorConfig) {
Map<String, FilterConfig> filterConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.FILTERS)) {
Config filterConfig = operatorConfig.getConfig(Constants.FILTERS);
@@ -54,20 +54,20 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
- FilterConfig filterConfig = operatorMap.get(node.getName());
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ FilterConfig filterConfig = operatorMap.get(operatorNode.getName());
boolean found = false; // 标志变量
ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class);
for (Filter filter : filters) {
if(filter.type().equals(filterConfig.getType())){
found = true;
- if (node.getParallelism() > 0) {
- filterConfig.setParallelism(node.getParallelism());
+ if (operatorNode.getParallelism() > 0) {
+ filterConfig.setParallelism(operatorNode.getParallelism());
}
try {
- dataStream =
+ input =
filter.filterFunction(
- dataStream, filterConfig);
+ input, filterConfig);
} catch (Exception e) {
throw new JobExecuteException("Create filter instance failed!", e);
}
@@ -77,7 +77,7 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
if (!found) {
throw new JobExecuteException("No matching filter found for type: " + filterConfig.getType());
}
- return dataStream;
+ return input;
}
protected FilterConfig checkConfig(String key, Map<String, Object> value, Config config) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 325f8a4..cd70f44 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.config.Constants;
@@ -36,7 +36,7 @@ public class JobExecution {
private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
- private final List<Node> nodes;
+ private final List<OperatorNode> operatorNodes;
private final List<URL> jarPaths;
private final Map<String,String> nodeNameWithSplitTags = new HashMap<>();
@@ -51,13 +51,13 @@ public class JobExecution {
registerPlugin(jobConfig.getConfig(Constants.APPLICATION));
- this.sourceExecutor = new SourceExecutor(jarPaths, jobConfig);
- this.sinkExecutor = new SinkExecutor(jarPaths, jobConfig);
- this.filterExecutor = new FilterExecutor(jarPaths, jobConfig);
- this.splitExecutor = new SplitExecutor(jarPaths, jobConfig);
- this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, jobConfig);
- this.processingExecutor = new ProcessingExecutor(jarPaths, jobConfig);
- this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, jobConfig);
+ this.sourceExecutor = new SourceExecutor(jobConfig);
+ this.sinkExecutor = new SinkExecutor(jobConfig);
+ this.filterExecutor = new FilterExecutor(jobConfig);
+ this.splitExecutor = new SplitExecutor(jobConfig);
+ this.preprocessingExecutor = new PreprocessingExecutor(jobConfig);
+ this.processingExecutor = new ProcessingExecutor(jobConfig);
+ this.postprocessingExecutor = new PostprocessingExecutor(jobConfig);
this.jobRuntimeEnvironment =
JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig);
@@ -68,7 +68,7 @@ public class JobExecution {
this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.nodes = buildJobNode(jobConfig);
+ this.operatorNodes = buildJobNode(jobConfig);
}
@@ -153,7 +153,7 @@ public class JobExecution {
return config;
}
- private List<Node> buildJobNode(Config config) {
+ private List<OperatorNode> buildJobNode(Config config) {
Map<String, Object> sources = Maps.newHashMap();
Map<String, Object> sinks = Maps.newHashMap();
@@ -187,34 +187,34 @@ public class JobExecution {
List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY);
- List<Node> nodes = Lists.newArrayList();
+ List<OperatorNode> operatorNodes = Lists.newArrayList();
topology.forEach(item -> {
- Node node = JSONObject.from(item.root().unwrapped()).toJavaObject(Node.class);
- nodes.add(node);
+ OperatorNode operatorNode = JSONObject.from(item.root().unwrapped()).toJavaObject(OperatorNode.class);
+ operatorNodes.add(operatorNode);
});
- for (Node node : nodes) {
- if (sources.containsKey(node.getName())) {
- node.setType(ProcessorType.SOURCE);
- } else if (sinks.containsKey(node.getName())) {
- node.setType(ProcessorType.SINK);
- } else if (filters.containsKey(node.getName())) {
- node.setType(ProcessorType.FILTER);
- } else if (splits.containsKey(node.getName())) {
- node.setType(ProcessorType.SPLIT);
- } else if (preprocessingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.PREPROCESSING);
- } else if (processingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.PROCESSING);
- } else if (postprocessingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.POSTPROCESSING);
+ for (OperatorNode operatorNode : operatorNodes) {
+ if (sources.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.SOURCE);
+ } else if (sinks.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.SINK);
+ } else if (filters.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.FILTER);
+ } else if (splits.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.SPLIT);
+ } else if (preprocessingPipelines.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.PREPROCESSING);
+ } else if (processingPipelines.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.PROCESSING);
+ } else if (postprocessingPipelines.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.POSTPROCESSING);
} else {
- throw new JobExecuteException("unsupported process type " + node.getName());
+ throw new JobExecuteException("unsupported process type " + operatorNode.getName());
}
}
- return nodes;
+ return operatorNodes;
}
@@ -223,14 +223,14 @@ public class JobExecution {
if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) {
jobRuntimeEnvironment.registerPlugin(jarPaths);
}
- List<Node> sourceNodes = nodes
- .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
+ List<OperatorNode> sourceOperatorNodes = operatorNodes
+ .stream().filter(v -> v.getType().name().equals(StageType.SOURCE.name())).collect(Collectors.toList());
DataStream<Event> dataStream = null;
- for (Node sourceNode : sourceNodes) {
- dataStream = sourceExecutor.execute(dataStream, sourceNode);
- for (String nodeName : sourceNode.getDownstream()) {
+ for (OperatorNode sourceOperatorNode : sourceOperatorNodes) {
+ dataStream = sourceExecutor.execute(dataStream, sourceOperatorNode);
+ for (String nodeName : sourceOperatorNode.getDownstream()) {
buildJobGraph(dataStream, nodeName);
}
}
@@ -251,68 +251,68 @@ public class JobExecution {
}
private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
- Node node = getNode(downstreamNodeName).orElseGet(() -> {
+ OperatorNode operatorNode = getNode(downstreamNodeName).orElseGet(() -> {
throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
});
- if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- if (nodeNameWithSplitTags.containsKey(node.getName())) {
- dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
- }), node);
+ if (operatorNode.getType().name().equals(StageType.FILTER.name())) {
+ if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+ dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream)
+ .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {}), operatorNode);
} else {
- dataStream = filterExecutor.execute(dataStream, node);
+ dataStream = filterExecutor.execute(dataStream, operatorNode);
}
- } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
- if (node.getTags().size() == node.getDownstream().size()) {
- for (int i = 0; i < node.getDownstream().size();i++) {
- nodeNameWithSplitTags.put(node.getDownstream().get(i),node.getTags().get(i));
+ } else if (operatorNode.getType().name().equals(StageType.SPLIT.name())) {
+ if (operatorNode.getTags().size() == operatorNode.getDownstream().size()) {
+ for (int i = 0; i < operatorNode.getDownstream().size(); i++) {
+ nodeNameWithSplitTags.put(operatorNode.getDownstream().get(i), operatorNode.getTags().get(i));
}
}
else {
throw new JobExecuteException("split node downstream size not equal tags size");
}
- dataStream = splitExecutor.execute(dataStream, node);
- } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(node.getName())) {
- dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())){
- }), node);
+ dataStream = splitExecutor.execute(dataStream, operatorNode);
+ } else if (operatorNode.getType().name().equals(StageType.PREPROCESSING.name())) {
+ if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+ dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())){
+ }), operatorNode);
} else {
- dataStream = preprocessingExecutor.execute(dataStream, node);
+ dataStream = preprocessingExecutor.execute(dataStream, operatorNode);
}
- } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(node.getName())) {
- dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
- }), node);
+ } else if (operatorNode.getType().name().equals(StageType.PROCESSING.name())) {
+ if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+ dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
+ }), operatorNode);
} else {
- dataStream = processingExecutor.execute(dataStream, node);
+ dataStream = processingExecutor.execute(dataStream, operatorNode);
}
- } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(node.getName())) {
- dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
- }), node);
+ } else if (operatorNode.getType().name().equals(StageType.POSTPROCESSING.name())) {
+ if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+ dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
+ }), operatorNode);
} else {
- dataStream = postprocessingExecutor.execute(dataStream, node);
+ dataStream = postprocessingExecutor.execute(dataStream, operatorNode);
}
- } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- if (nodeNameWithSplitTags.containsKey(node.getName())) {
- dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
- }), node);
+ } else if (operatorNode.getType().name().equals(StageType.SINK.name())) {
+ if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
+ }), operatorNode);
} else {
- dataStream = sinkExecutor.execute(dataStream, node);
+ dataStream = sinkExecutor.execute(dataStream, operatorNode);
}
} else {
- throw new JobExecuteException("unsupported process type " + node.getType().name());
+ throw new JobExecuteException("unsupported process type " + operatorNode.getType().name());
}
- for (String nodeName : node.getDownstream()) {
+ for (String nodeName : operatorNode.getDownstream()) {
buildJobGraph(dataStream, nodeName);
}
}
- private Optional<Node> getNode(String name) {
- return nodes.stream().filter(v -> v.getName().equals(name)).findFirst();
+ private Optional<OperatorNode> getNode(String name) {
+ return operatorNodes.stream().filter(v -> v.getName().equals(name)).findFirst();
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java
index 66303c2..8c4b392 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -14,11 +14,11 @@ import java.util.List;
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
-public class Node implements Serializable {
+public class OperatorNode implements Serializable {
private String name;
- private ProcessorType type;
- private List<String> downstream = Collections.emptyList();
+ private StageType type;
private int parallelism;
+ private List<String> downstream = Collections.emptyList();
private List<String> tags = Collections.emptyList();
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index e73b7dd..10d9188 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.spi.processor.ProcessorConfig;
@@ -17,14 +17,14 @@ import java.util.Map;
* Initialize config and execute postprocessor
*/
public class PostprocessingExecutor extends AbstractProcessorExecutor {
- private static final String PROCESSOR_TYPE = ProcessorType.POSTPROCESSING.getType();
+ private static final String PROCESSOR_TYPE = StageType.POSTPROCESSING.getType();
- public PostprocessingExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public PostprocessingExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
Map<String, ProcessorConfig> postprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES);
@@ -37,7 +37,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor {
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
- return super.execute(dataStream, node);
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ return super.execute(input, operatorNode);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index 6179265..9acda99 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.spi.processor.ProcessorConfig;
@@ -19,14 +19,14 @@ import java.util.Map;
*/
@Slf4j
public class PreprocessingExecutor extends AbstractProcessorExecutor {
- private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType();
+ private static final String PROCESSOR_TYPE = StageType.PREPROCESSING.getType();
- public PreprocessingExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public PreprocessingExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
Map<String, ProcessorConfig> preprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) {
Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES);
@@ -38,9 +38,9 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor {
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
- return super.execute(dataStream, node);
+ return super.execute(input, operatorNode);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index bc6a09e..c49df88 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -1,15 +1,18 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.config.Constants;
+import com.geedgenetworks.spi.processor.Processor;
import com.geedgenetworks.spi.processor.ProcessorConfig;
+import com.geedgenetworks.spi.processor.ProcessorProvider;
import com.geedgenetworks.spi.table.event.Event;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.net.URL;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -17,27 +20,30 @@ import java.util.Map;
* Initialize config and execute processor
*/
public class ProcessingExecutor extends AbstractProcessorExecutor {
- private static final String PROCESSOR_TYPE = ProcessorType.PROCESSING.getType();
+ private static final String PROCESSOR_TYPE = StageType.PROCESSING.getType();
+ //private Map<String, Processor<?>> processors;
- public ProcessingExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public ProcessingExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
Map<String, ProcessorConfig> processingConfigMap = Maps.newHashMap();
+ //processors = new HashMap<>();
if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) {
- Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
- processors.root().unwrapped().forEach((key, value) -> {
- processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processors));
+ Config processingConfig = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
+ processingConfig.root().unwrapped().forEach((key, value) -> {
+ processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processingConfig));
+ //processors.put(key, ProcessorProvider.load(((Map<?, ?>) value).get("type").toString()));
+
});
}
return processingConfigMap;
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
-
- return super.execute(dataStream, node);
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ return super.execute(input, operatorNode);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index b61b6f9..130705a 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.config.Constants;
@@ -11,8 +11,8 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.spi.sink.SinkConfig;
import com.geedgenetworks.spi.sink.SinkConfigOptions;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.factory.TableFactory;
@@ -24,8 +24,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import java.net.URL;
-import java.util.List;
import java.util.Map;
/**
@@ -33,13 +31,13 @@ import java.util.Map;
*/
@Slf4j
public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
- private static final String PROCESSOR_TYPE = ProcessorType.SINK.getType();
+ private static final String PROCESSOR_TYPE = StageType.SINK.getType();
- public SinkExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public SinkExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, SinkConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, SinkConfig> initialize(Config operatorConfig) {
Map<String, SinkConfig> sinkConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.SINKS)) {
@@ -64,8 +62,8 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
- SinkConfig sinkConfig = operatorMap.get(node.getName());
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ SinkConfig sinkConfig = operatorMap.get(operatorNode.getName());
try {
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
Map<String, String> options = sinkConfig.getProperties();
@@ -84,9 +82,9 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
System.out.println(String.format("sink(%s) schema:\n%s", sinkConfig.getName(), schema.getDataType().treeString()));
}
- DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
- if (node.getParallelism() > 0) {
- dataStreamSink.setParallelism(node.getParallelism());
+ DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(input);
+ if (operatorNode.getParallelism() > 0) {
+ dataStreamSink.setParallelism(operatorNode.getParallelism());
}
dataStreamSink.name(sinkConfig.getName());
} catch (Exception e) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index 3eeaad6..5109540 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
@@ -12,8 +12,8 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.spi.configuration.SourceConfigOptions;
import com.geedgenetworks.spi.source.SourceConfig;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.factory.TableFactory;
@@ -28,9 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import java.net.URL;
import java.time.Duration;
-import java.util.List;
import java.util.Map;
/**
@@ -38,13 +36,13 @@ import java.util.Map;
*/
@Slf4j
public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
- private static final String PROCESSOR_TYPE = ProcessorType.SOURCE.getType();
+ private static final String PROCESSOR_TYPE = StageType.SOURCE.getType();
- public SourceExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public SourceExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, SourceConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, SourceConfig> initialize(Config operatorConfig) {
Map<String, SourceConfig> sourceConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.SOURCES)) {
Config sources = operatorConfig.getConfig(Constants.SOURCES);
@@ -68,8 +66,8 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
}
@Override
- public DataStream<Event> execute(DataStream<Event> outputStreamOperator, Node node) throws JobExecuteException {
- SourceConfig sourceConfig = operatorMap.get(node.getName());
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ SourceConfig sourceConfig = operatorMap.get(operatorNode.getName());
SingleOutputStreamOperator sourceSingleOutputStreamOperator;
try {
SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType());
@@ -90,17 +88,17 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
}
sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()).name(sourceConfig.getName());
- if (node.getParallelism() > 0) {
- sourceSingleOutputStreamOperator.setParallelism(node.getParallelism());
+ if (operatorNode.getParallelism() > 0) {
+ sourceSingleOutputStreamOperator.setParallelism(operatorNode.getParallelism());
}
- sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, node);
+ sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, operatorNode);
return sourceSingleOutputStreamOperator;
} catch (Exception e) {
throw new JobExecuteException("Create source instance failed!", e);
}
}
- private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, Node node){
+ private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, OperatorNode operatorNode){
final String watermarkTimestamp = sourceConfig.getWatermark_timestamp();
if(StringUtils.isNotBlank(watermarkTimestamp)){
String timestampUnit = sourceConfig.getWatermark_timestamp_unit();
@@ -139,8 +137,8 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(watermarkLag))
.withTimestampAssigner(timestampAssigner)
);
- if (node.getParallelism() > 0) {
- dataStream.setParallelism(node.getParallelism());
+ if (operatorNode.getParallelism() > 0) {
+ dataStream.setParallelism(operatorNode.getParallelism());
}
}
return dataStream;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
index 3d6f264..c142614 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -29,12 +29,12 @@ import java.util.ServiceLoader;
public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
- public SplitExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public SplitExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, SplitConfig> initialize(Config operatorConfig) {
Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.SPLITS)) {
Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS);
@@ -56,20 +56,20 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
- SplitConfig splitConfig = operatorMap.get(node.getName());
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ SplitConfig splitConfig = operatorMap.get(operatorNode.getName());
boolean found = false; // 标志变量
ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
for (Split split : splits) {
found = true; // 标志变量
if(split.type().equals(splitConfig.getType())){
- if (node.getParallelism() > 0) {
- splitConfig.setParallelism(node.getParallelism());
+ if (operatorNode.getParallelism() > 0) {
+ splitConfig.setParallelism(operatorNode.getParallelism());
}
try {
- dataStream =
+ input =
split.splitFunction(
- dataStream, splitConfig);
+ input, splitConfig);
} catch (Exception e) {
throw new JobExecuteException("Create split instance failed!", e);
}
@@ -79,7 +79,7 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
if (!found) {
throw new JobExecuteException("No matching split found for type: " + splitConfig.getType());
}
- return dataStream;
+ return input;
}
protected SplitConfig checkConfig(String key, Map<String, Object> value, Config config) {
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
index e52fd3b..130478e 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.main.simple.collect;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.api.datastream.DataStream;