summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-08-13 07:30:52 +0000
committer王宽 <[email protected]>2024-08-13 07:30:52 +0000
commit647296e18fc36fab3b1e01bf276115fb1200d0eb (patch)
tree0a86cc562474a3b574bce2638ff3b5ddfbc03d39 /groot-bootstrap
parentab28f6690c1f7f1df7eb28e197f309fedcf37470 (diff)
parentb6d37a57e16fee20b2725333dd9740fbe03dc142 (diff)
Merge branch 'feature/table' into 'develop'
[feature][core][common][bootstrap]支持tableProcessor,增加UnRoll函数 See merge request galaxy/platform/groot-stream!90
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java57
1 files changed, 53 insertions, 4 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index bd8b75c..66c0b0f 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -2,15 +2,14 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.common.config.AggregateConfigOptions;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.ProjectionConfigOptions;
+import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.pojo.AggregateConfig;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.geedgenetworks.core.pojo.ProjectionConfig;
+import com.geedgenetworks.core.pojo.TableConfig;
+import com.geedgenetworks.core.processor.table.TableProcessor;
import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
@@ -35,6 +34,9 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
case "aggregate":
dataStream = executeAggregateProcessor(dataStream, node, (AggregateConfig) processorConfig);
break;
+ case "table":
+ dataStream = executeTableProcessor(dataStream, node, (TableConfig) processorConfig);
+ break;
case "projection":
dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig);
break;
@@ -43,7 +45,32 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
return dataStream;
}
+ protected SingleOutputStreamOperator executeTableProcessor(SingleOutputStreamOperator dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
+ TableProcessor tableProcessor;
+ if (processorMap.containsKey(tableConfig.getType())) {
+ tableProcessor = (TableProcessor) processorMap.get(tableConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(tableConfig.getType());
+ tableProcessor = (TableProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get processing pipeline instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ tableConfig.setParallelism(node.getParallelism());
+ }
+ try {
+ dataStream =
+ tableProcessor.processorFunction(
+ dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+ } catch (Exception e) {
+ throw new JobExecuteException("Create orderby pipeline instance failed!", e);
+ }
+ return dataStream;
+ }
protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
AggregateProcessor aggregateProcessor;
@@ -114,6 +141,9 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
case "aggregate":
projectionConfig = checkAggregateProcessorConfig(key, value, processorsConfig);
break;
+ case "table":
+ projectionConfig = checkTableProcessorConfig(key, value, processorsConfig);
+ break;
default://兼容历史版本
projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig);
}
@@ -164,5 +194,24 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
return aggregateConfig;
}
+ protected TableConfig checkTableProcessorConfig(String key, Map<String, Object> value, Config config) {
+
+ CheckResult result = CheckConfigUtil.checkAtLeastOneExists(config.getConfig(key),
+ TableConfigOptions.OUTPUT_FIELDS.key(),
+ TableConfigOptions.REMOVE_FIELDS.key(),
+ TableConfigOptions.FUNCTIONS.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Table processor: %s, At least one of [%s] should be specified.",
+ key, String.join(",",
+ TableConfigOptions.OUTPUT_FIELDS.key(),
+ TableConfigOptions.REMOVE_FIELDS.key(),
+ TableConfigOptions.FUNCTIONS.key())));
+ }
+
+ TableConfig tableConfig = new JSONObject(value).toJavaObject(TableConfig.class);
+ tableConfig.setName(key);
+ return tableConfig;
+ }
}