diff options
| author | wangkuan <[email protected]> | 2024-07-23 15:39:48 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-07-23 15:39:48 +0800 |
| commit | 6e558c28ce9f07f58e9adcbbd802b586c6a179da (patch) | |
| tree | cde93730383c2a7e1d944f50bd2683af7fc9fa8b /groot-bootstrap | |
| parent | 76b5f89c44bded7da01e27ee97065f9bd4c77848 (diff) | |
[feature][bootstrap][core][common]支持自定义聚合函数,udf接口重命名为scalarFunction,时间戳转换函数支持设置interval
Diffstat (limited to 'groot-bootstrap')
4 files changed, 188 insertions, 185 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
new file mode 100644
index 0000000..42a4828
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -0,0 +1,168 @@
+package com.geedgenetworks.bootstrap.execution;
+
+import com.alibaba.fastjson.JSONObject;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.config.AggregateConfigOptions;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.ProjectionConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.core.pojo.AggregateConfig;
+import com.geedgenetworks.core.pojo.ProcessorConfig;
+import com.geedgenetworks.core.pojo.ProjectionConfig;
+import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
+import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Common base for executors that run "processor" nodes (projection or aggregate).
+ *
+ * <p>It provides two things to subclasses:
+ * <ul>
+ *   <li>configuration validation and binding ({@link #checkProcessorConfig}), which turns one
+ *       entry of a processors config section into a {@link ProcessorConfig};</li>
+ *   <li>execution ({@link #execute}), which looks up the config of a node by name and applies
+ *       the matching processor to the incoming stream.</li>
+ * </ul>
+ *
+ * <p>Any {@code type} value other than {@code "aggregate"} is handled as a projection processor
+ * to stay compatible with configurations written for historical versions.
+ */
+public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> {
+
+    /** Config value of {@code type} that selects the aggregate processor. */
+    private static final String TYPE_AGGREGATE = "aggregate";
+
+    /** Config value of {@code type} that selects the projection processor. */
+    private static final String TYPE_PROJECTION = "projection";
+
+    protected AbstractProcessorExecutor(List<URL> jarPaths, Config operatorConfig) {
+        super(jarPaths, operatorConfig);
+    }
+
+    /**
+     * Applies the processor configured for {@code node} to {@code dataStream}.
+     *
+     * @param dataStream upstream operator the processor is attached to
+     * @param node       graph node; its name is the key into {@code operatorMap}
+     * @return the operator produced by the processor
+     * @throws JobExecuteException if the processor cannot be instantiated or applied
+     */
+    @Override
+    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+        ProcessorConfig processorConfig = operatorMap.get(node.getName());
+        switch (processorConfig.getType()) {
+            case TYPE_AGGREGATE:
+                return executeAggregateProcessor(dataStream, node, (AggregateConfig) processorConfig);
+            case TYPE_PROJECTION:
+            default:
+                // Unknown types fall back to projection for compatibility with historical versions.
+                return executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig);
+        }
+    }
+
+    /**
+     * Resolves the aggregate processor for {@code aggregateConfig} and attaches it to the stream.
+     *
+     * <p>The processor is taken from {@code aggregateProcessorMap} when the configured type is
+     * registered there; otherwise the type is treated as a class name and instantiated reflectively.
+     * A positive node parallelism overrides the parallelism stored in the config.
+     *
+     * @throws JobExecuteException if the processor cannot be instantiated or fails while building the pipeline
+     */
+    protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
+        String type = aggregateConfig.getType();
+        AggregateProcessor aggregateProcessor = aggregateProcessorMap.containsKey(type)
+                ? aggregateProcessorMap.get(type)
+                : newProcessorInstance(type, AggregateProcessor.class);
+
+        if (node.getParallelism() > 0) {
+            aggregateConfig.setParallelism(node.getParallelism());
+        }
+        try {
+            return aggregateProcessor.aggregateProcessorFunction(
+                    dataStream, aggregateConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+        } catch (Exception e) {
+            throw new JobExecuteException("Create aggregate pipeline instance failed!", e);
+        }
+    }
+
+    /**
+     * Resolves the projection processor for {@code projectionConfig} and attaches it to the stream.
+     *
+     * <p>The processor is taken from {@code projectionProcessorMap} when the configured type is
+     * registered there; otherwise the type is treated as a class name and instantiated reflectively.
+     * A positive node parallelism overrides the parallelism stored in the config.
+     *
+     * @throws JobExecuteException if the processor cannot be instantiated or fails while building the pipeline
+     */
+    protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
+        String type = projectionConfig.getType();
+        ProjectionProcessor projectionProcessor = projectionProcessorMap.containsKey(type)
+                ? projectionProcessorMap.get(type)
+                : newProcessorInstance(type, ProjectionProcessor.class);
+
+        if (node.getParallelism() > 0) {
+            projectionConfig.setParallelism(node.getParallelism());
+        }
+        try {
+            return projectionProcessor.projectionProcessorFunction(dataStream, projectionConfig);
+        } catch (Exception e) {
+            throw new JobExecuteException("Create processing pipeline instance failed!", e);
+        }
+    }
+
+    /**
+     * Validates one processor entry and binds it to the config class matching its {@code type}.
+     *
+     * @param key              name of the processor entry inside {@code processorsConfig}
+     * @param value            unwrapped content of that entry
+     * @param processorsConfig config section that contains the entry
+     * @return an {@link AggregateConfig} for type {@code "aggregate"}, otherwise a {@link ProjectionConfig}
+     * @throws ConfigValidationException if {@code type} or a type-specific required option is missing
+     */
+    protected ProcessorConfig checkProcessorConfig(String key, Map<String, Object> value, Config processorsConfig) {
+        CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key),
+                ProjectionConfigOptions.TYPE.key());
+        if (!result.isSuccess()) {
+            // This method is shared by the pre-, main- and post-processing executors,
+            // so the label must not name one specific stage.
+            throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+                    "Processor: %s, Message: %s",
+                    key, result.getMsg()));
+        }
+        switch ((String) value.getOrDefault("type", "")) {
+            case TYPE_AGGREGATE:
+                return checkAggregateProcessorConfig(key, value, processorsConfig);
+            case TYPE_PROJECTION:
+            default:
+                // Unknown types fall back to projection for compatibility with historical versions.
+                return checkProjectionProcessorConfig(key, value, processorsConfig);
+        }
+    }
+
+    /**
+     * Validates a projection processor entry and binds it to {@link ProjectionConfig}.
+     *
+     * <p>At least one of the output-fields, remove-fields or functions options must be present.
+     *
+     * @throws ConfigValidationException if none of those options is present
+     */
+    protected ProcessorConfig checkProjectionProcessorConfig(String key, Map<String, Object> value, Config projectionProcessors) {
+        CheckResult result = CheckConfigUtil.checkAtLeastOneExists(projectionProcessors.getConfig(key),
+                ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+                ProjectionConfigOptions.REMOVE_FIELDS.key(),
+                ProjectionConfigOptions.FUNCTIONS.key());
+        if (!result.isSuccess()) {
+            throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+                    "Processor: %s, At least one of [%s] should be specified.",
+                    key, String.join(",",
+                            ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+                            ProjectionConfigOptions.REMOVE_FIELDS.key(),
+                            ProjectionConfigOptions.FUNCTIONS.key())));
+        }
+
+        ProjectionConfig projectionConfig = new JSONObject(value).toJavaObject(ProjectionConfig.class);
+        projectionConfig.setName(key);
+        return projectionConfig;
+    }
+
+    /**
+     * Validates an aggregate processor entry and binds it to {@link AggregateConfig}.
+     *
+     * <p>All of the group-by-fields, window-type, functions and window-size options must be present.
+     *
+     * @throws ConfigValidationException if any of those options is missing
+     */
+    protected AggregateConfig checkAggregateProcessorConfig(String key, Map<String, Object> value, Config aggregateProcessorsConfig) {
+        CheckResult result = CheckConfigUtil.checkAllExists(aggregateProcessorsConfig.getConfig(key),
+                AggregateConfigOptions.GROUP_BY_FIELDS.key(),
+                AggregateConfigOptions.WINDOW_TYPE.key(),
+                AggregateConfigOptions.FUNCTIONS.key(),
+                AggregateConfigOptions.WINDOW_SIZE.key());
+        if (!result.isSuccess()) {
+            // The message lists exactly the options checked above, all of which are mandatory.
+            throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+                    "Aggregate processor: %s, All of [%s] should be specified.",
+                    key, String.join(",",
+                            AggregateConfigOptions.GROUP_BY_FIELDS.key(),
+                            AggregateConfigOptions.WINDOW_TYPE.key(),
+                            AggregateConfigOptions.FUNCTIONS.key(),
+                            AggregateConfigOptions.WINDOW_SIZE.key())));
+        }
+
+        AggregateConfig aggregateConfig = new JSONObject(value).toJavaObject(AggregateConfig.class);
+        aggregateConfig.setName(key);
+        return aggregateConfig;
+    }
+
+    /**
+     * Instantiates {@code className} through its no-argument constructor and checks that the
+     * instance is of {@code processorType}.
+     *
+     * @throws JobExecuteException if the class is missing, cannot be constructed, its constructor
+     *                             throws, or the instance is not a {@code processorType}
+     */
+    private static <T> T newProcessorInstance(String className, Class<T> processorType) throws JobExecuteException {
+        try {
+            // Class#newInstance is deprecated and lets checked constructor exceptions escape undeclared;
+            // going through the Constructor wraps them in InvocationTargetException instead.
+            return processorType.cast(Class.forName(className).getDeclaredConstructor().newInstance());
+        } catch (ReflectiveOperationException | RuntimeException e) {
+            throw new JobExecuteException("get processing pipeline instance failed!", e);
+        }
+    }
+}
 diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java index fa5f572..36fad61 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java @@ -1,16 +1,9 @@ package com.geedgenetworks.bootstrap.execution; -import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.CheckConfigUtil; -import 
