diff options
| author | wangkuan <[email protected]> | 2024-07-12 11:09:17 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-07-12 11:09:17 +0800 |
| commit | 76b5f89c44bded7da01e27ee97065f9bd4c77848 (patch) | |
| tree | 153369594e57042125a0ca6187f7629430fa11e3 /groot-bootstrap | |
| parent | 22806af4ffbe6dc7ce644231c20fbf0765c337d5 (diff) | |
[improve][bootstrap][core] GAL-611 Groot Stream各个处理阶段Type命名与实现类解耦,实例通过spi方式注册,兼容旧版本feature/pipeline-type
Diffstat (limited to 'groot-bootstrap')
5 files changed, 100 insertions, 47 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index 077fee3..04a6a94 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -1,26 +1,43 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.common.utils.ReflectionUtils; +import com.geedgenetworks.core.filter.Filter; +import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; +import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; - import java.net.URL; import java.net.URLClassLoader; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; import java.util.function.BiConsumer; public abstract class AbstractExecutor<K, V> implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> { protected JobRuntimeEnvironment jobRuntimeEnvironment; - protected final Config operatorConfig; - protected final Map<K,V> operatorMap; + protected final Map<String,Filter> filterMap = new HashMap<>(); + protected final Map<String,ProjectionProcessor> projectionProcessorMap = new HashMap<>(); + protected final Map<String, AggregateProcessor> aggregateProcessorMap = new HashMap<>(); protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) { this.operatorConfig = operatorConfig; this.operatorMap = initialize(jarPaths, operatorConfig); + ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class); + for (Filter filter : filters) { + this.filterMap.put(filter.type(), filter); + } + ServiceLoader<ProjectionProcessor> projectionProcessors = ServiceLoader.load(ProjectionProcessor.class); + for (ProjectionProcessor projectionProcessor : projectionProcessors) { + this.projectionProcessorMap.put(projectionProcessor.type(), projectionProcessor); + } + ServiceLoader<AggregateProcessor> aggregateProcessors = ServiceLoader.load(AggregateProcessor.class); + for (AggregateProcessor aggregateProcessor : aggregateProcessors) { + this.aggregateProcessorMap.put(aggregateProcessor.type(), aggregateProcessor); + } } @Override diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java index 0897186..506aa11 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java @@ -9,7 +9,7 @@ import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.FilterConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.filter.AviatorFilter; +import com.geedgenetworks.core.filter.Filter; import com.geedgenetworks.core.pojo.FilterConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -21,20 +21,22 @@ import java.util.List; import java.util.Map; /** - * Initialize config and execute filter operator + * Initialize config and execute filter operator */ @Slf4j public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType(); + public FilterExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, FilterConfig> filterConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.FILTERS)) { Config filters = operatorConfig.getConfig(Constants.FILTERS); - filters.root().unwrapped().forEach((key,value) -> { + filters.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(filters.getConfig(key), FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key()); if (!result.isSuccess()) { @@ -55,22 +57,29 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { FilterConfig filterConfig = operatorMap.get(node.getName()); String className = filterConfig.getType(); - Class cls = null; - AviatorFilter aviatorFilter = null; - try { - cls = Class.forName(className); - aviatorFilter = (AviatorFilter) cls.newInstance(); - if (node.getParallelism() > 0) { - filterConfig.setParallelism(node.getParallelism()); - } + Filter filter; + if (filterMap.containsKey(filterConfig.getType())) { + filter = filterMap.get(filterConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + filter = (Filter) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get filter instance failed!", e); + } + } + if (node.getParallelism() > 0) { + filterConfig.setParallelism(node.getParallelism()); + } + try { dataStream = - aviatorFilter.filterFunction( + filter.filterFunction( dataStream, filterConfig); } catch (Exception e) { throw new JobExecuteException("Create filter instance failed!", e); } - return dataStream; } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java index 4e45b78..fa5f572 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java @@ -2,7 +2,6 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckConfigUtil; @@ -12,7 +11,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.pojo.ProjectionConfig; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; -import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -30,12 +28,13 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC public PostprocessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> postprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) { Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES); - postprocessors.root().unwrapped().forEach((key,value) -> { + postprocessors.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key), ProjectionConfigOptions.TYPE.key()); if (!result.isSuccess()) { @@ -68,21 +67,29 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - Class cls = null; - ProjectionProcessor projectionProcessor = null; ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - try { - cls = Class.forName(projectionConfig.getType()); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); + String className = projectionConfig.getType(); + ProjectionProcessor projectionProcessor; + if (projectionProcessorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + projectionProcessor = (ProjectionProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get postprocessing pipeline instance failed!", e); } + } + if (node.getParallelism() > 0) { + projectionConfig.setParallelism(node.getParallelism()); + } + try { dataStream = projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); } catch (Exception e) { - throw new JobExecuteException("create postprocessing pipeline operator error", e); + throw new JobExecuteException("Create postprocessing pipeline instance failed!", e); } return dataStream; } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java index 84e8718..31fcce8 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java @@ -26,15 +26,17 @@ import java.util.Map; @Slf4j public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> { private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType(); + public PreprocessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) { Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES); - preprocessors.root().unwrapped().forEach((key,value) -> { + preprocessors.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key), ProjectionConfigOptions.TYPE.key()); if (!result.isSuccess()) { @@ -67,21 +69,29 @@ public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionCo @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - Class cls = null; - ProjectionProcessor projectionProcessor = null; ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - try { - cls = Class.forName(projectionConfig.getType()); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); + String className = projectionConfig.getType(); + ProjectionProcessor projectionProcessor; + if (projectionProcessorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + projectionProcessor = (ProjectionProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get preprocessing pipeline instance failed!", e); } + } + if (node.getParallelism() > 0) { + projectionConfig.setParallelism(node.getParallelism()); + } + try { dataStream = projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); } catch (Exception e) { - throw new JobExecuteException("Create preprocessor pipeline instance failed!", e); + throw new JobExecuteException("Create preprocessing pipeline instance failed!", e); } return dataStream; } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java index a8798e8..4a3b204 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java @@ -18,6 +18,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; import java.util.List; import java.util.Map; + /** * Initialize config and execute processor */ @@ -27,12 +28,13 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi public ProcessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) { Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES); - processors.root().unwrapped().forEach((key,value) -> { + processors.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key), ProjectionConfigOptions.TYPE.key()); if (!result.isSuccess()) { @@ -66,16 +68,24 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - Class cls = null; - ProjectionProcessor projectionProcessor = null; ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - try { - cls = Class.forName(projectionConfig.getType()); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); + String className = projectionConfig.getType(); + ProjectionProcessor projectionProcessor; + if (projectionProcessorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + projectionProcessor = (ProjectionProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get processing pipeline instance failed!", e); } + } + if (node.getParallelism() > 0) { + projectionConfig.setParallelism(node.getParallelism()); + } + try { dataStream = projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); |
