summary | refs | log | tree | commit | diff
path: root/groot-bootstrap
diff options
context:
space:
mode:
author: doufenghu <[email protected]> 2024-01-20 19:09:53 +0800
committer: doufenghu <[email protected]> 2024-01-20 19:09:53 +0800
commit: fed6e6a7a74800ae1d8c8c77d17a57ab13b26b35 (patch)
tree: f6149854f73901f5d1b9370cceba07b302bb50be /groot-bootstrap
parent: 2a8eaff6e057a3f8118f80adb217fcc900a714ae (diff)
[Improve]
[Bootstrap]
- 按功能定义 DataStream 上的各个元素,包括 Source/Sink connector,Filter Operator,Projection Processor
- 优化任务配置校验,构建拓扑时对必需及至少任选一个参数进行校验
[Common]
- 将 Event,UDF,UDFContext 移至公共模块,便于引用继承
[Core]
- ProjectionProcessFunction 不配置函数也可运行,用于定义下游事件支持部分字段筛选场景
Diffstat (limited to 'groot-bootstrap')
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java | 1
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecuteProcessor.java) | 8
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecuteProcessor.java) | 2
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecuteProcessor.java) | 37
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java | 52
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecuteProcessor.java) | 45
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecuteProcessor.java) | 49
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecuteProcessor.java) | 48
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecuteProcessor.java) | 37
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java) | 44
-rw-r--r-- groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java | 26
-rw-r--r-- groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java | 18
12 files changed, 214 insertions, 153 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java
index 477e703..9d948a3 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.bootstrap.exception;
+@Deprecated
public class ConfigCheckException extends RuntimeException {
public ConfigCheckException(String message) {
super(message);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 3f425b1..077fee3 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecuteProcessor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -10,15 +10,15 @@ import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
-public abstract class AbstractExecuteProcessor<K, V>
- implements ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> {
+public abstract class AbstractExecutor<K, V>
+ implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> {
protected JobRuntimeEnvironment jobRuntimeEnvironment;
protected final Config operatorConfig;
protected final Map<K,V> operatorMap;
- protected AbstractExecuteProcessor(List<URL> jarPaths, Config operatorConfig) {
+ protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
this.operatorConfig = operatorConfig;
this.operatorMap = initialize(jarPaths, operatorConfig);
}
@@ -29,7 +29,7 @@ public abstract class AbstractExecuteProcessor<K, V>
}
- protected abstract Map<K, V> initialize(List<URL> jarPaths, Config operatorConfig);
+ protected abstract Map<K, V> initialize(List<URL> jarPaths, Config operatorConfig);
protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
(classLoader, url) -> {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
index ba374ae..d57d6bf 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecuteProcessor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-public interface ExecuteProcessor<T, ENV extends RuntimeEnvironment> {
+public interface Executor<T, ENV extends RuntimeEnvironment> {
T execute(T dataStream, Node edge) throws JobExecuteException;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 785cb8a..0897186 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecuteProcessor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -2,12 +2,15 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.FilterConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.filter.AviatorFilter;
import com.geedgenetworks.core.pojo.FilterConfig;
-import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -17,22 +20,29 @@ import java.net.URL;
import java.util.List;
import java.util.Map;
+/**
+ * Initialize config and execute filter operator
+ */
@Slf4j
-public class FilterExecuteProcessor extends AbstractExecuteProcessor<String, FilterConfig> {
-
+public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType();
-
- public FilterExecuteProcessor(List<URL> jarPaths, Config config) {
+ public FilterExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
@Override
protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, FilterConfig> filterConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.FILTERS)) {
- Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.FILTERS).root().unwrapped();
- operatorConfigMap.forEach((key,value) -> {
+ Config filters = operatorConfig.getConfig(Constants.FILTERS);
+ filters.root().unwrapped().forEach((key,value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(filters.getConfig(key),
+ FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Filter: %s, Message: %s",
+ key, result.getMsg()));
+ }
FilterConfig filterConfig = new JSONObject((Map<String, Object>) value).toJavaObject(FilterConfig.class);
- checkFilterConfig(filterConfig);
filterConfig.setName(key);
filterConfigMap.put(key, filterConfig);
});
@@ -58,16 +68,9 @@ public class FilterExecuteProcessor extends AbstractExecuteProcessor<String, Fil
aviatorFilter.filterFunction(
dataStream, filterConfig);
} catch (Exception e) {
- throw new JobExecuteException("create filter operator error", e);
+ throw new JobExecuteException("Create filter instance failed!", e);
}
return dataStream;
}
-
- private void checkFilterConfig(FilterConfig filterConfig) {
-
- if (!StringUtil.isNotBlank(filterConfig.getType())) {
- throw new ConfigCheckException("校验未通过 filter " + filterConfig.getName() + " type 为空!");
- }
- }
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index b90b9db..2eabefa 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -6,7 +6,7 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.GrootStreamConfig;
-import com.geedgenetworks.core.pojo.Event;
+import com.geedgenetworks.common.Event;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -27,12 +27,12 @@ import java.util.stream.Stream;
public class JobExecution {
private final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecutor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecutor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecutor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecutor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecutor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecutor;
private final List<Node> nodes;
private final List<URL> jarPaths;
@@ -47,21 +47,21 @@ public class JobExecution {
registerPlugin(config.getConfig(Constants.APPLICATION));
- this.sourceExecuteProcessor = new SourceExecuteProcessor(jarPaths, config);
- this.sinkExecuteProcessor = new SinkExecuteProcessor(jarPaths, config);
- this.filterExecuteProcessor = new FilterExecuteProcessor(jarPaths, config);
- this.preprocessingExecuteProcessor = new PreprocessingExecuteProcessor(jarPaths, config);
- this.processingExecuteProcessor = new ProcessingExecuteProcessor(jarPaths, config);
- this.postprocessingExecuteProcessor = new PostprocessingExecuteProcessor(jarPaths, config);
+ this.sourceExecutor = new SourceExecutor(jarPaths, config);
+ this.sinkExecutor = new SinkExecutor(jarPaths, config);
+ this.filterExecutor = new FilterExecutor(jarPaths, config);
+ this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
+ this.processingExecutor = new ProcessingExecutor(jarPaths, config);
+ this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
this.jobRuntimeEnvironment =
JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
- this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.nodes = buildJobNode(config);
}
@@ -86,7 +86,7 @@ public class JobExecution {
}
}).collect(Collectors.toList());
jarDependencies.forEach(url -> {
- AbstractExecuteProcessor.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url);
+ AbstractExecutor.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url);
});
jarPaths.addAll(jarDependencies);
@@ -217,7 +217,7 @@ public class JobExecution {
SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
for(Node sourceNode : sourceNodes) {
- singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode);
+ singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
for (String nodeName : sourceNode.getDownstream()) {
buildJobGraph(singleOutputStreamOperator, nodeName);
}
@@ -243,15 +243,15 @@ public class JobExecution {
throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node);
+ singleOutputStreamOperator = filterExecutor.execute(singleOutputStreamOperator, node);
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ singleOutputStreamOperator = preprocessingExecutor.execute(singleOutputStreamOperator, node);
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ singleOutputStreamOperator = processingExecutor.execute(singleOutputStreamOperator, node);
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ singleOutputStreamOperator = postprocessingExecutor.execute(singleOutputStreamOperator, node);
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node);
+ singleOutputStreamOperator = sinkExecutor.execute(singleOutputStreamOperator, node);
} else {
throw new JobExecuteException("unsupported process type " + node.getType().name());
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index 3aca69c..4e45b78 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecuteProcessor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -5,6 +5,11 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.ProjectionConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.geedgenetworks.utils.StringUtil;
@@ -16,23 +21,46 @@ import java.net.URL;
import java.util.List;
import java.util.Map;
-public class PostprocessingExecuteProcessor extends AbstractExecuteProcessor<String, ProjectionConfig> {
+/**
+ * Initialize config and execute postprocessor
+ */
+public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
private static final String PROCESSOR_TYPE = ProcessorType.POSTPROCESSING.getType();
- public PostprocessingExecuteProcessor(List<URL> jarPaths, Config config) {
+ public PostprocessingExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
@Override
protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, ProjectionConfig> postprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
- Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES).root().unwrapped();
- operatorConfigMap.forEach((key,value) -> {
+ Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES);
+ postprocessors.root().unwrapped().forEach((key,value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key),
+ ProjectionConfigOptions.TYPE.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Postprocessor: %s, Message: %s",
+ key, result.getMsg()));
+ }
+ result = CheckConfigUtil.checkAtLeastOneExists(postprocessors.getConfig(key),
+ ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+ ProjectionConfigOptions.REMOVE_FIELDS.key(),
+ ProjectionConfigOptions.FUNCTIONS.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Postprocessor: %s, At least one of [%s] should be specified.",
+ key, String.join(",",
+ ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+ ProjectionConfigOptions.REMOVE_FIELDS.key(),
+ ProjectionConfigOptions.FUNCTIONS.key())));
+ }
+
ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class);
- checkProjectionConfig(projectionConfig);
projectionConfig.setName(key);
postprocessingConfigMap.put(key, projectionConfig);
});
+
}
return postprocessingConfigMap;
@@ -54,14 +82,9 @@ public class PostprocessingExecuteProcessor extends AbstractExecuteProcessor<Str
projectionProcessor.projectionProcessorFunction(
dataStream, projectionConfig);
} catch (Exception e) {
- throw new JobExecuteException("create postprocessing operator error", e);
+ throw new JobExecuteException("create postprocessing pipeline operator error", e);
}
return dataStream;
}
- private static void checkProjectionConfig(ProjectionConfig projectionConfig) {
- if (!StringUtil.isNotBlank(projectionConfig.getType())) {
- throw new ConfigCheckException("校验未通过 Processing Pipelines " + projectionConfig.getName() + " type 为空!");
- }
- }
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index 637e33b..84e8718 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecuteProcessor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -2,12 +2,15 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.ProjectionConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
-import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -17,21 +20,43 @@ import java.net.URL;
import java.util.List;
import java.util.Map;
+/**
+ * Initialize config and execute preprocessor
+ */
@Slf4j
-public class PreprocessingExecuteProcessor extends AbstractExecuteProcessor<String, ProjectionConfig> {
+public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType();
-
- public PreprocessingExecuteProcessor(List<URL> jarPaths, Config config) {
+ public PreprocessingExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
@Override
protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) {
- Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
- operatorConfigMap.forEach((key,value) -> {
+ Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES);
+ preprocessors.root().unwrapped().forEach((key,value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key),
+ ProjectionConfigOptions.TYPE.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Preprocessor: %s, Message: %s",
+ key, result.getMsg()));
+ }
+
+ result = CheckConfigUtil.checkAtLeastOneExists(preprocessors.getConfig(key),
+ ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+ ProjectionConfigOptions.REMOVE_FIELDS.key(),
+ ProjectionConfigOptions.FUNCTIONS.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Preprocessor: %s, At least one of [%s] should be specified.",
+ key, String.join(",",
+ ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+ ProjectionConfigOptions.REMOVE_FIELDS.key(),
+ ProjectionConfigOptions.FUNCTIONS.key())));
+ }
+
ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class);
- checkProjectionConfig(projectionConfig);
projectionConfig.setName(key);
preprocessingConfigMap.put(key, projectionConfig);
});
@@ -56,14 +81,8 @@ public class PreprocessingExecuteProcessor extends AbstractExecuteProcessor<Stri
projectionProcessor.projectionProcessorFunction(
dataStream, projectionConfig);
} catch (Exception e) {
- throw new JobExecuteException("create preprocessing operator error", e);
+ throw new JobExecuteException("Create preprocessor pipeline instance failed!", e);
}
return dataStream;
}
- private static void checkProjectionConfig(ProjectionConfig projectionConfig) {
-
- if (!StringUtil.isNotBlank(projectionConfig.getType())) {
- throw new ConfigCheckException("校验未通过 projection " + projectionConfig.getName() + " type 为空!");
- }
- }
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index e57448b..a8798e8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecuteProcessor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -2,12 +2,15 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.ProjectionConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
-import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -15,24 +18,46 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
import java.util.List;
import java.util.Map;
-
-public class ProcessingExecuteProcessor extends AbstractExecuteProcessor<String, ProjectionConfig> {
+/**
+ * Initialize config and execute processor
+ */
+public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
private static final String PROCESSOR_TYPE = ProcessorType.PROCESSING.getType();
- public ProcessingExecuteProcessor(List<URL> jarPaths, Config config) {
+ public ProcessingExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
@Override
protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) {
- Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES).root().unwrapped();
- operatorConfigMap.forEach((key,value) -> {
+ Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
+ processors.root().unwrapped().forEach((key,value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key),
+ ProjectionConfigOptions.TYPE.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Processor: %s, Message: %s",
+ key, result.getMsg()));
+ }
+ result = CheckConfigUtil.checkAtLeastOneExists(processors.getConfig(key),
+ ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+ ProjectionConfigOptions.REMOVE_FIELDS.key(),
+ ProjectionConfigOptions.FUNCTIONS.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Processor: %s, At least one of [%s] should be specified.",
+ key, String.join(",",
+ ProjectionConfigOptions.OUTPUT_FIELDS.key(),
+ ProjectionConfigOptions.REMOVE_FIELDS.key(),
+ ProjectionConfigOptions.FUNCTIONS.key())));
+ }
+
ProjectionConfig projectionConfig = new JSONObject((Map<String, Object>) value).toJavaObject(ProjectionConfig.class);
- checkProjectionConfig(projectionConfig);
projectionConfig.setName(key);
processingConfigMap.put(key, projectionConfig);
});
+
}
return processingConfigMap;
@@ -55,14 +80,9 @@ public class ProcessingExecuteProcessor extends AbstractExecuteProcessor<String,
projectionProcessor.projectionProcessorFunction(
dataStream, projectionConfig);
} catch (Exception e) {
- throw new JobExecuteException("create processing operator error", e);
+ throw new JobExecuteException("Create processing pipeline instance failed!", e);
}
return dataStream;
}
- private static void checkProjectionConfig(ProjectionConfig projectionConfig) {
- if (!StringUtil.isNotBlank(projectionConfig.getType())) {
- throw new ConfigCheckException("校验未通过 Processing Pipelines " + projectionConfig.getName() + " type 为空!");
- }
- }
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index 800c59d..7aee40f 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecuteProcessor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -5,6 +5,12 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.SinkConfigOptions;
+import com.geedgenetworks.common.config.SourceConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.connector.sink.SinkProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.SinkTableFactory;
@@ -22,24 +28,36 @@ import java.net.URL;
import java.util.List;
import java.util.Map;
+/**
+ * Initialize config and execute sink connector
+ */
@Slf4j
-public class SinkExecuteProcessor extends AbstractExecuteProcessor<String, SinkConfig>{
+public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
private static final String PROCESSOR_TYPE = ProcessorType.SINK.getType();
- public SinkExecuteProcessor(List<URL> jarPaths, Config config) {
+ public SinkExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
@Override
protected Map<String, SinkConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, SinkConfig> sinkConfigMap = Maps.newHashMap();
+
if (operatorConfig.hasPath(Constants.SINKS)) {
- Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.SINKS).root().unwrapped();
- operatorConfigMap.forEach((key,value) -> {
+ Config sinks = operatorConfig.getConfig(Constants.SINKS);
+ sinks.root().unwrapped().forEach((key,value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(sinks.getConfig(key),
+ SinkConfigOptions.TYPE.key(), SinkConfigOptions.PROPERTIES.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Sink: %s, Message: %s",
+ key, result.getMsg()));
+ }
+
SinkConfig sinkConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SinkConfig.class);
- checkSinkConfig(sinkConfig);
sinkConfig.setName(key);
sinkConfigMap.put(key, sinkConfig);
});
+
}
return sinkConfigMap;
@@ -60,17 +78,10 @@ public class SinkExecuteProcessor extends AbstractExecuteProcessor<String, SinkC
}
dataStreamSink.name(sinkConfig.getName());
} catch (Exception e) {
- throw new JobExecuteException("create sink operator error", e);
+ throw new JobExecuteException("Create sink instance failed!", e);
}
return null;
}
- private void checkSinkConfig(SinkConfig sinkConfig) {
-
- if (!StringUtil.isNotBlank(sinkConfig.getType())) {
- throw new ConfigCheckException("校验未通过 sink " + sinkConfig.getName() + " type 为空!");
- }
- }
-
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index a4a596e..eee78a2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecuteProcessor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -3,48 +3,55 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.SourceConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.connector.source.SourceProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.SourceTableFactory;
import com.geedgenetworks.core.factories.TableFactory;
-import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.SourceConfig;
import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.core.types.Types;
-import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
+import com.typesafe.config.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
import java.net.URL;
-import java.time.Duration;
import java.util.List;
import java.util.Map;
+/**
+ * Initialize config and execute source connector
+ */
@Slf4j
-public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, SourceConfig> {
+public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
private static final String PROCESSOR_TYPE = ProcessorType.SOURCE.getType();
- public SourceExecuteProcessor(List<URL> jarPaths, Config config) {
+ public SourceExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
@Override
protected Map<String, SourceConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, SourceConfig> sourceConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.SOURCES)) {
- Map<String, Object> operatorConfigMap = operatorConfig.getConfig(Constants.SOURCES).root().unwrapped();
- operatorConfigMap.forEach((key,value) -> {
+ Config sources = operatorConfig.getConfig(Constants.SOURCES);
+ sources.root().unwrapped().forEach((key,value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(sources.getConfig(key),
+ SourceConfigOptions.TYPE.key(), SourceConfigOptions.PROPERTIES.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "Source: %s, Message: %s",
+ key, result.getMsg()));
+ }
+
SourceConfig sourceConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SourceConfig.class);
- checkSourceConfig(sourceConfig);
sourceConfig.setName(key);
if(CollectionUtils.isNotEmpty(sourceConfig.getFields())){
StructType schema = Types.parseSchemaFromJson(JSON.toJSONString(sourceConfig.getFields()));
@@ -52,6 +59,7 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou
}
sourceConfigMap.put(key, sourceConfig);
});
+
}
return sourceConfigMap;
@@ -79,13 +87,7 @@ public class SourceExecuteProcessor extends AbstractExecuteProcessor<String, Sou
}
return sourceSingleOutputStreamOperator.name(sourceConfig.getName());
} catch (Exception e) {
- throw new JobExecuteException("create source operator error", e);
- }
- }
-
- private void checkSourceConfig(SourceConfig sourceConfig) {
- if (!StringUtil.isNotBlank(sourceConfig.getType())) {
- throw new ConfigCheckException("校验未通过 source " + sourceConfig.getName() + " type 为空!");
+ throw new JobExecuteException("Create source instance failed!", e);
}
}
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
index db7cf43..8b299df 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
@@ -8,7 +8,7 @@ import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.GrootStreamConfig;
import com.geedgenetworks.common.utils.ReflectionUtils;
-import com.geedgenetworks.core.pojo.Event;
+import com.geedgenetworks.common.Event;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -33,12 +33,12 @@ import java.util.stream.Stream;
public class JobExecutionTest {
protected final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor;
- private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor;
+ private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor;
private final List<Node> nodes;
@@ -66,12 +66,12 @@ public class JobExecutionTest {
}
registerPlugin(config.getConfig(Constants.APPLICATION));
- this.sourceExecuteProcessor = new SourceExecuteProcessor(jarPaths, config);
- this.sinkExecuteProcessor = new SinkExecuteProcessor(jarPaths, config);
- this.filterExecuteProcessor = new FilterExecuteProcessor(jarPaths, config);
- this.preprocessingExecuteProcessor = new PreprocessingExecuteProcessor(jarPaths, config);
- this.processingExecuteProcessor = new ProcessingExecuteProcessor(jarPaths, config);
- this.postprocessingExecuteProcessor = new PostprocessingExecuteProcessor(jarPaths, config);
+ this.sourceExecuteProcessor = new SourceExecutor(jarPaths, config);
+ this.sinkExecuteProcessor = new SinkExecutor(jarPaths, config);
+ this.filterExecuteProcessor = new FilterExecutor(jarPaths, config);
+ this.preprocessingExecuteProcessor = new PreprocessingExecutor(jarPaths, config);
+ this.processingExecuteProcessor = new ProcessingExecutor(jarPaths, config);
+ this.postprocessingExecuteProcessor = new PostprocessingExecutor(jarPaths, config);
this.jobRuntimeEnvironment =
JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index 0d25ed6..46980de 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -1,47 +1,29 @@
package com.geedgenetworks.bootstrap.main.simple;
import cn.hutool.setting.yaml.YamlUtil;
-import com.geedgenetworks.bootstrap.command.Command;
-import com.geedgenetworks.bootstrap.command.CommandArgs;
import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
import com.geedgenetworks.bootstrap.enums.EngineType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
-import com.geedgenetworks.bootstrap.execution.JobExecution;
import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.ConfigProvider;
import com.geedgenetworks.common.config.GrootStreamConfig;
-import com.geedgenetworks.common.utils.FileUtils;
import com.geedgenetworks.core.connector.collect.CollectSink;
-import com.geedgenetworks.core.pojo.Event;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigObject;
import com.typesafe.config.ConfigUtil;
import com.typesafe.config.ConfigValueFactory;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
-import java.io.File;
-import java.net.URL;
import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
-import static cn.hutool.core.util.ClassLoaderUtil.getClassLoader;
-import static com.geedgenetworks.bootstrap.utils.ConfigFileUtils.checkConfigExist;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;