summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-07-12 11:09:17 +0800
committerwangkuan <[email protected]>2024-07-12 11:09:17 +0800
commit76b5f89c44bded7da01e27ee97065f9bd4c77848 (patch)
tree153369594e57042125a0ca6187f7629430fa11e3 /groot-bootstrap
parent22806af4ffbe6dc7ce644231c20fbf0765c337d5 (diff)
[improve][bootstrap][core] GAL-611 Groot Stream各个处理阶段Type命名与实现类解耦,实例通过spi方式注册,兼容旧版本feature/pipeline-type
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java23
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java35
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java31
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java30
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java28
5 files changed, 100 insertions, 47 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 077fee3..04a6a94 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -1,26 +1,43 @@
package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.common.utils.ReflectionUtils;
+import com.geedgenetworks.core.filter.Filter;
+import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
+import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.ServiceLoader;
import java.util.function.BiConsumer;
public abstract class AbstractExecutor<K, V>
implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> {
protected JobRuntimeEnvironment jobRuntimeEnvironment;
-
protected final Config operatorConfig;
-
protected final Map<K,V> operatorMap;
+ protected final Map<String,Filter> filterMap = new HashMap<>();
+ protected final Map<String,ProjectionProcessor> projectionProcessorMap = new HashMap<>();
+ protected final Map<String, AggregateProcessor> aggregateProcessorMap = new HashMap<>();
protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
this.operatorConfig = operatorConfig;
this.operatorMap = initialize(jarPaths, operatorConfig);
+ ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class);
+ for (Filter filter : filters) {
+ this.filterMap.put(filter.type(), filter);
+ }
+ ServiceLoader<ProjectionProcessor> projectionProcessors = ServiceLoader.load(ProjectionProcessor.class);
+ for (ProjectionProcessor projectionProcessor : projectionProcessors) {
+ this.projectionProcessorMap.put(projectionProcessor.type(), projectionProcessor);
+ }
+ ServiceLoader<AggregateProcessor> aggregateProcessors = ServiceLoader.load(AggregateProcessor.class);
+ for (AggregateProcessor aggregateProcessor : aggregateProcessors) {
+ this.aggregateProcessorMap.put(aggregateProcessor.type(), aggregateProcessor);
+ }
}
@Override
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 0897186..506aa11 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -9,7 +9,7 @@ import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.FilterConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.filter.AviatorFilter;
+import com.geedgenetworks.core.filter.Filter;
import com.geedgenetworks.core.pojo.FilterConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -21,20 +21,22 @@ import java.util.List;
import java.util.Map;
/**
- * Initialize config and execute filter operator
+ * Initialize config and execute filter operator
*/
@Slf4j
public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType();
+
public FilterExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
+
@Override
protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, FilterConfig> filterConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.FILTERS)) {
Config filters = operatorConfig.getConfig(Constants.FILTERS);
- filters.root().unwrapped().forEach((key,value) -> {
+ filters.root().unwrapped().forEach((key, value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(filters.getConfig(key),
FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key());
if (!result.isSuccess()) {
@@ -55,22 +57,29 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
FilterConfig filterConfig = operatorMap.get(node.getName());
String className = filterConfig.getType();
- Class cls = null;
- AviatorFilter aviatorFilter = null;
- try {
- cls = Class.forName(className);
- aviatorFilter = (AviatorFilter) cls.newInstance();
- if (node.getParallelism() > 0) {
- filterConfig.setParallelism(node.getParallelism());
- }
+ Filter filter;
+ if (filterMap.containsKey(filterConfig.getType())) {
+ filter = filterMap.get(filterConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(className);
+ filter = (Filter) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get filter instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ filterConfig.setParallelism(node.getParallelism());
+ }
+ try {
dataStream =
- aviatorFilter.filterFunction(
+ filter.filterFunction(
dataStream, filterConfig);
} catch (Exception e) {
throw new JobExecuteException("Create filter instance failed!", e);
}
-
return dataStream;
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index 4e45b78..fa5f572 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -2,7 +2,6 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckConfigUtil;
@@ -12,7 +11,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
-import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -30,12 +28,13 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC
public PostprocessingExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
+
@Override
protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, ProjectionConfig> postprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES);
- postprocessors.root().unwrapped().forEach((key,value) -> {
+ postprocessors.root().unwrapped().forEach((key, value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key),
ProjectionConfigOptions.TYPE.key());
if (!result.isSuccess()) {
@@ -68,21 +67,29 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- Class cls = null;
- ProjectionProcessor projectionProcessor = null;
ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- try {
- cls = Class.forName(projectionConfig.getType());
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
-
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
+ String className = projectionConfig.getType();
+ ProjectionProcessor projectionProcessor;
+ if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
+ projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(className);
+ projectionProcessor = (ProjectionProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get postprocessing pipeline instance failed!", e);
}
+ }
+ if (node.getParallelism() > 0) {
+ projectionConfig.setParallelism(node.getParallelism());
+ }
+ try {
dataStream =
projectionProcessor.projectionProcessorFunction(
dataStream, projectionConfig);
} catch (Exception e) {
- throw new JobExecuteException("create postprocessing pipeline operator error", e);
+ throw new JobExecuteException("Create postprocessing pipeline instance failed!", e);
}
return dataStream;
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index 84e8718..31fcce8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -26,15 +26,17 @@ import java.util.Map;
@Slf4j
public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType();
+
public PreprocessingExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
+
@Override
protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) {
Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES);
- preprocessors.root().unwrapped().forEach((key,value) -> {
+ preprocessors.root().unwrapped().forEach((key, value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key),
ProjectionConfigOptions.TYPE.key());
if (!result.isSuccess()) {
@@ -67,21 +69,29 @@ public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionCo
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- Class cls = null;
- ProjectionProcessor projectionProcessor = null;
ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- try {
- cls = Class.forName(projectionConfig.getType());
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
-
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
+ String className = projectionConfig.getType();
+ ProjectionProcessor projectionProcessor;
+ if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
+ projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(className);
+ projectionProcessor = (ProjectionProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get preprocessing pipeline instance failed!", e);
}
+ }
+ if (node.getParallelism() > 0) {
+ projectionConfig.setParallelism(node.getParallelism());
+ }
+ try {
dataStream =
projectionProcessor.projectionProcessorFunction(
dataStream, projectionConfig);
} catch (Exception e) {
- throw new JobExecuteException("Create preprocessor pipeline instance failed!", e);
+ throw new JobExecuteException("Create preprocessing pipeline instance failed!", e);
}
return dataStream;
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index a8798e8..4a3b204 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -18,6 +18,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
import java.util.List;
import java.util.Map;
+
/**
* Initialize config and execute processor
*/
@@ -27,12 +28,13 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi
public ProcessingExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
+
@Override
protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) {
Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
- processors.root().unwrapped().forEach((key,value) -> {
+ processors.root().unwrapped().forEach((key, value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key),
ProjectionConfigOptions.TYPE.key());
if (!result.isSuccess()) {
@@ -66,16 +68,24 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- Class cls = null;
- ProjectionProcessor projectionProcessor = null;
ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- try {
- cls = Class.forName(projectionConfig.getType());
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
-
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
+ String className = projectionConfig.getType();
+ ProjectionProcessor projectionProcessor;
+ if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
+ projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(className);
+ projectionProcessor = (ProjectionProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get processing pipeline instance failed!", e);
}
+ }
+ if (node.getParallelism() > 0) {
+ projectionConfig.setParallelism(node.getParallelism());
+ }
+ try {
dataStream =
projectionProcessor.projectionProcessorFunction(
dataStream, projectionConfig);