summary | refs | log | tree | commit | diff
path: root/groot-bootstrap
diff options
context:
space:
mode:
Diffstat (limited to 'groot-bootstrap')
-rw-r--r-- groot-bootstrap/pom.xml | 2
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java) | 8
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java | 30
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java | 87
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java | 7
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java | 102
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java | 160
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java) | 9
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java | 43
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java | 46
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java | 49
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessorExecutor.java | 102
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java | 52
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java | 62
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java | 105
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java | 8
-rw-r--r-- groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java | 2
-rw-r--r-- groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java | 8
18 files changed, 261 insertions, 621 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index 150c941..60e602a 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -30,7 +30,7 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>groot-spi</artifactId>
+ <artifactId>groot-api</artifactId>
<version>${revision}</version>
</dependency>
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java
index 8b4e154..a32c844 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.enums;
-public enum StageType {
+public enum OperatorType {
SOURCE("source"),
FILTER("filter"),
SPLIT("split"),
@@ -13,10 +13,10 @@ public enum StageType {
public String getType() {
return type;
}
- StageType(String type) {this.type = type;}
+ OperatorType(String type) {this.type = type;}
- public static StageType fromType(String type) {
- for (StageType stage : values()) {
+ public static OperatorType fromType(String type) {
+ for (OperatorType stage : values()) {
if (stage.type.equalsIgnoreCase(type)) {
return stage;
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index fe440f7..8ad33a2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -1,35 +1,25 @@
package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.common.utils.ReflectionUtils;
-import com.geedgenetworks.spi.table.event.Event;
-import com.typesafe.config.Config;
+import com.geedgenetworks.api.connector.event.Event;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.*;
import java.util.function.BiConsumer;
-public abstract class AbstractExecutor<K, V>
- implements Executor<DataStream<Event>, JobRuntimeEnvironment> {
- protected JobRuntimeEnvironment jobRuntimeEnvironment;
- protected final Config operatorConfig;
- protected final Map<K,V> operatorMap;
+public abstract class AbstractExecutor<E, C> implements Executor<DataStream<Event>> {
+ public E environment;
+ protected final C jobConfig;
- protected AbstractExecutor(Config operatorConfig) {
- this.operatorConfig = operatorConfig;
- this.operatorMap = initialize(operatorConfig);
+ protected AbstractExecutor(E environment, C jobConfig) {
+ this.environment = environment;
+ this.jobConfig = jobConfig;
+ initialize(jobConfig);
}
+ protected abstract void initialize(C jobConfig);
- @Override
- public void setRuntimeEnvironment(JobRuntimeEnvironment jobRuntimeEnvironment) {
- this.jobRuntimeEnvironment = jobRuntimeEnvironment;
-
- }
-
- protected abstract Map<K, V> initialize(Config operatorConfig);
-
- protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
+ protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
(classLoader, url) -> {
if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
URLClassLoader c =
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
deleted file mode 100644
index a45380e..0000000
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package com.geedgenetworks.bootstrap.execution;
-
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.common.config.*;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.spi.configuration.ProjectionConfigOptions;
-import com.geedgenetworks.spi.processor.Processor;
-import com.geedgenetworks.spi.processor.ProcessorConfig;
-import com.geedgenetworks.spi.table.event.Event;
-import com.typesafe.config.Config;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-
-public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> {
-
-
- protected AbstractProcessorExecutor(Config operatorConfig) {
- super(operatorConfig);
- }
-
- @Override
- public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
-
- ProcessorConfig processorConfig = operatorMap.get(operatorNode.getName());
- boolean found = false; // 标志变量
- ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
- for (Processor processor : processors) {
- if(processor.type().equals(processorConfig.getType())){
- found = true;
- if (operatorNode.getParallelism() > 0) {
- processorConfig.setParallelism(operatorNode.getParallelism());
- }
- try {
- input = processor.process(jobRuntimeEnvironment.getStreamExecutionEnvironment(), input, processorConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create orderby pipeline instance failed!", e);
- }
- break;
- }
- }
- if (!found) {
- throw new JobExecuteException("No matching processor found for type: " + processorConfig.getType());
- }
- return input;
- }
-
- protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) {
- ProcessorConfig ProcessorConfig = new ProcessorConfig();
- boolean found = false; // 标志变量
- CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key),
- ProjectionConfigOptions.TYPE.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Postprocessor: %s, Message: %s",
- key, result.getMsg()));
- }
- ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
- for (Processor processor : processors) {
- if(processor.type().equals(value.getOrDefault("type", "").toString())){
- found = true;
- try {
- ProcessorConfig = processor.checkConfig(key, value, processorsConfig);
-
- } catch (Exception e) {
- throw new JobExecuteException("Create orderby pipeline instance failed!", e);
- }
- break;
- }
- }
- if (!found) {
- throw new JobExecuteException("No matching processor found for type: " + value.getOrDefault("type", "").toString());
- }
- return ProcessorConfig;
- }
-
-
-
-
-
-
-
-}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
index e43c949..e36971d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
@@ -2,10 +2,7 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-public interface Executor<T, ENV extends RuntimeEnvironment> {
-
- T execute(T dataStream, OperatorNode edge) throws JobExecuteException;
-
- void setRuntimeEnvironment(ENV runtimeEnvironment);
+public interface Executor<T> {
+ T execute(T dataStream, JobTopologyNode jobTopologyNode) throws JobExecuteException;
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
deleted file mode 100644
index d70420e..0000000
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package com.geedgenetworks.bootstrap.execution;
-
-import com.geedgenetworks.bootstrap.enums.StageType;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.common.config.Constants;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-
-import com.geedgenetworks.spi.configuration.FilterConfigOptions;
-import com.geedgenetworks.spi.filter.Filter;
-import com.geedgenetworks.spi.filter.FilterConfig;
-import com.geedgenetworks.spi.table.event.Event;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-
-/**
- * Initialize config and execute filter operator
- */
-@Slf4j
-public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
- private static final String PROCESSOR_TYPE = StageType.FILTER.getType();
-
- public FilterExecutor(Config config) {
- super(config);
- }
-
- @Override
- protected Map<String, FilterConfig> initialize(Config operatorConfig) {
- Map<String, FilterConfig> filterConfigMap = Maps.newHashMap();
- if (operatorConfig.hasPath(Constants.FILTERS)) {
- Config filterConfig = operatorConfig.getConfig(Constants.FILTERS);
- filterConfig.root().unwrapped().forEach((key, value) -> {
- CheckResult result = CheckConfigUtil.checkAllExists(filterConfig.getConfig(key),
- FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Filter: %s, Message: %s",
- key, result.getMsg()));
- }
- filterConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, filterConfig));
- });
- }
-
- return filterConfigMap;
- }
-
- @Override
- public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
- FilterConfig filterConfig = operatorMap.get(operatorNode.getName());
- boolean found = false; // 标志变量
- ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class);
- for (Filter filter : filters) {
- if(filter.type().equals(filterConfig.getType())){
- found = true;
- if (operatorNode.getParallelism() > 0) {
- filterConfig.setParallelism(operatorNode.getParallelism());
- }
- try {
- input =
- filter.filterFunction(
- input, filterConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create filter instance failed!", e);
- }
- break;
- }
- }
- if (!found) {
- throw new JobExecuteException("No matching filter found for type: " + filterConfig.getType());
- }
- return input;
- }
-
- protected FilterConfig checkConfig(String key, Map<String, Object> value, Config config) {
- FilterConfig filterConfig = new FilterConfig();
- boolean found = false; // 标志变量
- ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class);
- for (Filter filter : filters) {
- if(filter.type().equals(value.getOrDefault("type", "").toString())){
- found = true;
- try {
- filterConfig = filter.checkConfig(key, value, config);
- } catch (Exception e) {
- throw new JobExecuteException("Create split pipeline instance failed!", e);
- }
- }
- }
- if (!found) {
- throw new JobExecuteException("No matching filter found for type: " + value.getOrDefault("type", "").toString());
- }
- return filterConfig;
- }
-}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index cd70f44..ad31d88 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -1,12 +1,12 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.StageType;
+import com.geedgenetworks.bootstrap.enums.OperatorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.config.GrootStreamConfig;
-import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.api.connector.event.Event;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -29,14 +29,10 @@ import java.util.stream.Stream;
public class JobExecution {
private final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
- private final List<OperatorNode> operatorNodes;
+ private final Executor<DataStream<Event>> sourceExecutor;
+ private final Executor<DataStream<Event>> sinkExecutor;
+ private final Executor<DataStream<Event>> processorExecutor;
+ private final List<JobTopologyNode> jobTopologyNodes;
private final List<URL> jarPaths;
private final Map<String,String> nodeNameWithSplitTags = new HashMap<>();
@@ -50,25 +46,13 @@ public class JobExecution {
}
registerPlugin(jobConfig.getConfig(Constants.APPLICATION));
-
- this.sourceExecutor = new SourceExecutor(jobConfig);
- this.sinkExecutor = new SinkExecutor(jobConfig);
- this.filterExecutor = new FilterExecutor(jobConfig);
- this.splitExecutor = new SplitExecutor(jobConfig);
- this.preprocessingExecutor = new PreprocessingExecutor(jobConfig);
- this.processingExecutor = new ProcessingExecutor(jobConfig);
- this.postprocessingExecutor = new PostprocessingExecutor(jobConfig);
this.jobRuntimeEnvironment =
JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig);
- this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.operatorNodes = buildJobNode(jobConfig);
+ this.sourceExecutor = new SourceExecutor(jobRuntimeEnvironment, jobConfig);
+ this.sinkExecutor = new SinkExecutor(jobRuntimeEnvironment, jobConfig);
+ this.processorExecutor = new ProcessorExecutor(jobRuntimeEnvironment, jobConfig);
+ this.jobTopologyNodes = buildJobNode(jobConfig);
}
@@ -88,7 +72,7 @@ public class JobExecution {
try {
return uri.toURL();
} catch (MalformedURLException e) {
- throw new RuntimeException("the uri of jar illegal: " + uri, e);
+ throw new RuntimeException("The uri of jar illegal:" + uri, e);
}
}).collect(Collectors.toList());
jarDependencies.forEach(url -> {
@@ -153,7 +137,7 @@ public class JobExecution {
return config;
}
- private List<OperatorNode> buildJobNode(Config config) {
+ private List<JobTopologyNode> buildJobNode(Config config) {
Map<String, Object> sources = Maps.newHashMap();
Map<String, Object> sinks = Maps.newHashMap();
@@ -187,34 +171,34 @@ public class JobExecution {
List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY);
- List<OperatorNode> operatorNodes = Lists.newArrayList();
+ List<JobTopologyNode> jobTopologyNodes = Lists.newArrayList();
topology.forEach(item -> {
- OperatorNode operatorNode = JSONObject.from(item.root().unwrapped()).toJavaObject(OperatorNode.class);
- operatorNodes.add(operatorNode);
+ JobTopologyNode jobTopologyNode = JSONObject.from(item.root().unwrapped()).toJavaObject(JobTopologyNode.class);
+ jobTopologyNodes.add(jobTopologyNode);
});
- for (OperatorNode operatorNode : operatorNodes) {
- if (sources.containsKey(operatorNode.getName())) {
- operatorNode.setType(StageType.SOURCE);
- } else if (sinks.containsKey(operatorNode.getName())) {
- operatorNode.setType(StageType.SINK);
- } else if (filters.containsKey(operatorNode.getName())) {
- operatorNode.setType(StageType.FILTER);
- } else if (splits.containsKey(operatorNode.getName())) {
- operatorNode.setType(StageType.SPLIT);
- } else if (preprocessingPipelines.containsKey(operatorNode.getName())) {
- operatorNode.setType(StageType.PREPROCESSING);
- } else if (processingPipelines.containsKey(operatorNode.getName())) {
- operatorNode.setType(StageType.PROCESSING);
- } else if (postprocessingPipelines.containsKey(operatorNode.getName())) {
- operatorNode.setType(StageType.POSTPROCESSING);
+ for (JobTopologyNode jobTopologyNode : jobTopologyNodes) {
+ if (sources.containsKey(jobTopologyNode.getName())) {
+ jobTopologyNode.setType(OperatorType.SOURCE);
+ } else if (sinks.containsKey(jobTopologyNode.getName())) {
+ jobTopologyNode.setType(OperatorType.SINK);
+ } else if (filters.containsKey(jobTopologyNode.getName())) {
+ jobTopologyNode.setType(OperatorType.FILTER);
+ } else if (splits.containsKey(jobTopologyNode.getName())) {
+ jobTopologyNode.setType(OperatorType.SPLIT);
+ } else if (preprocessingPipelines.containsKey(jobTopologyNode.getName())) {
+ jobTopologyNode.setType(OperatorType.PREPROCESSING);
+ } else if (processingPipelines.containsKey(jobTopologyNode.getName())) {
+ jobTopologyNode.setType(OperatorType.PROCESSING);
+ } else if (postprocessingPipelines.containsKey(jobTopologyNode.getName())) {
+ jobTopologyNode.setType(OperatorType.POSTPROCESSING);
} else {
- throw new JobExecuteException("unsupported process type " + operatorNode.getName());
+ throw new JobExecuteException("unsupported process type " + jobTopologyNode.getName());
}
}
- return operatorNodes;
+ return jobTopologyNodes;
}
@@ -223,14 +207,14 @@ public class JobExecution {
if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) {
jobRuntimeEnvironment.registerPlugin(jarPaths);
}
- List<OperatorNode> sourceOperatorNodes = operatorNodes
- .stream().filter(v -> v.getType().name().equals(StageType.SOURCE.name())).collect(Collectors.toList());
+ List<JobTopologyNode> sourceJobTopologyNodes = jobTopologyNodes
+ .stream().filter(v -> v.getType().name().equals(OperatorType.SOURCE.name())).collect(Collectors.toList());
DataStream<Event> dataStream = null;
- for (OperatorNode sourceOperatorNode : sourceOperatorNodes) {
- dataStream = sourceExecutor.execute(dataStream, sourceOperatorNode);
- for (String nodeName : sourceOperatorNode.getDownstream()) {
+ for (JobTopologyNode sourceJobTopologyNode : sourceJobTopologyNodes) {
+ dataStream = sourceExecutor.execute(dataStream, sourceJobTopologyNode);
+ for (String nodeName : sourceJobTopologyNode.getDownstream()) {
buildJobGraph(dataStream, nodeName);
}
}
@@ -251,68 +235,68 @@ public class JobExecution {
}
private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
- OperatorNode operatorNode = getNode(downstreamNodeName).orElseGet(() -> {
+ JobTopologyNode jobTopologyNode = getNode(downstreamNodeName).orElseGet(() -> {
throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
});
- if (operatorNode.getType().name().equals(StageType.FILTER.name())) {
- if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
- dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream)
- .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {}), operatorNode);
+ if (jobTopologyNode.getType().name().equals(OperatorType.FILTER.name())) {
+ if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
+ dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream)
+ .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {}), jobTopologyNode);
} else {
- dataStream = filterExecutor.execute(dataStream, operatorNode);
+ dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
}
- } else if (operatorNode.getType().name().equals(StageType.SPLIT.name())) {
- if (operatorNode.getTags().size() == operatorNode.getDownstream().size()) {
- for (int i = 0; i < operatorNode.getDownstream().size(); i++) {
- nodeNameWithSplitTags.put(operatorNode.getDownstream().get(i), operatorNode.getTags().get(i));
+ } else if (jobTopologyNode.getType().name().equals(OperatorType.SPLIT.name())) {
+ if (jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size()) {
+ for (int i = 0; i < jobTopologyNode.getDownstream().size(); i++) {
+ nodeNameWithSplitTags.put(jobTopologyNode.getDownstream().get(i), jobTopologyNode.getTags().get(i));
}
}
else {
throw new JobExecuteException("split node downstream size not equal tags size");
}
- dataStream = splitExecutor.execute(dataStream, operatorNode);
- } else if (operatorNode.getType().name().equals(StageType.PREPROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
- dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())){
- }), operatorNode);
+ dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
+ } else if (jobTopologyNode.getType().name().equals(OperatorType.PREPROCESSING.name())) {
+ if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
+ dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())){
+ }), jobTopologyNode);
} else {
- dataStream = preprocessingExecutor.execute(dataStream, operatorNode);
+ dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
}
- } else if (operatorNode.getType().name().equals(StageType.PROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
- dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
- }), operatorNode);
+ } else if (jobTopologyNode.getType().name().equals(OperatorType.PROCESSING.name())) {
+ if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
+ dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
+ }), jobTopologyNode);
} else {
- dataStream = processingExecutor.execute(dataStream, operatorNode);
+ dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
}
- } else if (operatorNode.getType().name().equals(StageType.POSTPROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
- dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
- }), operatorNode);
+ } else if (jobTopologyNode.getType().name().equals(OperatorType.POSTPROCESSING.name())) {
+ if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
+ dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
+ }), jobTopologyNode);
} else {
- dataStream = postprocessingExecutor.execute(dataStream, operatorNode);
+ dataStream = processorExecutor.execute(dataStream, jobTopologyNode);
}
- } else if (operatorNode.getType().name().equals(StageType.SINK.name())) {
- if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
- dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
- }), operatorNode);
+ } else if (jobTopologyNode.getType().name().equals(OperatorType.SINK.name())) {
+ if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {
+ }), jobTopologyNode);
} else {
- dataStream = sinkExecutor.execute(dataStream, operatorNode);
+ dataStream = sinkExecutor.execute(dataStream, jobTopologyNode);
}
} else {
- throw new JobExecuteException("unsupported process type " + operatorNode.getType().name());
+ throw new JobExecuteException("unsupported process type " + jobTopologyNode.getType().name());
}
- for (String nodeName : operatorNode.getDownstream()) {
+ for (String nodeName : jobTopologyNode.getDownstream()) {
buildJobGraph(dataStream, nodeName);
}
}
- private Optional<OperatorNode> getNode(String name) {
- return operatorNodes.stream().filter(v -> v.getName().equals(name)).findFirst();
+ private Optional<JobTopologyNode> getNode(String name) {
+ return jobTopologyNodes.stream().filter(v -> v.getName().equals(name)).findFirst();
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
index 8c4b392..dcc15e9 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.StageType;
+import com.geedgenetworks.bootstrap.enums.OperatorType;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -10,13 +10,16 @@ import java.io.Serializable;
import java.util.Collections;
import java.util.List;
+/**
+ * Represents an operator node in the execution graph.
+ */
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
-public class OperatorNode implements Serializable {
+public class JobTopologyNode implements Serializable {
private String name;
- private StageType type;
+ private OperatorType type;
private int parallelism;
private List<String> downstream = Collections.emptyList();
private List<String> tags = Collections.emptyList();
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
deleted file mode 100644
index 10d9188..0000000
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.geedgenetworks.bootstrap.execution;
-
-import com.geedgenetworks.bootstrap.enums.StageType;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.common.config.Constants;
-import com.geedgenetworks.spi.processor.ProcessorConfig;
-import com.geedgenetworks.spi.table.event.Event;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Initialize config and execute postprocessor
- */
-public class PostprocessingExecutor extends AbstractProcessorExecutor {
- private static final String PROCESSOR_TYPE = StageType.POSTPROCESSING.getType();
-
- public PostprocessingExecutor(Config config) {
- super(config);
- }
-
- @Override
- protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
- Map<String, ProcessorConfig> postprocessingConfigMap = Maps.newHashMap();
- if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
- Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES);
- postprocessors.root().unwrapped().forEach((key, value) -> {
- postprocessingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, postprocessors));
- });
- }
- return postprocessingConfigMap;
- }
-
-
- @Override
- public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
- return super.execute(input, operatorNode);
- }
-}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
deleted file mode 100644
index 9acda99..0000000
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.geedgenetworks.bootstrap.execution;
-
-import com.geedgenetworks.bootstrap.enums.StageType;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.common.config.Constants;
-import com.geedgenetworks.spi.processor.ProcessorConfig;
-import com.geedgenetworks.spi.table.event.Event;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Initialize config and execute preprocessor
- */
-@Slf4j
-public class PreprocessingExecutor extends AbstractProcessorExecutor {
- private static final String PROCESSOR_TYPE = StageType.PREPROCESSING.getType();
-
- public PreprocessingExecutor(Config config) {
- super(config);
- }
-
- @Override
- protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
- Map<String, ProcessorConfig> preprocessingConfigMap = Maps.newHashMap();
- if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) {
- Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES);
- preprocessors.root().unwrapped().forEach((key, value) -> {
- preprocessingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, preprocessors));
- });
- }
- return preprocessingConfigMap;
- }
-
- @Override
- public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
-
- return super.execute(input, operatorNode);
-
- }
-}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
deleted file mode 100644
index c49df88..0000000
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.geedgenetworks.bootstrap.execution;
-
-import com.geedgenetworks.bootstrap.enums.StageType;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.common.config.Constants;
-import com.geedgenetworks.spi.processor.Processor;
-import com.geedgenetworks.spi.processor.ProcessorConfig;
-import com.geedgenetworks.spi.processor.ProcessorProvider;
-import com.geedgenetworks.spi.table.event.Event;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-import java.net.URL;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Initialize config and execute processor
- */
-public class ProcessingExecutor extends AbstractProcessorExecutor {
- private static final String PROCESSOR_TYPE = StageType.PROCESSING.getType();
- //private Map<String, Processor<?>> processors;
-
- public ProcessingExecutor(Config config) {
- super(config);
- }
-
- @Override
- protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
- Map<String, ProcessorConfig> processingConfigMap = Maps.newHashMap();
- //processors = new HashMap<>();
- if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) {
- Config processingConfig = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
- processingConfig.root().unwrapped().forEach((key, value) -> {
- processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processingConfig));
- //processors.put(key, ProcessorProvider.load(((Map<?, ?>) value).get("type").toString()));
-
- });
- }
- return processingConfigMap;
- }
-
- @Override
- public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
- return super.execute(input, operatorNode);
- }
-}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessorExecutor.java
new file mode 100644
index 0000000..204866f
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessorExecutor.java
@@ -0,0 +1,102 @@
+package com.geedgenetworks.bootstrap.execution;
+
+import com.geedgenetworks.api.processor.ProcessorConfigOptions;
+import com.geedgenetworks.api.factory.FactoryUtil;
+import com.geedgenetworks.api.factory.ProcessorFactory;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.Constants;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.api.processor.Processor;
+import com.geedgenetworks.api.processor.ProcessorConfig;
+import com.geedgenetworks.api.connector.event.Event;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import java.util.Map;
+/**
+ * Discovers, configures and runs every processor-style operator of a job
+ * (filters, splits and the pre-/processing/post-processing pipelines).
+ * <p>All sections share one name space: entries are stored by operator name, so an
+ * operator declared later under an already used name replaces the earlier one.
+ */
+public class ProcessorExecutor extends AbstractExecutor<JobRuntimeEnvironment, Config> {
+    private Map<String, ProcessorConfig> operators;
+    private Map<String, Processor<?>> processors;
+
+    public ProcessorExecutor(JobRuntimeEnvironment environment, Config jobConfig) {
+        super(environment, jobConfig);
+    }
+
+    /**
+     * Builds the operator and processor lookup tables from the job configuration.
+     * @param jobConfig job configuration; sections that are absent are skipped
+     */
+    @Override
+    protected void initialize(Config jobConfig) {
+        operators = Maps.newHashMap();
+        processors = Maps.newHashMap();
+        // Sections are visited in a fixed order: filters, splits, then the three pipeline stages.
+        String[] sections = {Constants.FILTERS, Constants.SPLITS, Constants.PREPROCESSING_PIPELINES,
+                Constants.PROCESSING_PIPELINES, Constants.POSTPROCESSING_PIPELINES};
+        for (String section : sections) {
+            if (jobConfig.hasPath(section)) {
+                discoverProcessors(jobConfig.getConfig(section));
+            }
+        }
+    }
+
+    /**
+     * Validates each entry of one section, then registers its processor and parsed config.
+     * @param config one section of the job configuration, keyed by operator name
+     * @throws ConfigValidationException if the type option check fails for an entry
+     */
+    private void discoverProcessors(Config config) {
+        final String typeKey = ProcessorConfigOptions.TYPE.key();
+        config.root().unwrapped().forEach((key, value) -> {
+            Config entryConfig = config.getConfig(key);
+            CheckResult result = CheckConfigUtil.checkAllExists(entryConfig, typeKey);
+            if (!result.isSuccess()) {
+                throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+                        "Processor: %s, Message: %s", key, result.getMsg()));
+            }
+            // Read the type through the same option key that was just validated.
+            String type = ((Map<?, ?>) value).get(typeKey).toString();
+            Processor processor = FactoryUtil.discoverProcessorFactory(ProcessorFactory.class, type).createProcessor();
+            processors.put(key, processor);
+            operators.put(key, processor.parseConfig(key, entryConfig));
+        });
+    }
+
+    /**
+     * Applies the processor registered under the node's name to the input stream.
+     * @param input upstream data stream
+     * @param jobTopologyNode node naming the operator; a positive parallelism is copied onto its config
+     * @return the stream returned by the processor
+     * @throws JobExecuteException if the operator is unknown or the processor fails
+     */
+    @Override
+    public DataStream<Event> execute(DataStream<Event> input, JobTopologyNode jobTopologyNode) throws JobExecuteException {
+        String name = jobTopologyNode.getName();
+        ProcessorConfig operatorConfig = operators.get(name);
+        if (operatorConfig == null) {
+            throw new JobExecuteException("No matching operator configuration found for: " + name);
+        }
+        // Both maps are filled under the same key, so look up by it, not by operatorConfig.getName().
+        Processor processor = processors.get(name);
+        if (processor == null) {
+            throw new JobExecuteException("No matching processor found for type: " + operatorConfig.getType());
+        }
+        int parallelism = jobTopologyNode.getParallelism();
+        if (parallelism > 0) {
+            operatorConfig.setParallelism(parallelism);
+        }
+        try {
+            return processor.process(environment.getStreamExecutionEnvironment(), input, operatorConfig);
+        } catch (Exception e) {
+            throw new JobExecuteException("Failed to execute processor due to unexpected error.", e);
+        }
+    }
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index 130705a..501fa81 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
-import com.geedgenetworks.bootstrap.enums.StageType;
+import com.geedgenetworks.bootstrap.enums.OperatorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.config.Constants;
@@ -9,14 +9,14 @@ import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.spi.sink.SinkConfig;
-import com.geedgenetworks.spi.sink.SinkConfigOptions;
-import com.geedgenetworks.spi.sink.SinkProvider;
-import com.geedgenetworks.spi.sink.SinkTableFactory;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.factory.FactoryUtil;
-import com.geedgenetworks.spi.table.factory.TableFactory;
-import com.geedgenetworks.spi.table.schema.Schema;
+import com.geedgenetworks.api.connector.sink.SinkConfig;
+import com.geedgenetworks.api.connector.sink.SinkConfigOptions;
+import com.geedgenetworks.api.connector.sink.SinkProvider;
+import com.geedgenetworks.api.connector.sink.SinkTableFactory;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.factory.FactoryUtil;
+import com.geedgenetworks.api.factory.ConnectorFactory;
+import com.geedgenetworks.api.connector.schema.Schema;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -30,18 +30,17 @@ import java.util.Map;
* Initialize config and execute sink connector
*/
@Slf4j
-public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
- private static final String PROCESSOR_TYPE = StageType.SINK.getType();
-
- public SinkExecutor(Config config) {
- super(config);
+public class SinkExecutor extends AbstractExecutor<JobRuntimeEnvironment, Config> {
+ private static final String PROCESSOR_TYPE = OperatorType.SINK.getType();
+ private Map<String, SinkConfig> operators;
+ public SinkExecutor(JobRuntimeEnvironment environment, Config jobConfig) {
+ super(environment, jobConfig);
}
@Override
- protected Map<String, SinkConfig> initialize(Config operatorConfig) {
- Map<String, SinkConfig> sinkConfigMap = Maps.newHashMap();
-
- if (operatorConfig.hasPath(Constants.SINKS)) {
- Config sinks = operatorConfig.getConfig(Constants.SINKS);
+ protected void initialize(Config jobConfig) {
+ operators = Maps.newHashMap();
+ if (jobConfig.hasPath(Constants.SINKS)) {
+ Config sinks = jobConfig.getConfig(Constants.SINKS);
sinks.root().unwrapped().forEach((key,value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(sinks.getConfig(key),
SinkConfigOptions.TYPE.key(), SinkConfigOptions.PROPERTIES.key());
@@ -53,26 +52,25 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
SinkConfig sinkConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SinkConfig.class);
sinkConfig.setName(key);
- sinkConfigMap.put(key, sinkConfig);
+ operators.put(key, sinkConfig);
});
}
- return sinkConfigMap;
}
@Override
- public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
- SinkConfig sinkConfig = operatorMap.get(operatorNode.getName());
+ public DataStream<Event> execute(DataStream<Event> input, JobTopologyNode jobTopologyNode) throws JobExecuteException {
+ SinkConfig sinkConfig = operators.get(jobTopologyNode.getName());
try {
- SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
+ SinkTableFactory sinkTableFactory = FactoryUtil.discoverConnectorFactory(SinkTableFactory.class, sinkConfig.getType());
Map<String, String> options = sinkConfig.getProperties();
Configuration configuration = Configuration.fromMap(options);
Schema schema = null;
if(sinkConfig.getSchema() != null && !sinkConfig.getSchema().isEmpty()){
schema = SchemaConfigParse.parseSchemaConfig(sinkConfig.getSchema());
}
- TableFactory.Context context = new TableFactory.Context(schema, options, configuration);
+ ConnectorFactory.Context context = new ConnectorFactory.Context(schema, options, configuration);
SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
if(!sinkProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){
throw new UnsupportedOperationException(String.format("sink(%s) not support DynamicSchema", sinkConfig.getName()));
@@ -83,8 +81,8 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
}
DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(input);
- if (operatorNode.getParallelism() > 0) {
- dataStreamSink.setParallelism(operatorNode.getParallelism());
+ if (jobTopologyNode.getParallelism() > 0) {
+ dataStreamSink.setParallelism(jobTopologyNode.getParallelism());
}
dataStreamSink.name(sinkConfig.getName());
} catch (Exception e) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index 5109540..ca4fc1d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.StageType;
+import com.geedgenetworks.bootstrap.enums.OperatorType;
import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
@@ -10,14 +10,14 @@ import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.spi.configuration.SourceConfigOptions;
-import com.geedgenetworks.spi.source.SourceConfig;
-import com.geedgenetworks.spi.source.SourceProvider;
-import com.geedgenetworks.spi.source.SourceTableFactory;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.factory.FactoryUtil;
-import com.geedgenetworks.spi.table.factory.TableFactory;
-import com.geedgenetworks.spi.table.schema.Schema;
+import com.geedgenetworks.api.connector.source.SourceConfigOptions;
+import com.geedgenetworks.api.connector.source.SourceConfig;
+import com.geedgenetworks.api.connector.source.SourceProvider;
+import com.geedgenetworks.api.connector.source.SourceTableFactory;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.factory.FactoryUtil;
+import com.geedgenetworks.api.factory.ConnectorFactory;
+import com.geedgenetworks.api.connector.schema.Schema;
import com.google.common.collect.Maps;
import com.typesafe.config.*;
import lombok.extern.slf4j.Slf4j;
@@ -35,17 +35,17 @@ import java.util.Map;
* Initialize config and execute source connector
*/
@Slf4j
-public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
- private static final String PROCESSOR_TYPE = StageType.SOURCE.getType();
-
- public SourceExecutor(Config config) {
- super(config);
+public class SourceExecutor extends AbstractExecutor<JobRuntimeEnvironment, Config> {
+ private static final String PROCESSOR_TYPE = OperatorType.SOURCE.getType();
+ private Map<String, SourceConfig> operators;
+ public SourceExecutor(JobRuntimeEnvironment environment, Config jobConfig) {
+ super(environment, jobConfig);
}
@Override
- protected Map<String, SourceConfig> initialize(Config operatorConfig) {
- Map<String, SourceConfig> sourceConfigMap = Maps.newHashMap();
- if (operatorConfig.hasPath(Constants.SOURCES)) {
- Config sources = operatorConfig.getConfig(Constants.SOURCES);
+ protected void initialize(Config jobConfig) {
+ operators = Maps.newHashMap();
+ if (jobConfig.hasPath(Constants.SOURCES)) {
+ Config sources = jobConfig.getConfig(Constants.SOURCES);
sources.root().unwrapped().forEach((key,value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(sources.getConfig(key),
SourceConfigOptions.TYPE.key(), SourceConfigOptions.PROPERTIES.key());
@@ -57,27 +57,25 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
SourceConfig sourceConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SourceConfig.class);
sourceConfig.setName(key);
- sourceConfigMap.put(key, sourceConfig);
+ operators.put(key, sourceConfig);
});
}
-
- return sourceConfigMap;
}
@Override
- public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
- SourceConfig sourceConfig = operatorMap.get(operatorNode.getName());
+ public DataStream<Event> execute(DataStream<Event> input, JobTopologyNode jobTopologyNode) throws JobExecuteException {
+ SourceConfig sourceConfig = operators.get(jobTopologyNode.getName());
SingleOutputStreamOperator sourceSingleOutputStreamOperator;
try {
- SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType());
+ SourceTableFactory tableFactory = FactoryUtil.discoverConnectorFactory(SourceTableFactory.class, sourceConfig.getType());
Map<String, String> options = sourceConfig.getProperties();
Configuration configuration = Configuration.fromMap(options);
Schema schema = null;
if(sourceConfig.getSchema() != null && !sourceConfig.getSchema().isEmpty()){
schema = SchemaConfigParse.parseSchemaConfig(sourceConfig.getSchema());
}
- TableFactory.Context context = new TableFactory.Context(schema, options, configuration);
+ ConnectorFactory.Context context = new ConnectorFactory.Context(schema, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
if(!sourceProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){
throw new UnsupportedOperationException(String.format("source(%s) not support DynamicSchema", sourceConfig.getName()));
@@ -87,18 +85,18 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString()));
}
- sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()).name(sourceConfig.getName());
- if (operatorNode.getParallelism() > 0) {
- sourceSingleOutputStreamOperator.setParallelism(operatorNode.getParallelism());
+ sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(environment.getStreamExecutionEnvironment()).name(sourceConfig.getName());
+ if (jobTopologyNode.getParallelism() > 0) {
+ sourceSingleOutputStreamOperator.setParallelism(jobTopologyNode.getParallelism());
}
- sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, operatorNode);
+ sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, jobTopologyNode);
return sourceSingleOutputStreamOperator;
} catch (Exception e) {
throw new JobExecuteException("Create source instance failed!", e);
}
}
- private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, OperatorNode operatorNode){
+ private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, JobTopologyNode jobTopologyNode){
final String watermarkTimestamp = sourceConfig.getWatermark_timestamp();
if(StringUtils.isNotBlank(watermarkTimestamp)){
String timestampUnit = sourceConfig.getWatermark_timestamp_unit();
@@ -137,8 +135,8 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(watermarkLag))
.withTimestampAssigner(timestampAssigner)
);
- if (operatorNode.getParallelism() > 0) {
- dataStream.setParallelism(operatorNode.getParallelism());
+ if (jobTopologyNode.getParallelism() > 0) {
+ dataStream.setParallelism(jobTopologyNode.getParallelism());
}
}
return dataStream;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
deleted file mode 100644
index c142614..0000000
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package com.geedgenetworks.bootstrap.execution;
-
-import com.alibaba.fastjson.JSONObject;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.common.config.Constants;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.spi.split.Split;
-import com.geedgenetworks.spi.split.SplitConfig;
-import com.geedgenetworks.spi.split.SplitConfigOptions;
-import com.geedgenetworks.spi.table.event.Event;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-
-
-/**
- * Initialize config and execute filter operator
- */
-@Slf4j
-public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
-
-
- public SplitExecutor(Config config) {
- super(config);
- }
-
- @Override
- protected Map<String, SplitConfig> initialize(Config operatorConfig) {
- Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
- if (operatorConfig.hasPath(Constants.SPLITS)) {
- Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS);
- splitsConfig.root().unwrapped().forEach((key, value) -> {
- CheckResult result = CheckConfigUtil.checkAllExists(splitsConfig.getConfig(key),
- SplitConfigOptions.TYPE.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "split: %s, Message: %s",
- key, result.getMsg()));
- }
- SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
- splitConfig.setName(key);
- splitConfigMap.put(key, splitConfig);
- });
- }
-
- return splitConfigMap;
- }
-
- @Override
- public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
- SplitConfig splitConfig = operatorMap.get(operatorNode.getName());
- boolean found = false; // 标志变量
- ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
- for (Split split : splits) {
- found = true; // 标志变量
- if(split.type().equals(splitConfig.getType())){
- if (operatorNode.getParallelism() > 0) {
- splitConfig.setParallelism(operatorNode.getParallelism());
- }
- try {
- input =
- split.splitFunction(
- input, splitConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create split instance failed!", e);
- }
- break;
- }
- }
- if (!found) {
- throw new JobExecuteException("No matching split found for type: " + splitConfig.getType());
- }
- return input;
- }
-
- protected SplitConfig checkConfig(String key, Map<String, Object> value, Config config) {
- SplitConfig splitConfig = new SplitConfig();
- boolean found = false; // 标志变量
- ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
- for (Split split : splits) {
- if(split.type().equals(value.getOrDefault("type", "").toString())){
- found = true;
- try {
- splitConfig = split.checkConfig(key, value, config);
- } catch (Exception e) {
- throw new JobExecuteException("Create split pipeline instance failed!", e);
- }
- break;
- }
- }
- if (!found) {
- throw new JobExecuteException("No matching split found for type: " + value.getOrDefault("type", "").toString());
- }
- return splitConfig;
- }
-}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
index f6b4292..00fcd61 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java
@@ -3,10 +3,10 @@ package com.geedgenetworks.bootstrap.utils;
import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.spi.table.schema.Schema;
-import com.geedgenetworks.spi.table.schema.SchemaParser;
-import com.geedgenetworks.spi.table.type.StructType;
-import com.geedgenetworks.spi.table.type.Types;
+import com.geedgenetworks.api.connector.schema.Schema;
+import com.geedgenetworks.api.connector.schema.SchemaParser;
+import com.geedgenetworks.api.connector.type.StructType;
+import com.geedgenetworks.api.connector.type.Types;
import org.apache.commons.io.FileUtils;
import java.io.File;
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
index 74b40a4..ccb01a4 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.main.simple.collect;
-import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.api.connector.event.Event;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.util.*;
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
index 130478e..15d6328 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.bootstrap.main.simple.collect;
-import com.geedgenetworks.spi.sink.SinkProvider;
-import com.geedgenetworks.spi.sink.SinkTableFactory;
-import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.api.connector.sink.SinkProvider;
+import com.geedgenetworks.api.connector.sink.SinkTableFactory;
+import com.geedgenetworks.api.connector.event.Event;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -16,7 +16,7 @@ import java.util.Set;
public class CollectTableFactory implements SinkTableFactory {
public static final String IDENTIFIER = "collect";
@Override
- public String factoryIdentifier() {
+ public String type() {
return IDENTIFIER;
}