summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-07-23 15:39:48 +0800
committerwangkuan <[email protected]>2024-07-23 15:39:48 +0800
commit6e558c28ce9f07f58e9adcbbd802b586c6a179da (patch)
treecde93730383c2a7e1d944f50bd2683af7fc9fa8b /groot-bootstrap
parent76b5f89c44bded7da01e27ee97065f9bd4c77848 (diff)
[feature][bootstrap][core][common]支持自定义聚合函数,udf接口重命名为scalarFunction,时间戳转换函数支持设置interval
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java168
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java69
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java68
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java68
4 files changed, 188 insertions, 185 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
new file mode 100644
index 0000000..42a4828
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -0,0 +1,168 @@
+package com.geedgenetworks.bootstrap.execution;
+
+import com.alibaba.fastjson.JSONObject;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.config.AggregateConfigOptions;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.ProjectionConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import com.geedgenetworks.core.pojo.ProcessorConfig;
+import com.geedgenetworks.core.pojo.ProjectionConfig;
+import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
+import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> {
+
+
+ protected AbstractProcessorExecutor(List<URL> jarPaths, Config operatorConfig) {
+ super(jarPaths, operatorConfig);
+ }
+
+ @Override
+ public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+
+ ProcessorConfig processorConfig = operatorMap.get(node.getName());
+ switch (processorConfig.getType()) {
+ case "aggregate":
+ dataStream = executeAggregateProcessor(dataStream, node, (AggregateConfig) processorConfig);
+ break;
+ case "projection":
+ dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig);
+ break;
+ default:// 兼容历史版本
+ dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig);
+ }
+ return dataStream;
+ }
+
+ protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
+
+ AggregateProcessor aggregateProcessor;
+ if (aggregateProcessorMap.containsKey(aggregateConfig.getType())) {
+ aggregateProcessor = aggregateProcessorMap.get(aggregateConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(aggregateConfig.getType());
+ aggregateProcessor = (AggregateProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get processing pipeline instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ aggregateConfig.setParallelism(node.getParallelism());
+ }
+ try {
+ dataStream =
+ aggregateProcessor.aggregateProcessorFunction(
+ dataStream, aggregateConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+ } catch (Exception e) {
+ throw new JobExecuteException("Create aggregate pipeline instance failed!", e);
+ }
+ return dataStream;
+ }
+
+ protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
+
+ ProjectionProcessor projectionProcessor;
+ if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
+ projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(projectionConfig.getType());
+ projectionProcessor = (ProjectionProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get processing pipeline instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ projectionConfig.setParallelism(node.getParallelism());
+ }
+ try {
+ dataStream =
+ projectionProcessor.projectionProcessorFunction(
+ dataStream, projectionConfig);
+ } catch (Exception e) {
+ throw new JobExecuteException("Create processing pipeline instance failed!", e);
+ }
+ return dataStream;
+ }
+
+ protected ProcessorConfig checkProcessorConfig(String key, Map<String, Object> value, Config processorsConfig) {
+ ProcessorConfig projectionConfig;
+ CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key),
+ ProjectionConfigOptions.TYPE.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Postprocessor: %s, Message: %s",
+ key, result.getMsg()));
+ }
+ switch ((String) value.getOrDefault("type", "")) {
+ case "projection":
+ projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig);
+ break;
+ case "aggregate":
+ projectionConfig = checkAggregateProcessorConfig(key, value, processorsConfig);
+ break;
+ default://兼容历史版本
+ projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig);
+ }
+ return projectionConfig;
+ }
+
+ protected ProcessorConfig checkProjectionProcessorConfig(String key, Map<String, Object> value, Config projectionProcessors) {
+
+ CheckResult result = CheckConfigUtil.checkAtLeastOneExists(projectionProcessors.getConfig(key),
+ ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+ ProjectionConfigOptions.REMOVE_FIELDS.key(),
+ ProjectionConfigOptions.FUNCTIONS.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Processor: %s, At least one of [%s] should be specified.",
+ key, String.join(",",
+ ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+ ProjectionConfigOptions.REMOVE_FIELDS.key(),
+ ProjectionConfigOptions.FUNCTIONS.key())));
+ }
+
+ ProjectionConfig projectionConfig = new JSONObject(value).toJavaObject(ProjectionConfig.class);
+ projectionConfig.setName(key);
+
+ return projectionConfig;
+ }
+
+
+ protected AggregateConfig checkAggregateProcessorConfig(String key, Map<String, Object> value, Config aggregateProcessorsConfig) {
+
+
+ CheckResult result = CheckConfigUtil.checkAllExists(aggregateProcessorsConfig.getConfig(key),
+ AggregateConfigOptions.GROUP_BY_FIELDS.key(),
+ AggregateConfigOptions.WINDOW_TYPE.key(),
+ AggregateConfigOptions.FUNCTIONS.key(),
+ AggregateConfigOptions.WINDOW_SIZE.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Aggregate processor: %s, At least one of [%s] should be specified.",
+ key, String.join(",",
+ AggregateConfigOptions.OUTPUT_FIELDS.key(),
+ AggregateConfigOptions.REMOVE_FIELDS.key(),
+ AggregateConfigOptions.FUNCTIONS.key())));
+ }
+
+ AggregateConfig aggregateConfig = new JSONObject(value).toJavaObject(AggregateConfig.class);
+ aggregateConfig.setName(key);
+ return aggregateConfig;
+ }
+
+
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index fa5f572..36fad61 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -1,16 +1,9 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.ProjectionConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.ProjectionConfig;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -22,7 +15,7 @@ import java.util.Map;
/**
* Initialize config and execute postprocessor
*/
-public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
+public class PostprocessingExecutor extends AbstractProcessorExecutor {
private static final String PROCESSOR_TYPE = ProcessorType.POSTPROCESSING.getType();
public PostprocessingExecutor(List<URL> jarPaths, Config config) {
@@ -30,68 +23,20 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC
}
@Override
- protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
- Map<String, ProjectionConfig> postprocessingConfigMap = Maps.newHashMap();
+ protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ Map<String, ProcessorConfig> postprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES);
postprocessors.root().unwrapped().forEach((key, value) -> {
- CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key),
- ProjectionConfigOptions.TYPE.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Postprocessor: %s, Message: %s",
- key, result.getMsg()));
- }
- result = CheckConfigUtil.checkAtLeastOneExists(postprocessors.getConfig(key),
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Postprocessor: %s, At least one of [%s] should be specified.",
- key, String.join(",",
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key())));
- }
-
- ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class);
- projectionConfig.setName(key);
- postprocessingConfigMap.put(key, projectionConfig);
+ postprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, postprocessors));
});
-
}
-
return postprocessingConfigMap;
}
+
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- String className = projectionConfig.getType();
- ProjectionProcessor projectionProcessor;
- if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
- projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(className);
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get postprocessing pipeline instance failed!", e);
- }
- }
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
- }
- try {
- dataStream =
- projectionProcessor.projectionProcessorFunction(
- dataStream, projectionConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create postprocessing pipeline instance failed!", e);
- }
- return dataStream;
+ return super.execute(dataStream, node);
}
-
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index 31fcce8..b1e53e4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -1,16 +1,9 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.ProjectionConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.ProjectionConfig;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -24,7 +17,7 @@ import java.util.Map;
* Initialize config and execute preprocessor
*/
@Slf4j
-public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
+public class PreprocessingExecutor extends AbstractProcessorExecutor {
private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType();
public PreprocessingExecutor(List<URL> jarPaths, Config config) {
@@ -32,67 +25,20 @@ public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionCo
}
@Override
- protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
- Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap();
+ protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ Map<String, ProcessorConfig> preprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) {
Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES);
preprocessors.root().unwrapped().forEach((key, value) -> {
- CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key),
- ProjectionConfigOptions.TYPE.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Preprocessor: %s, Message: %s",
- key, result.getMsg()));
- }
-
- result = CheckConfigUtil.checkAtLeastOneExists(preprocessors.getConfig(key),
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Preprocessor: %s, At least one of [%s] should be specified.",
- key, String.join(",",
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key())));
- }
-
- ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class);
- projectionConfig.setName(key);
- preprocessingConfigMap.put(key, projectionConfig);
+ preprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, preprocessors));
});
}
-
return preprocessingConfigMap;
}
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- String className = projectionConfig.getType();
- ProjectionProcessor projectionProcessor;
- if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
- projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(className);
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get preprocessing pipeline instance failed!", e);
- }
- }
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
- }
- try {
- dataStream =
- projectionProcessor.projectionProcessorFunction(
- dataStream, projectionConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create preprocessing pipeline instance failed!", e);
- }
- return dataStream;
+ return super.execute(dataStream, node);
+
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index 4a3b204..d69fe8c 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -1,16 +1,9 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.ProjectionConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.ProjectionConfig;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -22,7 +15,7 @@ import java.util.Map;
/**
* Initialize config and execute processor
*/
-public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
+public class ProcessingExecutor extends AbstractProcessorExecutor {
private static final String PROCESSOR_TYPE = ProcessorType.PROCESSING.getType();
public ProcessingExecutor(List<URL> jarPaths, Config config) {
@@ -30,69 +23,20 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi
}
@Override
- protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
- Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap();
+ protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ Map<String, ProcessorConfig> processingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) {
Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
processors.root().unwrapped().forEach((key, value) -> {
- CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key),
- ProjectionConfigOptions.TYPE.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Processor: %s, Message: %s",
- key, result.getMsg()));
- }
- result = CheckConfigUtil.checkAtLeastOneExists(processors.getConfig(key),
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Processor: %s, At least one of [%s] should be specified.",
- key, String.join(",",
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key())));
- }
-
- ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class);
- projectionConfig.setName(key);
- processingConfigMap.put(key, projectionConfig);
+ processingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, processors));
});
-
}
-
return processingConfigMap;
}
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- String className = projectionConfig.getType();
- ProjectionProcessor projectionProcessor;
- if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
- projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(className);
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get processing pipeline instance failed!", e);
- }
- }
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
- }
- try {
- dataStream =
- projectionProcessor.projectionProcessorFunction(
- dataStream, projectionConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create processing pipeline instance failed!", e);
- }
- return dataStream;
+ return super.execute(dataStream, node);
}
-
}