diff options
| author | doufenghu <[email protected]> | 2024-11-10 22:43:16 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-11-10 22:43:16 +0800 |
| commit | 73a5f46181af3c9e596e8b08dc27f63339b04c53 (patch) | |
| tree | 93bb7a830deb742211ec7cb8d8416002b4a5e54e /groot-bootstrap/src | |
| parent | 16769de2e5ba334a5cfaacd8a53db2989264d022 (diff) | |
[Feature][SPI] SPI/Common module 依赖库梳理,xxExecutor删除不必要的参数传递。
Diffstat (limited to 'groot-bootstrap/src')
15 files changed, 201 insertions, 188 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java deleted file mode 100644 index 6f33cae..0000000 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.geedgenetworks.bootstrap.enums; - -public enum ProcessorType { - SOURCE("source"), - FILTER("filter"), - SPLIT("split"), - PREPROCESSING("preprocessing"), - PROCESSING("processing"), - POSTPROCESSING("postprocessing"), - SINK("sink"); - - private final String type; - - ProcessorType(String type) {this.type = type;} - - public String getType() { - return type; - } -} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java new file mode 100644 index 0000000..8b4e154 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java @@ -0,0 +1,32 @@ +package com.geedgenetworks.bootstrap.enums; + +public enum StageType { + SOURCE("source"), + FILTER("filter"), + SPLIT("split"), + PREPROCESSING("preprocessing"), + PROCESSING("processing"), + POSTPROCESSING("postprocessing"), + SINK("sink"); + + private final String type; + public String getType() { + return type; + } + StageType(String type) {this.type = type;} + + public static StageType fromType(String type) { + for (StageType stage : values()) { + if (stage.type.equalsIgnoreCase(type)) { + return stage; + } + } + throw new IllegalArgumentException("Unknown type: " + type); + } + + @Override + public String toString() { + return type; + } + +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index e0828a0..fe440f7 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -16,9 +16,9 @@ public abstract class AbstractExecutor<K, V> protected final Config operatorConfig; protected final Map<K,V> operatorMap; - protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) { + protected AbstractExecutor(Config operatorConfig) { this.operatorConfig = operatorConfig; - this.operatorMap = initialize(jarPaths, operatorConfig); + this.operatorMap = initialize(operatorConfig); } @Override @@ -27,7 +27,7 @@ public abstract class AbstractExecutor<K, V> } - protected abstract Map<K, V> initialize(List<URL> jarPaths, Config operatorConfig); + protected abstract Map<K, V> initialize(Config operatorConfig); protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = (classLoader, url) -> { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index ec748cc..a45380e 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -19,26 +19,24 @@ import java.util.ServiceLoader; public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> { - protected AbstractProcessorExecutor(List<URL> jarPaths, Config operatorConfig) { - super(jarPaths, operatorConfig); + protected AbstractProcessorExecutor(Config operatorConfig) { + super(operatorConfig); } @Override - public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { + public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { - ProcessorConfig processorConfig = operatorMap.get(node.getName()); + ProcessorConfig processorConfig = operatorMap.get(operatorNode.getName()); boolean found = false; // 标志变量 ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class); for (Processor processor : processors) { if(processor.type().equals(processorConfig.getType())){ found = true; - if (node.getParallelism() > 0) { - processorConfig.setParallelism(node.getParallelism()); + if (operatorNode.getParallelism() > 0) { + processorConfig.setParallelism(operatorNode.getParallelism()); } try { - - dataStream = processor.processorFunction( - dataStream, processorConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); + input = processor.process(jobRuntimeEnvironment.getStreamExecutionEnvironment(), input, processorConfig); } catch (Exception e) { throw new JobExecuteException("Create orderby pipeline instance failed!", e); } @@ -48,7 +46,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, if (!found) { throw new JobExecuteException("No matching processor found for type: " + processorConfig.getType()); } - return dataStream; + return input; } protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java index d57d6bf..e43c949 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java @@ -4,7 +4,7 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException; public interface Executor<T, ENV extends RuntimeEnvironment> { - T execute(T dataStream, Node edge) throws JobExecuteException; + T execute(T dataStream, OperatorNode edge) throws JobExecuteException; void setRuntimeEnvironment(ENV runtimeEnvironment); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java index 1ea19f8..d70420e 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.execution; -import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.enums.StageType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.config.Constants; import com.geedgenetworks.common.config.CheckConfigUtil; @@ -27,14 +27,14 @@ import java.util.ServiceLoader; */ @Slf4j public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { - private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType(); + private static final String PROCESSOR_TYPE = StageType.FILTER.getType(); - public FilterExecutor(List<URL> jarPaths, Config config) { - super(jarPaths, config); + public FilterExecutor(Config config) { + super(config); } @Override - protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + protected Map<String, FilterConfig> initialize(Config operatorConfig) { Map<String, FilterConfig> filterConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.FILTERS)) { Config filterConfig = operatorConfig.getConfig(Constants.FILTERS); @@ -54,20 +54,20 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { } @Override - public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { - FilterConfig filterConfig = operatorMap.get(node.getName()); + public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { + FilterConfig filterConfig = operatorMap.get(operatorNode.getName()); boolean found = false; // 标志变量 ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class); for (Filter filter : filters) { if(filter.type().equals(filterConfig.getType())){ found = true; - if (node.getParallelism() > 0) { - filterConfig.setParallelism(node.getParallelism()); + if (operatorNode.getParallelism() > 0) { + filterConfig.setParallelism(operatorNode.getParallelism()); } try { - dataStream = + input = filter.filterFunction( - dataStream, filterConfig); + input, filterConfig); } catch (Exception e) { throw new JobExecuteException("Create filter instance failed!", e); } @@ -77,7 +77,7 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { if (!found) { throw new JobExecuteException("No matching filter found for type: " + filterConfig.getType()); } - return dataStream; + return input; } protected FilterConfig checkConfig(String key, Map<String, Object> value, Config config) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index 325f8a4..cd70f44 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.enums.StageType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.config.Constants; @@ -36,7 +36,7 @@ public class JobExecution { private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor; private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor; private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor; - private final List<Node> nodes; + private final List<OperatorNode> operatorNodes; private final List<URL> jarPaths; private final Map<String,String> nodeNameWithSplitTags = new HashMap<>(); @@ -51,13 +51,13 @@ public class JobExecution { registerPlugin(jobConfig.getConfig(Constants.APPLICATION)); - this.sourceExecutor = new SourceExecutor(jarPaths, jobConfig); - this.sinkExecutor = new SinkExecutor(jarPaths, jobConfig); - this.filterExecutor = new FilterExecutor(jarPaths, jobConfig); - this.splitExecutor = new SplitExecutor(jarPaths, jobConfig); - this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, jobConfig); - this.processingExecutor = new ProcessingExecutor(jarPaths, jobConfig); - this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, jobConfig); + this.sourceExecutor = new SourceExecutor(jobConfig); + this.sinkExecutor = new SinkExecutor(jobConfig); + this.filterExecutor = new FilterExecutor(jobConfig); + this.splitExecutor = new SplitExecutor(jobConfig); + this.preprocessingExecutor = new PreprocessingExecutor(jobConfig); + this.processingExecutor = new ProcessingExecutor(jobConfig); + this.postprocessingExecutor = new PostprocessingExecutor(jobConfig); this.jobRuntimeEnvironment = JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig); @@ -68,7 +68,7 @@ public class JobExecution { this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.nodes = buildJobNode(jobConfig); + this.operatorNodes = buildJobNode(jobConfig); } @@ -153,7 +153,7 @@ public class JobExecution { return config; } - private List<Node> buildJobNode(Config config) { + private List<OperatorNode> buildJobNode(Config config) { Map<String, Object> sources = Maps.newHashMap(); Map<String, Object> sinks = Maps.newHashMap(); @@ -187,34 +187,34 @@ public class JobExecution { List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY); - List<Node> nodes = Lists.newArrayList(); + List<OperatorNode> operatorNodes = Lists.newArrayList(); topology.forEach(item -> { - Node node = JSONObject.from(item.root().unwrapped()).toJavaObject(Node.class); - nodes.add(node); + OperatorNode operatorNode = JSONObject.from(item.root().unwrapped()).toJavaObject(OperatorNode.class); + operatorNodes.add(operatorNode); }); - for (Node node : nodes) { - if (sources.containsKey(node.getName())) { - node.setType(ProcessorType.SOURCE); - } else if (sinks.containsKey(node.getName())) { - node.setType(ProcessorType.SINK); - } else if (filters.containsKey(node.getName())) { - node.setType(ProcessorType.FILTER); - } else if (splits.containsKey(node.getName())) { - node.setType(ProcessorType.SPLIT); - } else if (preprocessingPipelines.containsKey(node.getName())) { - node.setType(ProcessorType.PREPROCESSING); - } else if (processingPipelines.containsKey(node.getName())) { - node.setType(ProcessorType.PROCESSING); - } else if (postprocessingPipelines.containsKey(node.getName())) { - node.setType(ProcessorType.POSTPROCESSING); + for (OperatorNode operatorNode : operatorNodes) { + if (sources.containsKey(operatorNode.getName())) { + operatorNode.setType(StageType.SOURCE); + } else if (sinks.containsKey(operatorNode.getName())) { + operatorNode.setType(StageType.SINK); + } else if (filters.containsKey(operatorNode.getName())) { + operatorNode.setType(StageType.FILTER); + } else if (splits.containsKey(operatorNode.getName())) { + operatorNode.setType(StageType.SPLIT); + } else if (preprocessingPipelines.containsKey(operatorNode.getName())) { + operatorNode.setType(StageType.PREPROCESSING); + } else if (processingPipelines.containsKey(operatorNode.getName())) { + operatorNode.setType(StageType.PROCESSING); + } else if (postprocessingPipelines.containsKey(operatorNode.getName())) { + operatorNode.setType(StageType.POSTPROCESSING); } else { - throw new JobExecuteException("unsupported process type " + node.getName()); + throw new JobExecuteException("unsupported process type " + operatorNode.getName()); } } - return nodes; + return operatorNodes; } @@ -223,14 +223,14 @@ public class JobExecution { if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) { jobRuntimeEnvironment.registerPlugin(jarPaths); } - List<Node> sourceNodes = nodes - .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList()); + List<OperatorNode> sourceOperatorNodes = operatorNodes + .stream().filter(v -> v.getType().name().equals(StageType.SOURCE.name())).collect(Collectors.toList()); DataStream<Event> dataStream = null; - for (Node sourceNode : sourceNodes) { - dataStream = sourceExecutor.execute(dataStream, sourceNode); - for (String nodeName : sourceNode.getDownstream()) { + for (OperatorNode sourceOperatorNode : sourceOperatorNodes) { + dataStream = sourceExecutor.execute(dataStream, sourceOperatorNode); + for (String nodeName : sourceOperatorNode.getDownstream()) { buildJobGraph(dataStream, nodeName); } } @@ -251,68 +251,68 @@ public class JobExecution { } private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) { - Node node = getNode(downstreamNodeName).orElseGet(() -> { + OperatorNode operatorNode = getNode(downstreamNodeName).orElseGet(() -> { throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); - if (node.getType().name().equals(ProcessorType.FILTER.name())) { - if (nodeNameWithSplitTags.containsKey(node.getName())) { - dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { - }), node); + if (operatorNode.getType().name().equals(StageType.FILTER.name())) { + if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) { + dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream) + .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {}), operatorNode); } else { - dataStream = filterExecutor.execute(dataStream, node); + dataStream = filterExecutor.execute(dataStream, operatorNode); } - } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) { - if (node.getTags().size() == node.getDownstream().size()) { - for (int i = 0; i < node.getDownstream().size();i++) { - nodeNameWithSplitTags.put(node.getDownstream().get(i),node.getTags().get(i)); + } else if (operatorNode.getType().name().equals(StageType.SPLIT.name())) { + if (operatorNode.getTags().size() == operatorNode.getDownstream().size()) { + for (int i = 0; i < operatorNode.getDownstream().size(); i++) { + nodeNameWithSplitTags.put(operatorNode.getDownstream().get(i), operatorNode.getTags().get(i)); } } else { throw new JobExecuteException("split node downstream size not equal tags size"); } - dataStream = splitExecutor.execute(dataStream, node); - } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - if (nodeNameWithSplitTags.containsKey(node.getName())) { - dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())){ - }), node); + dataStream = splitExecutor.execute(dataStream, operatorNode); + } else if (operatorNode.getType().name().equals(StageType.PREPROCESSING.name())) { + if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) { + dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())){ + }), operatorNode); } else { - dataStream = preprocessingExecutor.execute(dataStream, node); + dataStream = preprocessingExecutor.execute(dataStream, operatorNode); } - } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - if (nodeNameWithSplitTags.containsKey(node.getName())) { - dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { - }), node); + } else if (operatorNode.getType().name().equals(StageType.PROCESSING.name())) { + if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) { + dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) { + }), operatorNode); } else { - dataStream = processingExecutor.execute(dataStream, node); + dataStream = processingExecutor.execute(dataStream, operatorNode); } - } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - if (nodeNameWithSplitTags.containsKey(node.getName())) { - dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { - }), node); + } else if (operatorNode.getType().name().equals(StageType.POSTPROCESSING.name())) { + if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) { + dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) { + }), operatorNode); } else { - dataStream = postprocessingExecutor.execute(dataStream, node); + dataStream = postprocessingExecutor.execute(dataStream, operatorNode); } - } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - if (nodeNameWithSplitTags.containsKey(node.getName())) { - dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { - }), node); + } else if (operatorNode.getType().name().equals(StageType.SINK.name())) { + if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) { + dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) { + }), operatorNode); } else { - dataStream = sinkExecutor.execute(dataStream, node); + dataStream = sinkExecutor.execute(dataStream, operatorNode); } } else { - throw new JobExecuteException("unsupported process type " + node.getType().name()); + throw new JobExecuteException("unsupported process type " + operatorNode.getType().name()); } - for (String nodeName : node.getDownstream()) { + for (String nodeName : operatorNode.getDownstream()) { buildJobGraph(dataStream, nodeName); } } - private Optional<Node> getNode(String name) { - return nodes.stream().filter(v -> v.getName().equals(name)).findFirst(); + private Optional<OperatorNode> getNode(String name) { + return operatorNodes.stream().filter(v -> v.getName().equals(name)).findFirst(); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java index 66303c2..8c4b392 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.execution; -import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.enums.StageType; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -14,11 +14,11 @@ import java.util.List; @NoArgsConstructor @AllArgsConstructor @EqualsAndHashCode -public class Node implements Serializable { +public class OperatorNode implements Serializable { private String name; - private ProcessorType type; - private List<String> downstream = Collections.emptyList(); + private StageType type; private int parallelism; + private List<String> downstream = Collections.emptyList(); private List<String> tags = Collections.emptyList(); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java index e73b7dd..10d9188 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.execution; -import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.enums.StageType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.config.Constants; import com.geedgenetworks.spi.processor.ProcessorConfig; @@ -17,14 +17,14 @@ import java.util.Map; * Initialize config and execute postprocessor */ public class PostprocessingExecutor extends AbstractProcessorExecutor { - private static final String PROCESSOR_TYPE = ProcessorType.POSTPROCESSING.getType(); + private static final String PROCESSOR_TYPE = StageType.POSTPROCESSING.getType(); - public PostprocessingExecutor(List<URL> jarPaths, Config config) { - super(jarPaths, config); + public PostprocessingExecutor(Config config) { + super(config); } @Override - protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + protected Map<String, ProcessorConfig> initialize(Config operatorConfig) { Map<String, ProcessorConfig> postprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) { Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES); @@ -37,7 +37,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor { @Override - public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { - return super.execute(dataStream, node); + public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { + return super.execute(input, operatorNode); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java index 6179265..9acda99 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java @@ -1,6 +1,6 @@ package com.geedgenetworks.bootstrap.execution; -import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.enums.StageType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.config.Constants; import com.geedgenetworks.spi.processor.ProcessorConfig; @@ -19,14 +19,14 @@ import java.util.Map; */ @Slf4j public class PreprocessingExecutor extends AbstractProcessorExecutor { - private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType(); + private static final String PROCESSOR_TYPE = StageType.PREPROCESSING.getType(); - public PreprocessingExecutor(List<URL> jarPaths, Config config) { - super(jarPaths, config); + public PreprocessingExecutor(Config config) { + super(config); } @Override - protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + protected Map<String, ProcessorConfig> initialize(Config operatorConfig) { Map<String, ProcessorConfig> preprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) { Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES); @@ -38,9 +38,9 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor { } @Override - public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { + public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { - return super.execute(dataStream, node); + return super.execute(input, operatorNode); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java index bc6a09e..c49df88 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java @@ -1,15 +1,18 @@ package com.geedgenetworks.bootstrap.execution; -import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.enums.StageType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.config.Constants; +import com.geedgenetworks.spi.processor.Processor; import com.geedgenetworks.spi.processor.ProcessorConfig; +import com.geedgenetworks.spi.processor.ProcessorProvider; import com.geedgenetworks.spi.table.event.Event; import com.google.common.collect.Maps; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.DataStream; import java.net.URL; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -17,27 +20,30 @@ import java.util.Map; * Initialize config and execute processor */ public class ProcessingExecutor extends AbstractProcessorExecutor { - private static final String PROCESSOR_TYPE = ProcessorType.PROCESSING.getType(); + private static final String PROCESSOR_TYPE = StageType.PROCESSING.getType(); + //private Map<String, Processor<?>> processors; - public ProcessingExecutor(List<URL> jarPaths, Config config) { - super(jarPaths, config); + public ProcessingExecutor(Config config) { + super(config); } @Override - protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + protected Map<String, ProcessorConfig> initialize(Config operatorConfig) { Map<String, ProcessorConfig> processingConfigMap = Maps.newHashMap(); + //processors = new HashMap<>(); if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) { - Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES); - processors.root().unwrapped().forEach((key, value) -> { - processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processors)); + Config processingConfig = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES); + processingConfig.root().unwrapped().forEach((key, value) -> { + processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processingConfig)); + //processors.put(key, ProcessorProvider.load(((Map<?, ?>) value).get("type").toString())); + }); } return processingConfigMap; } @Override - public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { - - return super.execute(dataStream, node); + public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { + return super.execute(input, operatorNode); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java index b61b6f9..130705a 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; -import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.enums.StageType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.config.Constants; @@ -11,8 +11,8 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.spi.sink.SinkConfig; import com.geedgenetworks.spi.sink.SinkConfigOptions; -import com.geedgenetworks.spi.table.connector.SinkProvider; -import com.geedgenetworks.spi.table.connector.SinkTableFactory; +import com.geedgenetworks.spi.sink.SinkProvider; +import com.geedgenetworks.spi.sink.SinkTableFactory; import com.geedgenetworks.spi.table.event.Event; import com.geedgenetworks.spi.table.factory.FactoryUtil; import com.geedgenetworks.spi.table.factory.TableFactory; @@ -24,8 +24,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; -import java.net.URL; -import java.util.List; import java.util.Map; /** @@ -33,13 +31,13 @@ import java.util.Map; */ @Slf4j public class SinkExecutor extends AbstractExecutor<String, SinkConfig> { - private static final String PROCESSOR_TYPE = ProcessorType.SINK.getType(); + private static final String PROCESSOR_TYPE = StageType.SINK.getType(); - public SinkExecutor(List<URL> jarPaths, Config config) { - super(jarPaths, config); + public SinkExecutor(Config config) { + super(config); } @Override - protected Map<String, SinkConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + protected Map<String, SinkConfig> initialize(Config operatorConfig) { Map<String, SinkConfig> sinkConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.SINKS)) { @@ -64,8 +62,8 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> { } @Override - public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { - SinkConfig sinkConfig = operatorMap.get(node.getName()); + public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { + SinkConfig sinkConfig = operatorMap.get(operatorNode.getName()); try { SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType()); Map<String, String> options = sinkConfig.getProperties(); @@ -84,9 +82,9 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> { System.out.println(String.format("sink(%s) schema:\n%s", sinkConfig.getName(), schema.getDataType().treeString())); } - DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream); - if (node.getParallelism() > 0) { - dataStreamSink.setParallelism(node.getParallelism()); + DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(input); + if (operatorNode.getParallelism() > 0) { + dataStreamSink.setParallelism(operatorNode.getParallelism()); } dataStreamSink.name(sinkConfig.getName()); } catch (Exception e) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index 3eeaad6..5109540 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.enums.StageType; import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; @@ -12,8 +12,8 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.spi.configuration.SourceConfigOptions; import com.geedgenetworks.spi.source.SourceConfig; -import com.geedgenetworks.spi.table.connector.SourceProvider; -import com.geedgenetworks.spi.table.connector.SourceTableFactory; +import com.geedgenetworks.spi.source.SourceProvider; +import com.geedgenetworks.spi.source.SourceTableFactory; import com.geedgenetworks.spi.table.event.Event; import com.geedgenetworks.spi.table.factory.FactoryUtil; import com.geedgenetworks.spi.table.factory.TableFactory; @@ -28,9 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import java.net.URL; import java.time.Duration; -import java.util.List; import java.util.Map; /** @@ -38,13 +36,13 @@ import java.util.Map; */ @Slf4j public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { - private static final String PROCESSOR_TYPE = ProcessorType.SOURCE.getType(); + private static final String PROCESSOR_TYPE = StageType.SOURCE.getType(); - public SourceExecutor(List<URL> jarPaths, Config config) { - super(jarPaths, config); + public SourceExecutor(Config config) { + super(config); } @Override - protected Map<String, SourceConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + protected Map<String, SourceConfig> initialize(Config operatorConfig) { Map<String, SourceConfig> sourceConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.SOURCES)) { Config sources = operatorConfig.getConfig(Constants.SOURCES); @@ -68,8 +66,8 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { } @Override - public DataStream<Event> execute(DataStream<Event> outputStreamOperator, Node node) throws JobExecuteException { - SourceConfig sourceConfig = operatorMap.get(node.getName()); + public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { + SourceConfig sourceConfig = operatorMap.get(operatorNode.getName()); SingleOutputStreamOperator sourceSingleOutputStreamOperator; try { SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType()); @@ -90,17 +88,17 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { } sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()).name(sourceConfig.getName()); - if (node.getParallelism() > 0) { - sourceSingleOutputStreamOperator.setParallelism(node.getParallelism()); + if (operatorNode.getParallelism() > 0) { + sourceSingleOutputStreamOperator.setParallelism(operatorNode.getParallelism()); } - sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, node); + sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, operatorNode); return sourceSingleOutputStreamOperator; } catch (Exception e) { throw new JobExecuteException("Create source instance failed!", e); } } - private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, Node node){ + private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, OperatorNode operatorNode){ final String watermarkTimestamp = sourceConfig.getWatermark_timestamp(); if(StringUtils.isNotBlank(watermarkTimestamp)){ String timestampUnit = sourceConfig.getWatermark_timestamp_unit(); @@ -139,8 +137,8 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(watermarkLag)) .withTimestampAssigner(timestampAssigner) ); - if (node.getParallelism() > 0) { - dataStream.setParallelism(node.getParallelism()); + if (operatorNode.getParallelism() > 0) { + dataStream.setParallelism(operatorNode.getParallelism()); } } return dataStream; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java index 3d6f264..c142614 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java @@ -29,12 +29,12 @@ import java.util.ServiceLoader; public class SplitExecutor extends AbstractExecutor<String, SplitConfig> { - public SplitExecutor(List<URL> jarPaths, Config config) { - super(jarPaths, config); + public SplitExecutor(Config config) { + super(config); } @Override - protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + protected Map<String, SplitConfig> initialize(Config operatorConfig) { Map<String, SplitConfig> splitConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.SPLITS)) { Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS); @@ -56,20 +56,20 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> { } @Override - public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { - SplitConfig splitConfig = operatorMap.get(node.getName()); + public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException { + SplitConfig splitConfig = operatorMap.get(operatorNode.getName()); boolean found = false; // 标志变量 ServiceLoader<Split> splits = ServiceLoader.load(Split.class); for (Split split : splits) { found = true; // 标志变量 if(split.type().equals(splitConfig.getType())){ - if (node.getParallelism() > 0) { - splitConfig.setParallelism(node.getParallelism()); + if (operatorNode.getParallelism() > 0) { + splitConfig.setParallelism(operatorNode.getParallelism()); } try { - dataStream = + input = split.splitFunction( - dataStream, splitConfig); + input, splitConfig); } catch (Exception e) { throw new JobExecuteException("Create split instance failed!", e); } @@ -79,7 +79,7 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> { if (!found) { throw new JobExecuteException("No matching split found for type: " + splitConfig.getType()); } - return dataStream; + return input; } protected SplitConfig checkConfig(String key, Map<String, Object> value, Config config) { diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java index e52fd3b..130478e 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java @@ -1,7 +1,7 @@ package com.geedgenetworks.bootstrap.main.simple.collect; -import com.geedgenetworks.spi.table.connector.SinkProvider; -import com.geedgenetworks.spi.table.connector.SinkTableFactory; +import com.geedgenetworks.spi.sink.SinkProvider; +import com.geedgenetworks.spi.sink.SinkTableFactory; import com.geedgenetworks.spi.table.event.Event; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.streaming.api.datastream.DataStream; |
