diff options
Diffstat (limited to 'groot-bootstrap')
18 files changed, 261 insertions, 621 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 150c941..60e602a 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -30,7 +30,7 @@ <dependency> <groupId>com.geedgenetworks</groupId> - <artifactId>groot-spi</artifactId> + <artifactId>groot-api</artifactId> <version>${revision}</version> </dependency> diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java index 8b4e154..a32c844 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/OperatorType.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.enums; -public enum StageType { +public enum OperatorType { SOURCE("source"), FILTER("filter"), SPLIT("split"), @@ -13,10 +13,10 @@ public enum StageType { public String getType() { return type; } - StageType(String type) {this.type = type;} + OperatorType(String type) {this.type = type;} - public static StageType fromType(String type) { - for (StageType stage : values()) { + public static OperatorType fromType(String type) { + for (OperatorType stage : values()) { if (stage.type.equalsIgnoreCase(type)) { return stage; } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index fe440f7..8ad33a2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -1,35 +1,25 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.common.utils.ReflectionUtils; -import com.geedgenetworks.spi.table.event.Event; -import com.typesafe.config.Config; +import com.geedgenetworks.api.connector.event.Event; import org.apache.flink.streaming.api.datastream.DataStream; import java.net.URL; import java.net.URLClassLoader; -import java.util.*; import java.util.function.BiConsumer; -public abstract class AbstractExecutor<K, V> - implements Executor<DataStream<Event>, JobRuntimeEnvironment> { - protected JobRuntimeEnvironment jobRuntimeEnvironment; - protected final Config operatorConfig; - protected final Map<K,V> operatorMap; +public abstract class AbstractExecutor<E, C> implements Executor<DataStream<Event>> { + public E environment; + protected final C jobConfig; - protected AbstractExecutor(Config operatorConfig) { - this.operatorConfig = operatorConfig; - this.operatorMap = initialize(operatorConfig); + protected AbstractExecutor(E environment, C jobConfig) { + this.environment = environment; + this.jobConfig = jobConfig; + initialize(jobConfig); } + protected abstract void initialize(C jobConfig); - @Override - public void setRuntimeEnvironment(JobRuntimeEnvironment jobRuntimeEnvironment) { - this.jobRuntimeEnvironment = jobRuntimeEnvironment; - - } - - protected abstract Map<K, V> initialize(Config operatorConfig); - - protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = + protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = (classLoader, url) -> { if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) { URLClassLoader c = diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java deleted file mode 100644 index a45380e..0000000 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.geedgenetworks.bootstrap.execution; - -import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.common.config.*; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.spi.configuration.ProjectionConfigOptions; -import com.geedgenetworks.spi.processor.Processor; -import com.geedgenetworks.spi.processor.ProcessorConfig; -import com.geedgenetworks.spi.table.event.Event; -import com.typesafe.config.Config; -import org.apache.flink.streaming.api.datastream.DataStream; - -import java.net.URL; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; - -public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> { - - - protected AbstractProcessorExecutor(Config operatorConfig) { - super(operatorConfig); - } - - @Override - public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { - - ProcessorConfig processorConfig = operatorMap.get(operatorNode.getName()); - boolean found = false; // 标志变量 - ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class); - for (Processor processor : processors) { - if(processor.type().equals(processorConfig.getType())){ - found = true; - if (operatorNode.getParallelism() > 0) { - processorConfig.setParallelism(operatorNode.getParallelism()); - } - try { - input = processor.process(jobRuntimeEnvironment.getStreamExecutionEnvironment(), input, processorConfig); - } catch (Exception e) { - throw new JobExecuteException("Create orderby pipeline instance failed!", e); - } - break; - } - } - if (!found) { - throw new JobExecuteException("No matching processor found for type: " + processorConfig.getType()); - } - return input; - } - - protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) { - ProcessorConfig ProcessorConfig = new ProcessorConfig(); - boolean found = false; // 标志变量 - CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key), - ProjectionConfigOptions.TYPE.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Postprocessor: %s, Message: %s", - key, result.getMsg())); - } - ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class); - for (Processor processor : processors) { - if(processor.type().equals(value.getOrDefault("type", "").toString())){ - found = true; - try { - ProcessorConfig = processor.checkConfig(key, value, processorsConfig); - - } catch (Exception e) { - throw new JobExecuteException("Create orderby pipeline instance failed!", e); - } - break; - } - } - if (!found) { - throw new JobExecuteException("No matching processor found for type: " + value.getOrDefault("type", "").toString()); - } - return ProcessorConfig; - } - - - - - - - -} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java index e43c949..e36971d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java @@ -2,10 +2,7 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.bootstrap.exception.JobExecuteException; -public interface Executor<T, ENV extends RuntimeEnvironment> { - - T execute(T dataStream, OperatorNode edge) throws JobExecuteException; - - void setRuntimeEnvironment(ENV runtimeEnvironment); +public interface Executor<T> { + T execute(T dataStream, JobTopologyNode jobTopologyNode) throws JobExecuteException; } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java deleted file mode 100644 index d70420e..0000000 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.geedgenetworks.bootstrap.execution; - -import com.geedgenetworks.bootstrap.enums.StageType; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.common.config.Constants; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; - -import com.geedgenetworks.spi.configuration.FilterConfigOptions; -import com.geedgenetworks.spi.filter.Filter; -import com.geedgenetworks.spi.filter.FilterConfig; -import com.geedgenetworks.spi.table.event.Event; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.streaming.api.datastream.DataStream; - -import java.net.URL; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; - -/** - * Initialize config and execute filter operator - */ -@Slf4j -public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { - private static final String PROCESSOR_TYPE = StageType.FILTER.getType(); - - public FilterExecutor(Config config) { - super(config); - } - - @Override - protected Map<String, FilterConfig> initialize(Config operatorConfig) { - Map<String, FilterConfig> filterConfigMap = Maps.newHashMap(); - if (operatorConfig.hasPath(Constants.FILTERS)) { - Config filterConfig = operatorConfig.getConfig(Constants.FILTERS); - filterConfig.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(filterConfig.getConfig(key), - FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Filter: %s, Message: %s", - key, result.getMsg())); - } - filterConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, filterConfig)); - }); - } - - return filterConfigMap; - } - - @Override - public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { - FilterConfig filterConfig = operatorMap.get(operatorNode.getName()); - boolean found = false; // 标志变量 - ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class); - for (Filter filter : filters) { - if(filter.type().equals(filterConfig.getType())){ - found = true; - if (operatorNode.getParallelism() > 0) { - filterConfig.setParallelism(operatorNode.getParallelism()); - } - try { - input = - filter.filterFunction( - input, filterConfig); - } catch (Exception e) { - throw new JobExecuteException("Create filter instance failed!", e); - } - break; - } - } - if (!found) { - throw new JobExecuteException("No matching filter found for type: " + filterConfig.getType()); - } - return input; - } - - protected FilterConfig checkConfig(String key, Map<String, Object> value, Config config) { - FilterConfig filterConfig = new FilterConfig(); - boolean found = false; // 标志变量 - ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class); - for (Filter filter : filters) { - if(filter.type().equals(value.getOrDefault("type", "").toString())){ - found = true; - try { - filterConfig = filter.checkConfig(key, value, config); - } catch (Exception e) { - throw new JobExecuteException("Create split pipeline instance failed!", e); - } - } - } - if (!found) { - throw new JobExecuteException("No matching filter found for type: " + value.getOrDefault("type", "").toString()); - } - return filterConfig; - } -} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index cd70f44..ad31d88 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -1,12 +1,12 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.StageType; +import com.geedgenetworks.bootstrap.enums.OperatorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.config.Constants; import com.geedgenetworks.common.config.GrootStreamConfig; -import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.api.connector.event.Event; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -29,14 +29,10 @@ import java.util.stream.Stream; public class JobExecution { private final JobRuntimeEnvironment jobRuntimeEnvironment; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor; - private final List<OperatorNode> operatorNodes; + private final Executor<DataStream<Event>> sourceExecutor; + private final Executor<DataStream<Event>> sinkExecutor; + private final Executor<DataStream<Event>> processorExecutor; + private final List<JobTopologyNode> jobTopologyNodes; private final List<URL> jarPaths; private final Map<String,String> nodeNameWithSplitTags = new HashMap<>(); @@ -50,25 +46,13 @@ public class JobExecution { } registerPlugin(jobConfig.getConfig(Constants.APPLICATION)); - - this.sourceExecutor = new SourceExecutor(jobConfig); - this.sinkExecutor = new SinkExecutor(jobConfig); - this.filterExecutor = new FilterExecutor(jobConfig); - this.splitExecutor = new SplitExecutor(jobConfig); - this.preprocessingExecutor = new PreprocessingExecutor(jobConfig); - this.processingExecutor = new ProcessingExecutor(jobConfig); - this.postprocessingExecutor = new PostprocessingExecutor(jobConfig); this.jobRuntimeEnvironment = JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig); - this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.operatorNodes = buildJobNode(jobConfig); + this.sourceExecutor = new SourceExecutor(jobRuntimeEnvironment, jobConfig); + this.sinkExecutor = new SinkExecutor(jobRuntimeEnvironment, jobConfig); + this.processorExecutor = new ProcessorExecutor(jobRuntimeEnvironment, jobConfig); + this.jobTopologyNodes = buildJobNode(jobConfig); } @@ -88,7 +72,7 @@ public class JobExecution { try { return uri.toURL(); } catch (MalformedURLException e) { - throw new RuntimeException("the uri of jar illegal: " + uri, e); + throw new RuntimeException("The uri of jar illegal:" + uri, e); } }).collect(Collectors.toList()); jarDependencies.forEach(url -> { @@ -153,7 +137,7 @@ public class JobExecution { return config; } - private List<OperatorNode> buildJobNode(Config config) { + private List<JobTopologyNode> buildJobNode(Config config) { Map<String, Object> sources = Maps.newHashMap(); Map<String, Object> sinks = Maps.newHashMap(); @@ -187,34 +171,34 @@ public class JobExecution { List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY); - List<OperatorNode> operatorNodes = Lists.newArrayList(); + List<JobTopologyNode> jobTopologyNodes = Lists.newArrayList(); topology.forEach(item -> { - OperatorNode operatorNode = JSONObject.from(item.root().unwrapped()).toJavaObject(OperatorNode.class); - operatorNodes.add(operatorNode); + JobTopologyNode jobTopologyNode = JSONObject.from(item.root().unwrapped()).toJavaObject(JobTopologyNode.class); + jobTopologyNodes.add(jobTopologyNode); }); - for (OperatorNode operatorNode : operatorNodes) { - if (sources.containsKey(operatorNode.getName())) { - operatorNode.setType(StageType.SOURCE); - } else if (sinks.containsKey(operatorNode.getName())) { - operatorNode.setType(StageType.SINK); - } else if (filters.containsKey(operatorNode.getName())) { - operatorNode.setType(StageType.FILTER); - } else if (splits.containsKey(operatorNode.getName())) { - operatorNode.setType(StageType.SPLIT); - } else if (preprocessingPipelines.containsKey(operatorNode.getName())) { - operatorNode.setType(StageType.PREPROCESSING); - } else if (processingPipelines.containsKey(operatorNode.getName())) { - operatorNode.setType(StageType.PROCESSING); - } else if (postprocessingPipelines.containsKey(operatorNode.getName())) { - operatorNode.setType(StageType.POSTPROCESSING); + for (JobTopologyNode jobTopologyNode : jobTopologyNodes) { + if (sources.containsKey(jobTopologyNode.getName())) { + jobTopologyNode.setType(OperatorType.SOURCE); + } else if (sinks.containsKey(jobTopologyNode.getName())) { + jobTopologyNode.setType(OperatorType.SINK); + } else if (filters.containsKey(jobTopologyNode.getName())) { + jobTopologyNode.setType(OperatorType.FILTER); + } else if (splits.containsKey(jobTopologyNode.getName())) { + jobTopologyNode.setType(OperatorType.SPLIT); + } else if (preprocessingPipelines.containsKey(jobTopologyNode.getName())) { + jobTopologyNode.setType(OperatorType.PREPROCESSING); + } else if (processingPipelines.containsKey(jobTopologyNode.getName())) { + jobTopologyNode.setType(OperatorType.PROCESSING); + } else if (postprocessingPipelines.containsKey(jobTopologyNode.getName())) { + jobTopologyNode.setType(OperatorType.POSTPROCESSING); } else { - throw new JobExecuteException("unsupported process type " + operatorNode.getName()); + throw new JobExecuteException("unsupported process type " + jobTopologyNode.getName()); } } - return operatorNodes; + return jobTopologyNodes; } @@ -223,14 +207,14 @@ public class JobExecution { if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) { jobRuntimeEnvironment.registerPlugin(jarPaths); } - List<OperatorNode> sourceOperatorNodes = operatorNodes - .stream().filter(v -> v.getType().name().equals(StageType.SOURCE.name())).collect(Collectors.toList()); + List<JobTopologyNode> sourceJobTopologyNodes = jobTopologyNodes + .stream().filter(v -> v.getType().name().equals(OperatorType.SOURCE.name())).collect(Collectors.toList()); DataStream<Event> dataStream = null; - for (OperatorNode sourceOperatorNode : sourceOperatorNodes) { - dataStream = sourceExecutor.execute(dataStream, sourceOperatorNode); - for (String nodeName : sourceOperatorNode.getDownstream()) { + for (JobTopologyNode sourceJobTopologyNode : sourceJobTopologyNodes) { + dataStream = sourceExecutor.execute(dataStream, sourceJobTopologyNode); + for (String nodeName : sourceJobTopologyNode.getDownstream()) { buildJobGraph(dataStream, nodeName); } } @@ -251,68 +235,68 @@ public class JobExecution { } private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) { - OperatorNode operatorNode = getNode(downstreamNodeName).orElseGet(() -> { + JobTopologyNode jobTopologyNode = getNode(downstreamNodeName).orElseGet(() -> { throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); - if (operatorNode.getType().name().equals(StageType.FILTER.name())) { - if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) { - dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream) - .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {}), operatorNode); + if (jobTopologyNode.getType().name().equals(OperatorType.FILTER.name())) { + if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { + dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream) + .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) {}), jobTopologyNode); } else { - dataStream = filterExecutor.execute(dataStream, operatorNode); + dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (operatorNode.getType().name().equals(StageType.SPLIT.name())) { - if (operatorNode.getTags().size() == operatorNode.getDownstream().size()) { - for (int i = 0; i < operatorNode.getDownstream().size(); i++) { - nodeNameWithSplitTags.put(operatorNode.getDownstream().get(i), operatorNode.getTags().get(i)); + } else if (jobTopologyNode.getType().name().equals(OperatorType.SPLIT.name())) { + if (jobTopologyNode.getTags().size() == jobTopologyNode.getDownstream().size()) { + for (int i = 0; i < jobTopologyNode.getDownstream().size(); i++) { + nodeNameWithSplitTags.put(jobTopologyNode.getDownstream().get(i), jobTopologyNode.getTags().get(i)); } } else { throw new JobExecuteException("split node downstream size not equal tags size"); } - dataStream = splitExecutor.execute(dataStream, operatorNode); - } else if (operatorNode.getType().name().equals(StageType.PREPROCESSING.name())) { - if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) { - dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())){ - }), operatorNode); + dataStream = processorExecutor.execute(dataStream, jobTopologyNode); + } else if (jobTopologyNode.getType().name().equals(OperatorType.PREPROCESSING.name())) { + if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { + dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())){ + }), jobTopologyNode); } else { - dataStream = preprocessingExecutor.execute(dataStream, operatorNode); + dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (operatorNode.getType().name().equals(StageType.PROCESSING.name())) { - if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) { - dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) { - }), operatorNode); + } else if (jobTopologyNode.getType().name().equals(OperatorType.PROCESSING.name())) { + if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { + dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { + }), jobTopologyNode); } else { - dataStream = processingExecutor.execute(dataStream, operatorNode); + dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (operatorNode.getType().name().equals(StageType.POSTPROCESSING.name())) { - if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) { - dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) { - }), operatorNode); + } else if (jobTopologyNode.getType().name().equals(OperatorType.POSTPROCESSING.name())) { + if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { + dataStream = processorExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { + }), jobTopologyNode); } else { - dataStream = postprocessingExecutor.execute(dataStream, operatorNode); + dataStream = processorExecutor.execute(dataStream, jobTopologyNode); } - } else if (operatorNode.getType().name().equals(StageType.SINK.name())) { - if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) { - dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) { - }), operatorNode); + } else if (jobTopologyNode.getType().name().equals(OperatorType.SINK.name())) { + if (nodeNameWithSplitTags.containsKey(jobTopologyNode.getName())) { + dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(jobTopologyNode.getName())) { + }), jobTopologyNode); } else { - dataStream = sinkExecutor.execute(dataStream, operatorNode); + dataStream = sinkExecutor.execute(dataStream, jobTopologyNode); } } else { - throw new JobExecuteException("unsupported process type " + operatorNode.getType().name()); + throw new JobExecuteException("unsupported process type " + jobTopologyNode.getType().name()); } - for (String nodeName : operatorNode.getDownstream()) { + for (String nodeName : jobTopologyNode.getDownstream()) { buildJobGraph(dataStream, nodeName); } } - private Optional<OperatorNode> getNode(String name) { - return operatorNodes.stream().filter(v -> v.getName().equals(name)).findFirst(); + private Optional<JobTopologyNode> getNode(String name) { + return jobTopologyNodes.stream().filter(v -> v.getName().equals(name)).findFirst(); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java index 8c4b392..dcc15e9 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobTopologyNode.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.execution; -import com.geedgenetworks.bootstrap.enums.StageType; +import com.geedgenetworks.bootstrap.enums.OperatorType; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -10,13 +10,16 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; +/** + * Represents an operator node in the execution graph. + */ @Data @NoArgsConstructor @AllArgsConstructor @EqualsAndHashCode -public class OperatorNode implements Serializable { +public class JobTopologyNode implements Serializable { private String name; - private StageType type; + private OperatorType type; private int parallelism; private List<String> downstream = Collections.emptyList(); private List<String> tags = Collections.emptyList(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java deleted file mode 100644 index 10d9188..0000000 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.geedgenetworks.bootstrap.execution; - -import com.geedgenetworks.bootstrap.enums.StageType; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.common.config.Constants; -import com.geedgenetworks.spi.processor.ProcessorConfig; -import com.geedgenetworks.spi.table.event.Event; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import org.apache.flink.streaming.api.datastream.DataStream; - -import java.net.URL; -import java.util.List; -import java.util.Map; - -/** - * Initialize config and execute postprocessor - */ -public class PostprocessingExecutor extends AbstractProcessorExecutor { - private static final String PROCESSOR_TYPE = StageType.POSTPROCESSING.getType(); - - public PostprocessingExecutor(Config config) { - super(config); - } - - @Override - protected Map<String, ProcessorConfig> initialize(Config operatorConfig) { - Map<String, ProcessorConfig> postprocessingConfigMap = Maps.newHashMap(); - if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) { - Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES); - postprocessors.root().unwrapped().forEach((key, value) -> { - postprocessingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, postprocessors)); - }); - } - return postprocessingConfigMap; - } - - - @Override - public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { - return super.execute(input, operatorNode); - } -} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java deleted file mode 100644 index 9acda99..0000000 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.geedgenetworks.bootstrap.execution; - -import com.geedgenetworks.bootstrap.enums.StageType; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.common.config.Constants; -import com.geedgenetworks.spi.processor.ProcessorConfig; -import com.geedgenetworks.spi.table.event.Event; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.streaming.api.datastream.DataStream; - -import java.net.URL; -import java.util.List; -import java.util.Map; - -/** - * Initialize config and execute preprocessor - */ -@Slf4j -public class PreprocessingExecutor extends AbstractProcessorExecutor { - private static final String PROCESSOR_TYPE = StageType.PREPROCESSING.getType(); - - public PreprocessingExecutor(Config config) { - super(config); - } - - @Override - protected Map<String, ProcessorConfig> initialize(Config operatorConfig) { - Map<String, ProcessorConfig> preprocessingConfigMap = Maps.newHashMap(); - if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) { - Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES); - preprocessors.root().unwrapped().forEach((key, value) -> { - preprocessingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, preprocessors)); - }); - } - return preprocessingConfigMap; - } - - @Override - public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { - - return super.execute(input, operatorNode); - - } -} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java deleted file mode 100644 index c49df88..0000000 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.geedgenetworks.bootstrap.execution; - -import com.geedgenetworks.bootstrap.enums.StageType; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.common.config.Constants; -import com.geedgenetworks.spi.processor.Processor; -import com.geedgenetworks.spi.processor.ProcessorConfig; -import com.geedgenetworks.spi.processor.ProcessorProvider; -import com.geedgenetworks.spi.table.event.Event; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import org.apache.flink.streaming.api.datastream.DataStream; - -import java.net.URL; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Initialize config and execute processor - */ -public class ProcessingExecutor extends AbstractProcessorExecutor { - private static final String PROCESSOR_TYPE = StageType.PROCESSING.getType(); - //private Map<String, Processor<?>> processors; - - public ProcessingExecutor(Config config) { - super(config); - } - - @Override - protected Map<String, ProcessorConfig> initialize(Config operatorConfig) { - Map<String, ProcessorConfig> processingConfigMap = Maps.newHashMap(); - //processors = new HashMap<>(); - if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) { - Config processingConfig = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES); - processingConfig.root().unwrapped().forEach((key, value) -> { - processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processingConfig)); - //processors.put(key, ProcessorProvider.load(((Map<?, ?>) value).get("type").toString())); - - }); - } - return processingConfigMap; - } - - @Override - public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { - return super.execute(input, operatorNode); - } -} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessorExecutor.java new file mode 100644 index 0000000..204866f --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessorExecutor.java @@ -0,0 +1,102 @@ +package com.geedgenetworks.bootstrap.execution; + +import com.geedgenetworks.api.processor.ProcessorConfigOptions; +import com.geedgenetworks.api.factory.FactoryUtil; +import com.geedgenetworks.api.factory.ProcessorFactory; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.Constants; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.api.processor.Processor; +import com.geedgenetworks.api.processor.ProcessorConfig; +import com.geedgenetworks.api.connector.event.Event; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import org.apache.flink.streaming.api.datastream.DataStream; +import java.util.Map; +/** + * Initialize config and execute processor + */ +public class ProcessorExecutor extends AbstractExecutor<JobRuntimeEnvironment, Config> { + private Map<String, ProcessorConfig> operators; + private Map<String, Processor<?>> processors; + + public ProcessorExecutor(JobRuntimeEnvironment environment, Config jobConfig) { + super(environment, jobConfig); + } + + @Override + protected void initialize(Config jobConfig) { + operators = Maps.newHashMap(); + processors = Maps.newHashMap(); + + if (jobConfig.hasPath(Constants.FILTERS)) { + discoveryProcessors(jobConfig.getConfig(Constants.FILTERS)); + } + + if (jobConfig.hasPath(Constants.SPLITS)) { + discoveryProcessors(jobConfig.getConfig(Constants.SPLITS)); + } + + if (jobConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) { + discoveryProcessors(jobConfig.getConfig(Constants.PREPROCESSING_PIPELINES)); + } + + if (jobConfig.hasPath(Constants.PROCESSING_PIPELINES)) { + discoveryProcessors(jobConfig.getConfig(Constants.PROCESSING_PIPELINES)); + } + + if (jobConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) { + discoveryProcessors(jobConfig.getConfig(Constants.POSTPROCESSING_PIPELINES)); + } + } + + private void discoveryProcessors(Config config) { + + config.root().unwrapped().forEach((key, value) -> { + + CheckResult result = CheckConfigUtil.checkAllExists(config.getConfig(key), + ProcessorConfigOptions.TYPE.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Processor: %s, Message: %s", key, result.getMsg())); + } + + Processor processor = FactoryUtil + .discoverProcessorFactory(ProcessorFactory.class, ((Map<?, ?>) value).get("type").toString()).createProcessor(); + processors.put(key, processor); + operators.put(key, processor.parseConfig(key,config.getConfig(key))); + }); + + } + + @Override + public DataStream<Event> execute(DataStream<Event> input, JobTopologyNode jobTopologyNode) throws JobExecuteException { + String name = jobTopologyNode.getName(); + ProcessorConfig operatorConfig = operators.get(name); + if (operatorConfig == null) { + throw new JobExecuteException("No matching operator configuration found for: " + name); + } + + Processor processor = processors.get(operatorConfig.getName()); + + if (processor == null) { + throw new JobExecuteException("No matching processor found for type: " + operatorConfig.getType()); + } + + // Set parallelism if needed + int parallelism = jobTopologyNode.getParallelism(); + if (parallelism > 0) { + operatorConfig.setParallelism(parallelism); + } + + try { + return processor.process(environment.getStreamExecutionEnvironment(), input, operatorConfig); + } catch (Exception e) { + throw new JobExecuteException("Failed to execute processor due to unexpected error.", e); + } + + } +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java index 130705a..501fa81 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; -import com.geedgenetworks.bootstrap.enums.StageType; +import com.geedgenetworks.bootstrap.enums.OperatorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.config.Constants; @@ -9,14 +9,14 @@ import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.spi.sink.SinkConfig; -import com.geedgenetworks.spi.sink.SinkConfigOptions; -import com.geedgenetworks.spi.sink.SinkProvider; -import com.geedgenetworks.spi.sink.SinkTableFactory; -import com.geedgenetworks.spi.table.event.Event; -import com.geedgenetworks.spi.table.factory.FactoryUtil; -import com.geedgenetworks.spi.table.factory.TableFactory; -import com.geedgenetworks.spi.table.schema.Schema; +import com.geedgenetworks.api.connector.sink.SinkConfig; +import com.geedgenetworks.api.connector.sink.SinkConfigOptions; +import com.geedgenetworks.api.connector.sink.SinkProvider; +import com.geedgenetworks.api.connector.sink.SinkTableFactory; +import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.factory.FactoryUtil; +import com.geedgenetworks.api.factory.ConnectorFactory; +import com.geedgenetworks.api.connector.schema.Schema; import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; @@ -30,18 +30,17 @@ import java.util.Map; * Initialize config and execute sink connector */ @Slf4j -public class SinkExecutor extends AbstractExecutor<String, SinkConfig> { - private static final String PROCESSOR_TYPE = StageType.SINK.getType(); - - public SinkExecutor(Config config) { - super(config); +public class SinkExecutor extends AbstractExecutor<JobRuntimeEnvironment, Config> { + private static final String PROCESSOR_TYPE = OperatorType.SINK.getType(); + private Map<String, SinkConfig> operators; + public SinkExecutor(JobRuntimeEnvironment environment, Config jobConfig) { + super(environment, jobConfig); } @Override - protected Map<String, SinkConfig> initialize(Config operatorConfig) { - Map<String, SinkConfig> sinkConfigMap = Maps.newHashMap(); - - if (operatorConfig.hasPath(Constants.SINKS)) { - Config sinks = operatorConfig.getConfig(Constants.SINKS); + protected void initialize(Config jobConfig) { + operators = Maps.newHashMap(); + if (jobConfig.hasPath(Constants.SINKS)) { + Config sinks = jobConfig.getConfig(Constants.SINKS); sinks.root().unwrapped().forEach((key,value) -> { CheckResult result = CheckConfigUtil.checkAllExists(sinks.getConfig(key), SinkConfigOptions.TYPE.key(), SinkConfigOptions.PROPERTIES.key()); @@ -53,26 +52,25 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> { SinkConfig sinkConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SinkConfig.class); sinkConfig.setName(key); - sinkConfigMap.put(key, sinkConfig); + operators.put(key, sinkConfig); }); } - return sinkConfigMap; } @Override - public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { - SinkConfig sinkConfig = operatorMap.get(operatorNode.getName()); + public DataStream<Event> execute(DataStream<Event> input, JobTopologyNode jobTopologyNode) throws JobExecuteException { + SinkConfig sinkConfig = operators.get(jobTopologyNode.getName()); try { - SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType()); + SinkTableFactory sinkTableFactory = FactoryUtil.discoverConnectorFactory(SinkTableFactory.class, sinkConfig.getType()); Map<String, String> options = sinkConfig.getProperties(); Configuration configuration = Configuration.fromMap(options); Schema schema = null; if(sinkConfig.getSchema() != null && !sinkConfig.getSchema().isEmpty()){ schema = SchemaConfigParse.parseSchemaConfig(sinkConfig.getSchema()); } - TableFactory.Context context = new TableFactory.Context(schema, options, configuration); + ConnectorFactory.Context context = new ConnectorFactory.Context(schema, options, configuration); SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context); if(!sinkProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){ throw new UnsupportedOperationException(String.format("sink(%s) not support DynamicSchema", sinkConfig.getName())); @@ -83,8 +81,8 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> { } DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(input); - if (operatorNode.getParallelism() > 0) { - dataStreamSink.setParallelism(operatorNode.getParallelism()); + if (jobTopologyNode.getParallelism() > 0) { + dataStreamSink.setParallelism(jobTopologyNode.getParallelism()); } dataStreamSink.name(sinkConfig.getName()); } catch (Exception e) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index 5109540..ca4fc1d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.StageType; +import com.geedgenetworks.bootstrap.enums.OperatorType; import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; @@ -10,14 +10,14 @@ import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.spi.configuration.SourceConfigOptions; -import com.geedgenetworks.spi.source.SourceConfig; -import com.geedgenetworks.spi.source.SourceProvider; -import com.geedgenetworks.spi.source.SourceTableFactory; -import com.geedgenetworks.spi.table.event.Event; -import com.geedgenetworks.spi.table.factory.FactoryUtil; -import com.geedgenetworks.spi.table.factory.TableFactory; -import com.geedgenetworks.spi.table.schema.Schema; +import com.geedgenetworks.api.connector.source.SourceConfigOptions; +import com.geedgenetworks.api.connector.source.SourceConfig; +import com.geedgenetworks.api.connector.source.SourceProvider; +import com.geedgenetworks.api.connector.source.SourceTableFactory; +import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.factory.FactoryUtil; +import com.geedgenetworks.api.factory.ConnectorFactory; +import com.geedgenetworks.api.connector.schema.Schema; import com.google.common.collect.Maps; import com.typesafe.config.*; import lombok.extern.slf4j.Slf4j; @@ -35,17 +35,17 @@ import java.util.Map; * Initialize config and execute source connector */ @Slf4j -public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { - private static final String PROCESSOR_TYPE = StageType.SOURCE.getType(); - - public SourceExecutor(Config config) { - super(config); +public class SourceExecutor extends AbstractExecutor<JobRuntimeEnvironment, Config> { + private static final String PROCESSOR_TYPE = OperatorType.SOURCE.getType(); + private Map<String, SourceConfig> operators; + public SourceExecutor(JobRuntimeEnvironment environment, Config jobConfig) { + super(environment, jobConfig); } @Override - protected Map<String, SourceConfig> initialize(Config operatorConfig) { - Map<String, SourceConfig> sourceConfigMap = Maps.newHashMap(); - if (operatorConfig.hasPath(Constants.SOURCES)) { - Config sources = operatorConfig.getConfig(Constants.SOURCES); + protected void initialize(Config jobConfig) { + operators = Maps.newHashMap(); + if (jobConfig.hasPath(Constants.SOURCES)) { + Config sources = jobConfig.getConfig(Constants.SOURCES); sources.root().unwrapped().forEach((key,value) -> { CheckResult result = CheckConfigUtil.checkAllExists(sources.getConfig(key), SourceConfigOptions.TYPE.key(), SourceConfigOptions.PROPERTIES.key()); @@ -57,27 +57,25 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { SourceConfig sourceConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SourceConfig.class); sourceConfig.setName(key); - sourceConfigMap.put(key, sourceConfig); + operators.put(key, sourceConfig); }); } - - return sourceConfigMap; } @Override - public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { - SourceConfig sourceConfig = operatorMap.get(operatorNode.getName()); + public DataStream<Event> execute(DataStream<Event> input, JobTopologyNode jobTopologyNode) throws JobExecuteException { + SourceConfig sourceConfig = operators.get(jobTopologyNode.getName()); SingleOutputStreamOperator sourceSingleOutputStreamOperator; try { - SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType()); + SourceTableFactory tableFactory = FactoryUtil.discoverConnectorFactory(SourceTableFactory.class, sourceConfig.getType()); Map<String, String> options = sourceConfig.getProperties(); Configuration configuration = Configuration.fromMap(options); Schema schema = null; if(sourceConfig.getSchema() != null && !sourceConfig.getSchema().isEmpty()){ schema = SchemaConfigParse.parseSchemaConfig(sourceConfig.getSchema()); } - TableFactory.Context context = new TableFactory.Context(schema, options, configuration); + ConnectorFactory.Context context = new ConnectorFactory.Context(schema, options, configuration); SourceProvider sourceProvider = tableFactory.getSourceProvider(context); if(!sourceProvider.supportDynamicSchema() && schema != null && schema.isDynamic()){ throw new UnsupportedOperationException(String.format("source(%s) not support DynamicSchema", sourceConfig.getName())); @@ -87,18 +85,18 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { System.out.println(String.format("source(%s) schema:\n%s", sourceConfig.getName(), schema.getDataType().treeString())); } - sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()).name(sourceConfig.getName()); - if (operatorNode.getParallelism() > 0) { - sourceSingleOutputStreamOperator.setParallelism(operatorNode.getParallelism()); + sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(environment.getStreamExecutionEnvironment()).name(sourceConfig.getName()); + if (jobTopologyNode.getParallelism() > 0) { + sourceSingleOutputStreamOperator.setParallelism(jobTopologyNode.getParallelism()); } - sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, operatorNode); + sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, jobTopologyNode); return sourceSingleOutputStreamOperator; } catch (Exception e) { throw new JobExecuteException("Create source instance failed!", e); } } - private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, OperatorNode operatorNode){ + private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, JobTopologyNode jobTopologyNode){ final String watermarkTimestamp = sourceConfig.getWatermark_timestamp(); if(StringUtils.isNotBlank(watermarkTimestamp)){ String timestampUnit = sourceConfig.getWatermark_timestamp_unit(); @@ -137,8 +135,8 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(watermarkLag)) .withTimestampAssigner(timestampAssigner) ); - if (operatorNode.getParallelism() > 0) { - dataStream.setParallelism(operatorNode.getParallelism()); + if (jobTopologyNode.getParallelism() > 0) { + dataStream.setParallelism(jobTopologyNode.getParallelism()); } } return dataStream; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java deleted file mode 100644 index c142614..0000000 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java +++ /dev/null @@ -1,105 +0,0 @@ -package com.geedgenetworks.bootstrap.execution; - -import com.alibaba.fastjson.JSONObject; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.common.config.Constants; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.spi.split.Split; -import com.geedgenetworks.spi.split.SplitConfig; -import com.geedgenetworks.spi.split.SplitConfigOptions; -import com.geedgenetworks.spi.table.event.Event; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.streaming.api.datastream.DataStream; - -import java.net.URL; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; - - -/** - * Initialize config and execute filter operator - */ -@Slf4j -public class SplitExecutor extends AbstractExecutor<String, SplitConfig> { - - - public SplitExecutor(Config config) { - super(config); - } - - @Override - protected Map<String, SplitConfig> initialize(Config operatorConfig) { - Map<String, SplitConfig> splitConfigMap = Maps.newHashMap(); - if (operatorConfig.hasPath(Constants.SPLITS)) { - Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS); - splitsConfig.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(splitsConfig.getConfig(key), - SplitConfigOptions.TYPE.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "split: %s, Message: %s", - key, result.getMsg())); - } - SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); - splitConfig.setName(key); - splitConfigMap.put(key, splitConfig); - }); - } - - return splitConfigMap; - } - - @Override - public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { - SplitConfig splitConfig = operatorMap.get(operatorNode.getName()); - boolean found = false; // 标志变量 - ServiceLoader<Split> splits = ServiceLoader.load(Split.class); - for (Split split : splits) { - found = true; // 标志变量 - if(split.type().equals(splitConfig.getType())){ - if (operatorNode.getParallelism() > 0) { - splitConfig.setParallelism(operatorNode.getParallelism()); - } - try { - input = - split.splitFunction( - input, splitConfig); - } catch (Exception e) { - throw new JobExecuteException("Create split instance failed!", e); - } - break; - } - } - if (!found) { - throw new JobExecuteException("No matching split found for type: " + splitConfig.getType()); - } - return input; - } - - protected SplitConfig checkConfig(String key, Map<String, Object> value, Config config) { - SplitConfig splitConfig = new SplitConfig(); - boolean found = false; // 标志变量 - ServiceLoader<Split> splits = ServiceLoader.load(Split.class); - for (Split split : splits) { - if(split.type().equals(value.getOrDefault("type", "").toString())){ - found = true; - try { - splitConfig = split.checkConfig(key, value, config); - } catch (Exception e) { - throw new JobExecuteException("Create split pipeline instance failed!", e); - } - break; - } - } - if (!found) { - throw new JobExecuteException("No matching split found for type: " + value.getOrDefault("type", "").toString()); - } - return splitConfig; - } -} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java index f6b4292..00fcd61 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/SchemaConfigParse.java @@ -3,10 +3,10 @@ package com.geedgenetworks.bootstrap.utils; import com.alibaba.fastjson2.JSON; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.spi.table.schema.Schema; -import com.geedgenetworks.spi.table.schema.SchemaParser; -import com.geedgenetworks.spi.table.type.StructType; -import com.geedgenetworks.spi.table.type.Types; +import com.geedgenetworks.api.connector.schema.Schema; +import com.geedgenetworks.api.connector.schema.SchemaParser; +import com.geedgenetworks.api.connector.type.StructType; +import com.geedgenetworks.api.connector.type.Types; import org.apache.commons.io.FileUtils; import java.io.File; diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java index 74b40a4..ccb01a4 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.main.simple.collect; -import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.api.connector.event.Event; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.util.*; diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java index 130478e..15d6328 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java @@ -1,8 +1,8 @@ package com.geedgenetworks.bootstrap.main.simple.collect; -import com.geedgenetworks.spi.sink.SinkProvider; -import com.geedgenetworks.spi.sink.SinkTableFactory; -import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.api.connector.sink.SinkProvider; +import com.geedgenetworks.api.connector.sink.SinkTableFactory; +import com.geedgenetworks.api.connector.event.Event; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; @@ -16,7 +16,7 @@ import java.util.Set; public class CollectTableFactory implements SinkTableFactory { public static final String IDENTIFIER = "collect"; @Override - public String factoryIdentifier() { + public String type() { return IDENTIFIER; } |
