diff options
| author | wangkuan <[email protected]> | 2024-07-12 11:09:17 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-07-12 11:09:17 +0800 |
| commit | 76b5f89c44bded7da01e27ee97065f9bd4c77848 (patch) | |
| tree | 153369594e57042125a0ca6187f7629430fa11e3 | |
| parent | 22806af4ffbe6dc7ce644231c20fbf0765c337d5 (diff) | |
[improve][bootstrap][core] GAL-611 Groot Stream各个处理阶段Type命名与实现类解耦,实例通过spi方式注册,兼容旧版本 [feature/pipeline-type]
15 files changed, 125 insertions, 50 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 52b5001..11ba617 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -8,13 +8,13 @@ sources: filters: filter_operator: - type: com.geedgenetworks.core.filter.AviatorFilter + type: aviator properties: expression: event.server_ip != '12.12.12.12' processing_pipelines: projection_processor: - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + type: projection remove_fields: [http_request_line, http_response_line, http_response_content_type] functions: - function: DROP diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index 077fee3..04a6a94 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -1,26 +1,43 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.common.utils.ReflectionUtils; +import com.geedgenetworks.core.filter.Filter; +import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; +import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; - import java.net.URL; import java.net.URLClassLoader; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; import java.util.function.BiConsumer; public abstract class AbstractExecutor<K, V> implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> { protected JobRuntimeEnvironment jobRuntimeEnvironment; - protected final Config operatorConfig; - protected final Map<K,V> operatorMap; + protected final Map<String,Filter> filterMap = new HashMap<>(); + 
protected final Map<String,ProjectionProcessor> projectionProcessorMap = new HashMap<>(); + protected final Map<String, AggregateProcessor> aggregateProcessorMap = new HashMap<>(); protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) { this.operatorConfig = operatorConfig; this.operatorMap = initialize(jarPaths, operatorConfig); + ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class); + for (Filter filter : filters) { + this.filterMap.put(filter.type(), filter); + } + ServiceLoader<ProjectionProcessor> projectionProcessors = ServiceLoader.load(ProjectionProcessor.class); + for (ProjectionProcessor projectionProcessor : projectionProcessors) { + this.projectionProcessorMap.put(projectionProcessor.type(), projectionProcessor); + } + ServiceLoader<AggregateProcessor> aggregateProcessors = ServiceLoader.load(AggregateProcessor.class); + for (AggregateProcessor aggregateProcessor : aggregateProcessors) { + this.aggregateProcessorMap.put(aggregateProcessor.type(), aggregateProcessor); + } } @Override diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java index 0897186..506aa11 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java @@ -9,7 +9,7 @@ import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.FilterConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.filter.AviatorFilter; +import com.geedgenetworks.core.filter.Filter; import com.geedgenetworks.core.pojo.FilterConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -21,20 +21,22 @@ import java.util.List; import 
java.util.Map; /** - * Initialize config and execute filter operator + * Initialize config and execute filter operator */ @Slf4j public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType(); + public FilterExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, FilterConfig> filterConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.FILTERS)) { Config filters = operatorConfig.getConfig(Constants.FILTERS); - filters.root().unwrapped().forEach((key,value) -> { + filters.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(filters.getConfig(key), FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key()); if (!result.isSuccess()) { @@ -55,22 +57,29 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { FilterConfig filterConfig = operatorMap.get(node.getName()); String className = filterConfig.getType(); - Class cls = null; - AviatorFilter aviatorFilter = null; - try { - cls = Class.forName(className); - aviatorFilter = (AviatorFilter) cls.newInstance(); - if (node.getParallelism() > 0) { - filterConfig.setParallelism(node.getParallelism()); - } + Filter filter; + if (filterMap.containsKey(filterConfig.getType())) { + filter = filterMap.get(filterConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + filter = (Filter) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get filter instance failed!", e); + } + } + if (node.getParallelism() > 0) { + filterConfig.setParallelism(node.getParallelism()); 
+ } + try { dataStream = - aviatorFilter.filterFunction( + filter.filterFunction( dataStream, filterConfig); } catch (Exception e) { throw new JobExecuteException("Create filter instance failed!", e); } - return dataStream; } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java index 4e45b78..fa5f572 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java @@ -2,7 +2,6 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckConfigUtil; @@ -12,7 +11,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.pojo.ProjectionConfig; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; -import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -30,12 +28,13 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC public PostprocessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> postprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) { Config 
postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES); - postprocessors.root().unwrapped().forEach((key,value) -> { + postprocessors.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key), ProjectionConfigOptions.TYPE.key()); if (!result.isSuccess()) { @@ -68,21 +67,29 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - Class cls = null; - ProjectionProcessor projectionProcessor = null; ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - try { - cls = Class.forName(projectionConfig.getType()); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); + String className = projectionConfig.getType(); + ProjectionProcessor projectionProcessor; + if (projectionProcessorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + projectionProcessor = (ProjectionProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get postprocessing pipeline instance failed!", e); } + } + if (node.getParallelism() > 0) { + projectionConfig.setParallelism(node.getParallelism()); + } + try { dataStream = projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); } catch (Exception e) { - throw new JobExecuteException("create postprocessing pipeline operator error", e); + throw new JobExecuteException("Create postprocessing pipeline instance failed!", e); } return dataStream; } diff --git 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java index 84e8718..31fcce8 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java @@ -26,15 +26,17 @@ import java.util.Map; @Slf4j public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> { private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType(); + public PreprocessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) { Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES); - preprocessors.root().unwrapped().forEach((key,value) -> { + preprocessors.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key), ProjectionConfigOptions.TYPE.key()); if (!result.isSuccess()) { @@ -67,21 +69,29 @@ public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionCo @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - Class cls = null; - ProjectionProcessor projectionProcessor = null; ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - try { - cls = Class.forName(projectionConfig.getType()); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); + String className = projectionConfig.getType(); + 
ProjectionProcessor projectionProcessor; + if (projectionProcessorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + projectionProcessor = (ProjectionProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get preprocessing pipeline instance failed!", e); } + } + if (node.getParallelism() > 0) { + projectionConfig.setParallelism(node.getParallelism()); + } + try { dataStream = projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); } catch (Exception e) { - throw new JobExecuteException("Create preprocessor pipeline instance failed!", e); + throw new JobExecuteException("Create preprocessing pipeline instance failed!", e); } return dataStream; } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java index a8798e8..4a3b204 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java @@ -18,6 +18,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; import java.util.List; import java.util.Map; + /** * Initialize config and execute processor */ @@ -27,12 +28,13 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi public ProcessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap(); if 
(operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) { Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES); - processors.root().unwrapped().forEach((key,value) -> { + processors.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key), ProjectionConfigOptions.TYPE.key()); if (!result.isSuccess()) { @@ -66,16 +68,24 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - Class cls = null; - ProjectionProcessor projectionProcessor = null; ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - try { - cls = Class.forName(projectionConfig.getType()); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); + String className = projectionConfig.getType(); + ProjectionProcessor projectionProcessor; + if (projectionProcessorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + projectionProcessor = (ProjectionProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get processing pipeline instance failed!", e); } + } + if (node.getParallelism() > 0) { + projectionConfig.setParallelism(node.getParallelism()); + } + try { dataStream = projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java index dae6f94..668ba6f 100644 --- 
a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java @@ -22,4 +22,10 @@ public class AviatorFilter implements Filter { .name(FilterConfig.getName()); } } + + @Override + public String type() { + return "aviator"; + } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java index b2483e8..a173438 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java @@ -10,4 +10,5 @@ public interface Filter { SingleOutputStreamOperator<Event> filterFunction( SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig) throws Exception; + String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java index 70783bd..bbf7d8d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java @@ -11,4 +11,5 @@ public interface AggregateProcessor { SingleOutputStreamOperator<Event> singleOutputStreamOperator, AggregateConfig aggregateConfig) throws Exception; + String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index f7e6dab..f6fa9d5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -28,4 +28,9 @@ public class AggregateProcessorImpl 
implements AggregateProcessor { }*/ return singleOutputStreamOperator; } + + @Override + public String type() { + return "aggregate"; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java index e97ab98..862ba5e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java @@ -11,4 +11,5 @@ public interface ProjectionProcessor { SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig) throws Exception; + String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java index 5f6d0fe..79b0e0d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java @@ -11,7 +11,7 @@ public class ProjectionProcessorImpl implements ProjectionProcessor { public SingleOutputStreamOperator<Event> projectionProcessorFunction( SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig) - throws Exception { + throws Exception{ if (projectionConfig.getParallelism() != 0) { return grootEventSingleOutputStreamOperator @@ -24,4 +24,9 @@ public class ProjectionProcessorImpl implements ProjectionProcessor { .name(projectionConfig.getName()); } } + + @Override + public String type() { + return "projection"; + } } diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.filter.Filter 
b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.filter.Filter new file mode 100644 index 0000000..2268533 --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.filter.Filter @@ -0,0 +1 @@ +com.geedgenetworks.core.filter.AviatorFilter
\ No newline at end of file diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor new file mode 100644 index 0000000..426a1a9 --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor @@ -0,0 +1 @@ +com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl
\ No newline at end of file diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor new file mode 100644 index 0000000..ede2c8c --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor @@ -0,0 +1 @@ +com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
\ No newline at end of file |
