diff options
| author | doufenghu <[email protected]> | 2024-01-20 19:09:53 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-01-20 19:09:53 +0800 |
| commit | fed6e6a7a74800ae1d8c8c77d17a57ab13b26b35 (patch) | |
| tree | f6149854f73901f5d1b9370cceba07b302bb50be /groot-bootstrap | |
| parent | 2a8eaff6e057a3f8118f80adb217fcc900a714ae (diff) | |
[Improve]
[Bootstrap]
- 按功能定义DataStream上的各个元素,包括Source/Sink connector,Filter Operator,Projection Processor
- 优化任务配置校验,构建拓扑时对必需及至少任选一个参数进行校验
[Common]
- 将Event,UDF,UDFContext 移至公共模块,便于引用继承
[Core]
- ProjectionProcessFunction 不配置函数也可运行,用于定义下游事件支持部分字段筛选场景
Diffstat (limited to 'groot-bootstrap')
12 files changed, 214 insertions, 153 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java index 477e703..9d948a3 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java @@ -1,5 +1,6 @@ package com.geedgenetworks.bootstrap.exception; +@Deprecated public class ConfigCheckException extends RuntimeException { public ConfigCheckException(String message) { super(message); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index 3f425b1..077fee3 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecuteProcessor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -10,15 +10,15 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; -public abstract class AbstractExecuteProcessor<K, V> - implements ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> { +public abstract class AbstractExecutor<K, V> + implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> { protected JobRuntimeEnvironment jobRuntimeEnvironment; protected final Config operatorConfig; protected final Map<K,V> operatorMap; - protected AbstractExecuteProcessor(List<URL> jarPaths, Config operatorConfig) { + protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) { this.operatorConfig = operatorConfig; this.operatorMap = initialize(jarPaths, operatorConfig); } @@ -29,7 +29,7 @@ public abstract class AbstractExecuteProcessor<K, V> } - protected abstract Map<K, V> initialize(List<URL> jarPaths, Config operatorConfig); + protected abstract Map<K, V> initialize(List<URL> jarPaths, Config operatorConfig); protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = (classLoader, url) -> { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java index ba374ae..d57d6bf 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecuteProcessor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java @@ -2,7 +2,7 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.bootstrap.exception.JobExecuteException; -public interface ExecuteProcessor<T, ENV extends RuntimeEnvironment> { +public interface Executor<T, ENV extends RuntimeEnvironment> { T execute(T dataStream, Node edge) throws JobExecuteException; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java index 785cb8a..0897186 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecuteProcessor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java @@ -2,12 +2,15 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.FilterConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.filter.AviatorFilter; import com.geedgenetworks.core.pojo.FilterConfig; -import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; @@ -17,22 +20,29 @@ import java.net.URL; import java.util.List; import java.util.Map; +/** + * Initialize config and execute filter operator + */ @Slf4j -public class FilterExecuteProcessor extends AbstractExecuteProcessor<String, FilterConfig> { - +public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType(); - - public FilterExecuteProcessor(List<URL> jarPaths, Config config) { + public FilterExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } @Override protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, FilterConfig> filterConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.FILTERS)) { - Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.FILTERS).root().unwrapped(); - operatorConfigMap.forEach((key,value) -> { + Config filters = operatorConfig.getConfig(Constants.FILTERS); + filters.root().unwrapped().forEach((key,value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(filters.getConfig(key), + FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Filter: %s, Message: %s", + key, result.getMsg())); + } FilterConfig filterConfig = new JSONObject((Map<String, Object>) value).toJavaObject(FilterConfig.class); - checkFilterConfig(filterConfig); filterConfig.setName(key); filterConfigMap.put(key, filterConfig); }); @@ -58,16 +68,9 @@ public class FilterExecuteProcessor extends AbstractExecuteProcessor<String, Fil aviatorFilter.filterFunction( dataStream, filterConfig); } catch (Exception e) { - throw new JobExecuteException("create filter operator error", e); + throw new JobExecuteException("Create filter instance failed!", e); } return dataStream; } - - private void checkFilterConfig(FilterConfig filterConfig) { - - if (!StringUtil.isNotBlank(filterConfig.getType())) { - throw new ConfigCheckException("校验未通过 filter " + filterConfig.getName() + " type 为空!"); - } - } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index b90b9db..2eabefa 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -6,7 +6,7 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.GrootStreamConfig; -import com.geedgenetworks.core.pojo.Event; +import com.geedgenetworks.common.Event; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -27,12 +27,12 @@ import java.util.stream.Stream; public class JobExecution { private final JobRuntimeEnvironment jobRuntimeEnvironment; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecutor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecutor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecutor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecutor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecutor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecutor; private final List<Node> nodes; private final List<URL> jarPaths; @@ -47,21 +47,21 @@ public class JobExecution { registerPlugin(config.getConfig(Constants.APPLICATION)); - this.sourceExecuteProcessor = new SourceExecuteProcessor(jarPaths, config); - this.sinkExecuteProcessor = new SinkExecuteProcessor(jarPaths, config); - this.filterExecuteProcessor = new FilterExecuteProcessor(jarPaths, config); - this.preprocessingExecuteProcessor = new PreprocessingExecuteProcessor(jarPaths, config); - this.processingExecuteProcessor = new ProcessingExecuteProcessor(jarPaths, config); - this.postprocessingExecuteProcessor = new PostprocessingExecuteProcessor(jarPaths, config); + this.sourceExecutor = new SourceExecutor(jarPaths, config); + this.sinkExecutor = new SinkExecutor(jarPaths, config); + this.filterExecutor = new FilterExecutor(jarPaths, config); + this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config); + this.processingExecutor = new ProcessingExecutor(jarPaths, config); + this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config); this.jobRuntimeEnvironment = JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig); - this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.nodes = buildJobNode(config); } @@ -86,7 +86,7 @@ public class JobExecution { } }).collect(Collectors.toList()); jarDependencies.forEach(url -> { - AbstractExecuteProcessor.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url); + AbstractExecutor.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url); }); jarPaths.addAll(jarDependencies); @@ -217,7 +217,7 @@ public class JobExecution { SingleOutputStreamOperator<Event> singleOutputStreamOperator = null; for(Node sourceNode : sourceNodes) { - singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode); + singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode); for (String nodeName : sourceNode.getDownstream()) { buildJobGraph(singleOutputStreamOperator, nodeName); } @@ -243,15 +243,15 @@ public class JobExecution { throw new JobExecuteException("can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node); + singleOutputStreamOperator = filterExecutor.execute(singleOutputStreamOperator, node); } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node); + singleOutputStreamOperator = preprocessingExecutor.execute(singleOutputStreamOperator, node); } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node); + singleOutputStreamOperator = processingExecutor.execute(singleOutputStreamOperator, node); } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node); + singleOutputStreamOperator = postprocessingExecutor.execute(singleOutputStreamOperator, node); } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node); + singleOutputStreamOperator = sinkExecutor.execute(singleOutputStreamOperator, node); } else { throw new JobExecuteException("unsupported process type " + node.getType().name()); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java index 3aca69c..4e45b78 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecuteProcessor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java @@ -5,6 +5,11 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.ProjectionConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.pojo.ProjectionConfig; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.geedgenetworks.utils.StringUtil; @@ -16,23 +21,46 @@ import java.net.URL; import java.util.List; import java.util.Map; -public class PostprocessingExecuteProcessor extends AbstractExecuteProcessor<String, ProjectionConfig> { +/** + * Initialize config and execute postprocessor + */ +public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> { private static final String PROCESSOR_TYPE = ProcessorType.POSTPROCESSING.getType(); - public PostprocessingExecuteProcessor(List<URL> jarPaths, Config config) { + public PostprocessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> postprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) { - Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES).root().unwrapped(); - operatorConfigMap.forEach((key,value) -> { + Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES); + postprocessors.root().unwrapped().forEach((key,value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key), + ProjectionConfigOptions.TYPE.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Postprocessor: %s, Message: %s", + key, result.getMsg())); + } + result = CheckConfigUtil.checkAtLeastOneExists(postprocessors.getConfig(key), + ProjectionConfigOptions.OUTPUT_FIELDS.key(), + ProjectionConfigOptions.REMOVE_FIELDS.key(), + ProjectionConfigOptions.FUNCTIONS.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Postprocessor: %s, At least one of [%s] should be specified.", + key, String.join(",", + ProjectionConfigOptions.OUTPUT_FIELDS.key(), + ProjectionConfigOptions.REMOVE_FIELDS.key(), + ProjectionConfigOptions.FUNCTIONS.key()))); + } + ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class); - checkProjectionConfig(projectionConfig); projectionConfig.setName(key); postprocessingConfigMap.put(key, projectionConfig); }); + } return postprocessingConfigMap; @@ -54,14 +82,9 @@ public class PostprocessingExecuteProcessor extends AbstractExecuteProcessor<Str projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); } catch (Exception e) { - throw new JobExecuteException("create postprocessing operator error", e); + throw new JobExecuteException("create postprocessing pipeline operator error", e); } return dataStream; } - private static void checkProjectionConfig(ProjectionConfig projectionConfig) { - if (!StringUtil.isNotBlank(projectionConfig.getType())) { - throw new ConfigCheckException("校验未通过 Processing Pipelines " + projectionConfig.getName() + " type 为空!"); - } - } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java index 637e33b..84e8718 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecuteProcessor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java @@ -2,12 +2,15 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.ProjectionConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.pojo.ProjectionConfig; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; -import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; @@ -17,21 +20,43 @@ import java.net.URL; import java.util.List; import java.util.Map; +/** + * Initialize config and execute preprocessor + */ @Slf4j -public class PreprocessingExecuteProcessor extends AbstractExecuteProcessor<String, ProjectionConfig> { +public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> { private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType(); - - public PreprocessingExecuteProcessor(List<URL> jarPaths, Config config) { + public PreprocessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) { - Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped(); - operatorConfigMap.forEach((key,value) -> { + Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES); + preprocessors.root().unwrapped().forEach((key,value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key), + ProjectionConfigOptions.TYPE.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Preprocessor: %s, Message: %s", + key, result.getMsg())); + } + + result = CheckConfigUtil.checkAtLeastOneExists(preprocessors.getConfig(key), + ProjectionConfigOptions.OUTPUT_FIELDS.key(), + ProjectionConfigOptions.REMOVE_FIELDS.key(), + ProjectionConfigOptions.FUNCTIONS.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Preprocessor: %s, At least one of [%s] should be specified.", + key, String.join(",", + ProjectionConfigOptions.OUTPUT_FIELDS.key(), + ProjectionConfigOptions.REMOVE_FIELDS.key(), + ProjectionConfigOptions.FUNCTIONS.key()))); + } + ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class); - checkProjectionConfig(projectionConfig); projectionConfig.setName(key); preprocessingConfigMap.put(key, projectionConfig); }); @@ -56,14 +81,8 @@ public class PreprocessingExecuteProcessor extends AbstractExecuteProcessor<Stri projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); } catch (Exception e) { - throw new JobExecuteException("create preprocessing operator error", e); + throw new JobExecuteException("Create preprocessor pipeline instance failed!", e); } return dataStream; } - private static void checkProjectionConfig(ProjectionConfig projectionConfig) { - - if (!StringUtil.isNotBlank(projectionConfig.getType())) { - throw new ConfigCheckException("校验未通过 projection " + projectionConfig.getName() + " type 为空!"); - } - } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java index e57448b..a8798e8 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecuteProcessor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java @@ -2,12 +2,15 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.ProjectionConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.pojo.ProjectionConfig; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; -import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -15,24 +18,46 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; import java.util.List; import java.util.Map; - -public class ProcessingExecuteProcessor extends AbstractExecuteProcessor<String, ProjectionConfig> { +/** + * Initialize config and execute processor + */ +public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfig> { private static final String PROCESSOR_TYPE = ProcessorType.PROCESSING.getType(); - public ProcessingExecuteProcessor(List<URL> jarPaths, Config config) { + public ProcessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) { - Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES).root().unwrapped(); - operatorConfigMap.forEach((key,value) -> { + Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES); + processors.root().unwrapped().forEach((key,value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key), + ProjectionConfigOptions.TYPE.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Processor: %s, Message: %s", + key, result.getMsg())); + } + result = CheckConfigUtil.checkAtLeastOneExists(processors.getConfig(key), + ProjectionConfigOptions.OUTPUT_FIELDS.key(), + ProjectionConfigOptions.REMOVE_FIELDS.key(), + ProjectionConfigOptions.FUNCTIONS.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Processor: %s, At least one of [%s] should be specified.", + key, String.join(",", + ProjectionConfigOptions.OUTPUT_FIELDS.key(), + ProjectionConfigOptions.REMOVE_FIELDS.key(), + ProjectionConfigOptions.FUNCTIONS.key()))); + } + ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class); - checkProjectionConfig(projectionConfig); projectionConfig.setName(key); processingConfigMap.put(key, projectionConfig); }); + } return processingConfigMap; @@ -55,14 +80,9 @@ public class ProcessingExecuteProcessor extends AbstractExecuteProcessor<String, projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); } catch (Exception e) { - throw new JobExecuteException("create processing operator error", e); + throw new JobExecuteException("Create processing pipeline instance failed!", e); } return dataStream; } - private static void checkProjectionConfig(ProjectionConfig projectionConfig) { - if (!StringUtil.isNotBlank(projectionConfig.getType())) { - throw new ConfigCheckException("校验未通过 Processing Pipelines " + projectionConfig.getName() + " type 为空!"); - } - } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java index 800c59d..7aee40f 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecuteProcessor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java @@ -5,6 +5,12 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.SinkConfigOptions; +import com.geedgenetworks.common.config.SourceConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.connector.sink.SinkProvider; import com.geedgenetworks.core.factories.FactoryUtil; import com.geedgenetworks.core.factories.SinkTableFactory; @@ -22,24 +28,36 @@ import java.net.URL; import java.util.List; import java.util.Map; +/** + * Initialize config and execute sink connector + */ @Slf4j -public class SinkExecuteProcessor extends AbstractExecuteProcessor<String, SinkConfig>{ +public class SinkExecutor extends AbstractExecutor<String, SinkConfig> { private static final String PROCESSOR_TYPE = ProcessorType.SINK.getType(); - public SinkExecuteProcessor(List<URL> jarPaths, Config config) { + public SinkExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } @Override protected Map<String, SinkConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, SinkConfig> sinkConfigMap = Maps.newHashMap(); + if (operatorConfig.hasPath(Constants.SINKS)) { - Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.SINKS).root().unwrapped(); - operatorConfigMap.forEach((key,value) -> { + Config sinks = operatorConfig.getConfig(Constants.SINKS); + sinks.root().unwrapped().forEach((key,value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(sinks.getConfig(key), + SinkConfigOptions.TYPE.key(), SinkConfigOptions.PROPERTIES.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Sink: %s, Message: %s", + key, result.getMsg())); + } + SinkConfig sinkConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SinkConfig.class); - checkSinkConfig(sinkConfig); sinkConfig.setName(key); sinkConfigMap.put(key, sinkConfig); }); + } return sinkConfigMap; @@ -60,17 +78,10 @@ public class SinkExecuteProcessor extends AbstractExecuteProcessor<String, SinkC } dataStreamSink.name(sinkConfig.getName()); } catch (Exception e) { - throw new JobExecuteException("create sink operator error", e); + throw new JobExecuteException("Create sink instance failed!", e); } return null; } - private void checkSinkConfig(SinkConfig sinkConfig) { - - if (!StringUtil.isNotBlank(sinkConfig.getType())) { - throw new ConfigCheckException("校验未通过 sink " + sinkConfig.getName() + " type 为空!"); - } - } - } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index a4a596e..eee78a2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -3,48 +3,55 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.SourceConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.connector.source.SourceProvider; import com.geedgenetworks.core.factories.FactoryUtil; import com.geedgenetworks.core.factories.SourceTableFactory; import com.geedgenetworks.core.factories.TableFactory; -import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.SourceConfig; import com.geedgenetworks.core.types.StructType; import com.geedgenetworks.core.types.Types; -import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; -import com.typesafe.config.Config; +import com.typesafe.config.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; - import java.net.URL; -import java.time.Duration; import java.util.List; import java.util.Map; +/** + * Initialize config and execute source connector + */ @Slf4j -public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, SourceConfig> { +public class SourceExecutor extends AbstractExecutor<String, SourceConfig> { private static final String PROCESSOR_TYPE = ProcessorType.SOURCE.getType(); - public SourceExecuteProcessor(List<URL> jarPaths, Config config) { + public SourceExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } @Override protected Map<String, SourceConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, SourceConfig> sourceConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.SOURCES)) { - Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.SOURCES).root().unwrapped(); - operatorConfigMap.forEach((key,value) -> { + Config sources = operatorConfig.getConfig(Constants.SOURCES); + sources.root().unwrapped().forEach((key,value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(sources.getConfig(key), + SourceConfigOptions.TYPE.key(), SourceConfigOptions.PROPERTIES.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "Source: %s, Message: %s", + key, result.getMsg())); + } + SourceConfig sourceConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SourceConfig.class); - checkSourceConfig(sourceConfig); sourceConfig.setName(key); if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){ StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields())); @@ -52,6 +59,7 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou } sourceConfigMap.put(key, sourceConfig); }); + } return sourceConfigMap; @@ -79,13 +87,7 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou } return sourceSingleOutputStreamOperator.name(sourceConfig.getName()); } catch (Exception e) { - throw new JobExecuteException("create source operator error", e); - } - } - - private void checkSourceConfig(SourceConfig sourceConfig) { - if (!StringUtil.isNotBlank(sourceConfig.getType())) { - throw new ConfigCheckException("校验未通过 source " + sourceConfig.getName() + " type 为空!"); + throw new JobExecuteException("Create source instance failed!", e); } } } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java index db7cf43..8b299df 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java @@ -8,7 +8,7 @@ import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.GrootStreamConfig; import com.geedgenetworks.common.utils.ReflectionUtils; -import com.geedgenetworks.core.pojo.Event; +import com.geedgenetworks.common.Event; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -33,12 +33,12 @@ import java.util.stream.Stream; public class JobExecutionTest { protected final JobRuntimeEnvironment jobRuntimeEnvironment; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor; - private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor; + private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor; private final List<Node> nodes; @@ -66,12 +66,12 @@ public class JobExecutionTest { } registerPlugin(config.getConfig(Constants.APPLICATION)); - this.sourceExecuteProcessor = new SourceExecuteProcessor(jarPaths, config); - this.sinkExecuteProcessor = new SinkExecuteProcessor(jarPaths, config); - this.filterExecuteProcessor = new FilterExecuteProcessor(jarPaths, config); - this.preprocessingExecuteProcessor = new PreprocessingExecuteProcessor(jarPaths, config); - this.processingExecuteProcessor = new ProcessingExecuteProcessor(jarPaths, config); - this.postprocessingExecuteProcessor = new PostprocessingExecuteProcessor(jarPaths, config); + this.sourceExecuteProcessor = new SourceExecutor(jarPaths, config); + this.sinkExecuteProcessor = new SinkExecutor(jarPaths, config); + this.filterExecuteProcessor = new FilterExecutor(jarPaths, config); + this.preprocessingExecuteProcessor = new PreprocessingExecutor(jarPaths, config); + this.processingExecuteProcessor = new ProcessingExecutor(jarPaths, config); + this.postprocessingExecuteProcessor = new PostprocessingExecutor(jarPaths, config); this.jobRuntimeEnvironment = JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig); diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java index 0d25ed6..46980de 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java @@ -1,47 +1,29 @@ package com.geedgenetworks.bootstrap.main.simple; import cn.hutool.setting.yaml.YamlUtil; -import com.geedgenetworks.bootstrap.command.Command; -import com.geedgenetworks.bootstrap.command.CommandArgs; import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; import com.geedgenetworks.bootstrap.enums.EngineType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; -import com.geedgenetworks.bootstrap.execution.JobExecution; import com.geedgenetworks.bootstrap.utils.CommandLineUtils; import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.ConfigProvider; import com.geedgenetworks.common.config.GrootStreamConfig; -import com.geedgenetworks.common.utils.FileUtils; import com.geedgenetworks.core.connector.collect.CollectSink; -import com.geedgenetworks.core.pojo.Event; import com.typesafe.config.Config; import com.typesafe.config.ConfigObject; import com.typesafe.config.ConfigUtil; import com.typesafe.config.ConfigValueFactory; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; -import java.io.File; -import java.net.URL; import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map; -import static cn.hutool.core.util.ClassLoaderUtil.getClassLoader; -import static com.geedgenetworks.bootstrap.utils.ConfigFileUtils.checkConfigExist; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; |