com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.ProjectionConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.ProjectionConfig; -import com.geedgenetworks.core.processor.projection.ProjectionProcessor; +import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -22,7 +15,7 @@ import java.util.Map; /** * Initialize config and execute postprocessor */ -public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> { +public class PostprocessingExecutor extends AbstractProcessorExecutor { private static final String PROCESSOR_TYPE = ProcessorType.POSTPROCESSING.getType(); public PostprocessingExecutor(List<URL> jarPaths, Config config) { @@ -30,68 +23,20 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC } @Override - protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { - Map<String, ProjectionConfig> postprocessingConfigMap = Maps.newHashMap(); + protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + Map<String, ProcessorConfig> postprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) { Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES); postprocessors.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key), - ProjectionConfigOptions.TYPE.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Postprocessor: %s, Message: %s", - key, result.getMsg())); - } - result = 
CheckConfigUtil.checkAtLeastOneExists(postprocessors.getConfig(key), - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Postprocessor: %s, At least one of [%s] should be specified.", - key, String.join(",", - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()))); - } - - ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class); - projectionConfig.setName(key); - postprocessingConfigMap.put(key, projectionConfig); + postprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, postprocessors)); }); - } - return postprocessingConfigMap; } + @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - String className = projectionConfig.getType(); - ProjectionProcessor projectionProcessor; - if (projectionProcessorMap.containsKey(projectionConfig.getType())) { - projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(className); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get postprocessing pipeline instance failed!", e); - } - } - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); - } - try { - dataStream = - projectionProcessor.projectionProcessorFunction( - dataStream, projectionConfig); - } catch (Exception e) { - throw new JobExecuteException("Create postprocessing pipeline 
instance failed!", e); - } - return dataStream; + return super.execute(dataStream, node); } - } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java index 31fcce8..b1e53e4 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java @@ -1,16 +1,9 @@ package com.geedgenetworks.bootstrap.execution; -import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.ProjectionConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.ProjectionConfig; -import com.geedgenetworks.core.processor.projection.ProjectionProcessor; +import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; @@ -24,7 +17,7 @@ import java.util.Map; * Initialize config and execute preprocessor */ @Slf4j -public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> { +public class PreprocessingExecutor extends AbstractProcessorExecutor { private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType(); public PreprocessingExecutor(List<URL> jarPaths, Config config) { @@ -32,67 +25,20 @@ public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionCo } @Override - protected Map<String, ProjectionConfig> initialize(List<URL> 
jarPaths, Config operatorConfig) { - Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap(); + protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + Map<String, ProcessorConfig> preprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) { Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES); preprocessors.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key), - ProjectionConfigOptions.TYPE.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Preprocessor: %s, Message: %s", - key, result.getMsg())); - } - - result = CheckConfigUtil.checkAtLeastOneExists(preprocessors.getConfig(key), - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Preprocessor: %s, At least one of [%s] should be specified.", - key, String.join(",", - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()))); - } - - ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class); - projectionConfig.setName(key); - preprocessingConfigMap.put(key, projectionConfig); + preprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, preprocessors)); }); } - return preprocessingConfigMap; } @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - String className = projectionConfig.getType(); - 
ProjectionProcessor projectionProcessor; - if (projectionProcessorMap.containsKey(projectionConfig.getType())) { - projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(className); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get preprocessing pipeline instance failed!", e); - } - } - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); - } - try { - dataStream = - projectionProcessor.projectionProcessorFunction( - dataStream, projectionConfig); - } catch (Exception e) { - throw new JobExecuteException("Create preprocessing pipeline instance failed!", e); - } - return dataStream; + return super.execute(dataStream, node); + } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java index 4a3b204..d69fe8c 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java @@ -1,16 +1,9 @@ package com.geedgenetworks.bootstrap.execution; -import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.ProjectionConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.ProjectionConfig; -import 
com.geedgenetworks.core.processor.projection.ProjectionProcessor; +import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -22,7 +15,7 @@ import java.util.Map; /** * Initialize config and execute processor */ -public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfig> { +public class ProcessingExecutor extends AbstractProcessorExecutor { private static final String PROCESSOR_TYPE = ProcessorType.PROCESSING.getType(); public ProcessingExecutor(List<URL> jarPaths, Config config) { @@ -30,69 +23,20 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi } @Override - protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { - Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap(); + protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) { + Map<String, ProcessorConfig> processingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) { Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES); processors.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key), - ProjectionConfigOptions.TYPE.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Processor: %s, Message: %s", - key, result.getMsg())); - } - result = CheckConfigUtil.checkAtLeastOneExists(processors.getConfig(key), - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Processor: %s, At least one of [%s] should 
be specified.", - key, String.join(",", - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()))); - } - - ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class); - projectionConfig.setName(key); - processingConfigMap.put(key, projectionConfig); + processingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, processors)); }); - } - return processingConfigMap; } @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - String className = projectionConfig.getType(); - ProjectionProcessor projectionProcessor; - if (projectionProcessorMap.containsKey(projectionConfig.getType())) { - projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(className); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get processing pipeline instance failed!", e); - } - } - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); - } - try { - dataStream = - projectionProcessor.projectionProcessorFunction( - dataStream, projectionConfig); - } catch (Exception e) { - throw new JobExecuteException("Create processing pipeline instance failed!", e); - } - return dataStream; + return super.execute(dataStream, node); } - } |
