diff options
| author | wangkuan <[email protected]> | 2024-08-13 11:10:40 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-08-13 11:10:40 +0800 |
| commit | b6d37a57e16fee20b2725333dd9740fbe03dc142 (patch) | |
| tree | 15c927ccaa218940e082b49c50c3cc70f35b3b01 /groot-bootstrap | |
| parent | 8ff8fb17f7cf59c163452a62d9d3ecb66672c1b8 (diff) | |
[feature][core][common][bootstrap]支持tableProcessor,增加UnRoll函数
Diffstat (limited to 'groot-bootstrap')
| -rw-r--r-- | groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java | 57 |
1 files changed, 53 insertions, 4 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index bd8b75c..66c0b0f 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -2,15 +2,14 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.common.config.AggregateConfigOptions; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.ProjectionConfigOptions; +import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.pojo.AggregateConfig; import com.geedgenetworks.core.pojo.ProcessorConfig; import com.geedgenetworks.core.pojo.ProjectionConfig; +import com.geedgenetworks.core.pojo.TableConfig; +import com.geedgenetworks.core.processor.table.TableProcessor; import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; @@ -35,6 +34,9 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, case "aggregate": dataStream = executeAggregateProcessor(dataStream, node, (AggregateConfig) processorConfig); break; + case "table": + dataStream = executeTableProcessor(dataStream, node, (TableConfig) processorConfig); + break; case "projection": dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig); break; @@ -43,7 +45,32 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, } return dataStream; } + protected SingleOutputStreamOperator executeTableProcessor(SingleOutputStreamOperator dataStream, Node node, TableConfig tableConfig) throws JobExecuteException { + TableProcessor tableProcessor; + if (processorMap.containsKey(tableConfig.getType())) { + tableProcessor = (TableProcessor) processorMap.get(tableConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(tableConfig.getType()); + tableProcessor = (TableProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get processing pipeline instance failed!", e); + } + } + if (node.getParallelism() > 0) { + tableConfig.setParallelism(node.getParallelism()); + } + try { + dataStream = + tableProcessor.processorFunction( + dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); + } catch (Exception e) { + throw new JobExecuteException("Create orderby pipeline instance failed!", e); + } + return dataStream; + } protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException { AggregateProcessor aggregateProcessor; @@ -114,6 +141,9 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, case "aggregate": projectionConfig = checkAggregateProcessorConfig(key, value, processorsConfig); break; + case "table": + projectionConfig = checkTableProcessorConfig(key, value, processorsConfig); + break; default://兼容历史版本 projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig); } @@ -164,5 +194,24 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, return aggregateConfig; } + protected TableConfig checkTableProcessorConfig(String key, Map<String, Object> value, Config config) { + + CheckResult result = CheckConfigUtil.checkAtLeastOneExists(config.getConfig(key), + TableConfigOptions.OUTPUT_FIELDS.key(), + TableConfigOptions.REMOVE_FIELDS.key(), + TableConfigOptions.FUNCTIONS.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Table processor: %s, At least one of [%s] should be specified.", + key, String.join(",", + TableConfigOptions.OUTPUT_FIELDS.key(), + TableConfigOptions.REMOVE_FIELDS.key(), + TableConfigOptions.FUNCTIONS.key()))); + } + + TableConfig tableConfig = new JSONObject(value).toJavaObject(TableConfig.class); + tableConfig.setName(key); + return tableConfig; + } } |
