author     wangkuan <[email protected]>    2024-08-23 17:58:22 +0800
committer  wangkuan <[email protected]>    2024-08-23 17:58:22 +0800
commit     215dd9aa1e4ec6a509d64c78ec414a8196dace3c (patch)
tree       5c038000b9de72684a1f8b8f1832394aed30cf13 /groot-bootstrap
parent     07332297c1306aa0dac649c7d15bf131e8edbc7e (diff)
[feature][core][common] GAL-646 Groot Stream: support Split Operator for dynamic stream splitting
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java                 1
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java         18
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java 16
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java            4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java            103
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java    15
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java    4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java     5
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java        4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java              4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java            4
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java            95
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java       98
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java          107
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java           6
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java     4
-rw-r--r--  groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml                             92
-rw-r--r--  groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml                                 97
18 files changed, 584 insertions(+), 93 deletions(-)
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
index d2b37f6..6f33cae 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.bootstrap.enums;
public enum ProcessorType {
SOURCE("source"),
FILTER("filter"),
+ SPLIT("split"),
PREPROCESSING("preprocessing"),
PROCESSING("processing"),
POSTPROCESSING("postprocessing"),
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 64c66b6..7a55ffe 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -1,26 +1,26 @@
package com.geedgenetworks.bootstrap.execution;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.utils.ReflectionUtils;
import com.geedgenetworks.core.filter.Filter;
import com.geedgenetworks.core.processor.Processor;
-import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.split.Split;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
+import java.util.*;
import java.util.function.BiConsumer;
public abstract class AbstractExecutor<K, V>
- implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> {
+ implements Executor<DataStream<Event>, JobRuntimeEnvironment> {
protected JobRuntimeEnvironment jobRuntimeEnvironment;
protected final Config operatorConfig;
protected final Map<K,V> operatorMap;
protected final Map<String,Filter> filterMap = new HashMap<>();
+ protected final Map<String, Split> splitMap = new HashMap<>();
protected final Map<String, Processor> processorMap = new HashMap<>();
protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
@@ -30,6 +30,10 @@ public abstract class AbstractExecutor<K, V>
for (Filter filter : filters) {
this.filterMap.put(filter.type(), filter);
}
+ ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
+ for (Split split : splits) {
+ this.splitMap.put(split.type(), split);
+ }
ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
for (Processor processor : processors) {
this.processorMap.put(processor.type(), processor);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index 66c0b0f..b0a04cd 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
@@ -13,6 +14,7 @@ import com.geedgenetworks.core.processor.table.TableProcessor;
import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -27,7 +29,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
ProcessorConfig processorConfig = operatorMap.get(node.getName());
switch (processorConfig.getType()) {
@@ -45,7 +47,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
return dataStream;
}
- protected SingleOutputStreamOperator executeTableProcessor(SingleOutputStreamOperator dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
+ protected DataStream<Event> executeTableProcessor(DataStream<Event> dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
TableProcessor tableProcessor;
if (processorMap.containsKey(tableConfig.getType())) {
@@ -63,15 +65,15 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
tableConfig.setParallelism(node.getParallelism());
}
try {
- dataStream =
- tableProcessor.processorFunction(
- dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+
+ dataStream = tableProcessor.processorFunction(
+ dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
} catch (Exception e) {
throw new JobExecuteException("Create orderby pipeline instance failed!", e);
}
return dataStream;
}
- protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
+ protected DataStream<Event> executeAggregateProcessor(DataStream<Event> dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
AggregateProcessor aggregateProcessor;
if (processorMap.containsKey(aggregateConfig.getType())) {
@@ -98,7 +100,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
return dataStream;
}
- protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
+ protected DataStream<Event> executeProjectionProcessor(DataStream<Event> dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
ProjectionProcessor projectionProcessor;
if (processorMap.containsKey(projectionConfig.getType())) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 506aa11..66f0585 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.FilterConfigOptions;
@@ -14,6 +15,7 @@ import com.geedgenetworks.core.pojo.FilterConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -54,7 +56,7 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
FilterConfig filterConfig = operatorMap.get(node.getName());
String className = filterConfig.getType();
Filter filter;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 2eabefa..22b23a7 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -5,15 +5,17 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.GrootStreamConfig;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.GrootStreamConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigUtil;
import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
import java.io.File;
import java.net.MalformedURLException;
@@ -27,17 +29,18 @@ import java.util.stream.Stream;
public class JobExecution {
private final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
private final List<Node> nodes;
-
private final List<URL> jarPaths;
+
public JobExecution(Config config, GrootStreamConfig grootStreamConfig) {
- try {
+ try {
jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
.resolve(GrootStreamRunner.APP_JAR_NAME).toString())
.toURI().toURL()));
@@ -50,6 +53,7 @@ public class JobExecution {
this.sourceExecutor = new SourceExecutor(jarPaths, config);
this.sinkExecutor = new SinkExecutor(jarPaths, config);
this.filterExecutor = new FilterExecutor(jarPaths, config);
+ this.splitExecutor = new SplitExecutor(jarPaths, config);
this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
this.processingExecutor = new ProcessingExecutor(jarPaths, config);
this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
@@ -59,6 +63,7 @@ public class JobExecution {
this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
@@ -69,7 +74,7 @@ public class JobExecution {
private void registerPlugin(Config appConfig) {
List<Path> thirdPartyJars = new ArrayList<>();
Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV);
- if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
+ if (envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
thirdPartyJars = new ArrayList<>(StartBuilder
.getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS)));
}
@@ -81,7 +86,7 @@ public class JobExecution {
.map(uri -> {
try {
return uri.toURL();
- }catch (MalformedURLException e){
+ } catch (MalformedURLException e) {
throw new RuntimeException("the uri of jar illegal: " + uri, e);
}
}).collect(Collectors.toList());
@@ -93,10 +98,10 @@ public class JobExecution {
}
- private Config registerPlugin(Config config , List<URL> jars) {
- config = this.injectJarsToConfig(
- config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
- return this.injectJarsToConfig(
+ private Config registerPlugin(Config config, List<URL> jars) {
+ config = this.injectJarsToConfig(
+ config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
+ return this.injectJarsToConfig(
config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars);
}
@@ -127,7 +132,7 @@ public class JobExecution {
.collect(Collectors.toSet());
paths.addAll(validJars);
- config = config.withValue(
+ config = config.withValue(
path,
ConfigValueFactory.fromAnyRef(
paths.stream()
@@ -150,8 +155,9 @@ public class JobExecution {
private List<Node> buildJobNode(Config config) {
Map<String, Object> sources = Maps.newHashMap();
- Map<String, Object> sinks =Maps.newHashMap();
+ Map<String, Object> sinks = Maps.newHashMap();
Map<String, Object> filters = Maps.newHashMap();
+ Map<String, Object> splits = Maps.newHashMap();
Map<String, Object> preprocessingPipelines = Maps.newHashMap();
Map<String, Object> processingPipelines = Maps.newHashMap();
Map<String, Object> postprocessingPipelines = Maps.newHashMap();
@@ -160,11 +166,14 @@ public class JobExecution {
sources = config.getConfig(Constants.SOURCES).root().unwrapped();
}
if (config.hasPath(Constants.SINKS)) {
- sinks =config.getConfig(Constants.SINKS).root().unwrapped();
+ sinks = config.getConfig(Constants.SINKS).root().unwrapped();
}
if (config.hasPath(Constants.FILTERS)) {
filters = config.getConfig(Constants.FILTERS).root().unwrapped();
}
+ if (config.hasPath(Constants.SPLITS)) {
+ splits = config.getConfig(Constants.SPLITS).root().unwrapped();
+ }
if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
}
@@ -184,13 +193,15 @@ public class JobExecution {
nodes.add(node);
});
- for(Node node : nodes) {
+ for (Node node : nodes) {
if (sources.containsKey(node.getName())) {
node.setType(ProcessorType.SOURCE);
} else if (sinks.containsKey(node.getName())) {
node.setType(ProcessorType.SINK);
} else if (filters.containsKey(node.getName())) {
node.setType(ProcessorType.FILTER);
+ } else if (splits.containsKey(node.getName())) {
+ node.setType(ProcessorType.SPLIT);
} else if (preprocessingPipelines.containsKey(node.getName())) {
node.setType(ProcessorType.PREPROCESSING);
} else if (processingPipelines.containsKey(node.getName())) {
@@ -214,12 +225,12 @@ public class JobExecution {
List<Node> sourceNodes = nodes
.stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
- SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+ DataStream<Event> dataStream = null;
- for(Node sourceNode : sourceNodes) {
- singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
+ for (Node sourceNode : sourceNodes) {
+ dataStream = sourceExecutor.execute(dataStream, sourceNode);
for (String nodeName : sourceNode.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
+ buildJobGraph(dataStream, nodeName);
}
}
@@ -230,7 +241,7 @@ public class JobExecution {
log.info("Execution Job: {}", jobRuntimeEnvironment.getJobName());
jobRuntimeEnvironment.getStreamExecutionEnvironment().execute(jobRuntimeEnvironment.getJobName());
final long jobEndTime = System.currentTimeMillis();
- log.info("Job finished, execution duration: {} ms", jobEndTime-jobStartTime);
+ log.info("Job finished, execution duration: {} ms", jobEndTime - jobStartTime);
} catch (Exception e) {
throw new JobExecuteException("Execute job error", e);
@@ -238,34 +249,62 @@ public class JobExecution {
}
- private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+ private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
Node node = getNode(downstreamNodeName).orElseGet(() -> {
throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- singleOutputStreamOperator = filterExecutor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = filterExecutor.execute(dataStream, node);
+ }
+ } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+ dataStream = splitExecutor.execute(dataStream, node);
+
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- singleOutputStreamOperator = preprocessingExecutor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = preprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- singleOutputStreamOperator = processingExecutor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = processingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- singleOutputStreamOperator = postprocessingExecutor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = postprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- singleOutputStreamOperator = sinkExecutor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = sinkExecutor.execute(dataStream, node);
+ }
} else {
throw new JobExecuteException("unsupported process type " + node.getType().name());
}
for (String nodeName : node.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
+ buildJobGraph(dataStream, nodeName);
}
}
private Optional<Node> getNode(String name) {
- return nodes.stream().filter(v-> v.getName().equals(name)).findFirst();
+ return nodes.stream().filter(v -> v.getName().equals(name)).findFirst();
}
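Worth calling out in the buildJobGraph changes above: the side-output tags are created as anonymous subclasses, new OutputTag<Event>(node.getName()) {}. That is deliberate Flink idiom rather than a typo; the subclass is what lets Flink recover the element type after erasure. A small reference sketch, with String standing in for Event:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.OutputTag;

    class OutputTagSpelling {
        // Anonymous subclass: the generic parameter survives erasure, so Flink
        // can extract the element type from the class hierarchy.
        static OutputTag<String> reified(String name) {
            return new OutputTag<String>(name) {};
        }

        // Equivalent without subclassing: supply the type information explicitly.
        static OutputTag<String> explicit(String name) {
            return new OutputTag<>(name, TypeInformation.of(String.class));
        }

        // A bare `new OutputTag<String>(name)` (no braces) throws an
        // InvalidTypesException at construction because the type is erased.
    }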
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 7141f5e..2e68098 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -25,9 +25,7 @@ import org.apache.flink.util.TernaryBoolean;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@@ -37,6 +35,8 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
private GrootStreamConfig grootStreamConfig;
private StreamExecutionEnvironment environment;
private String jobName = Constants.DEFAULT_JOB_NAME;
+ private Set<String> splitSet = new HashSet<>();
+
private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) {
this.grootStreamConfig = grootStreamConfig;
@@ -197,6 +197,14 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
}
+ public Set<String> getSplitSet() {
+ return splitSet;
+ }
+
+ public void setSplitSet(Set<String> splitSet) {
+ this.splitSet = splitSet;
+ }
+
private void setCheckpoint() {
long interval = 0;
if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) {
@@ -272,6 +280,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
+
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index 36fad61..b9555b4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -36,7 +38,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor {
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
return super.execute(dataStream, node);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index b1e53e4..a7b9e5e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -3,10 +3,12 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -37,7 +39,8 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+
return super.execute(dataStream, node);
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index d69fe8c..f6788ed 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -35,7 +37,7 @@ public class ProcessingExecutor extends AbstractProcessorExecutor {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
return super.execute(dataStream, node);
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index a0bedc9..70934b8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SinkConfigOptions;
@@ -20,6 +21,7 @@ import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -63,7 +65,7 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
SinkConfig sinkConfig = operatorMap.get(node.getName());
try {
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index d4751af..9dff6b0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -67,7 +68,7 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator outputStreamOperator, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> outputStreamOperator, Node node) throws JobExecuteException {
SourceConfig sourceConfig = operatorMap.get(node.getName());
SingleOutputStreamOperator sourceSingleOutputStreamOperator;
try {
@@ -152,4 +153,5 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
return 0;
}
}
+
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
new file mode 100644
index 0000000..c0ac3a5
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -0,0 +1,95 @@
+package com.geedgenetworks.bootstrap.execution;
+
+import com.alibaba.fastjson.JSONObject;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.RouteConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import com.geedgenetworks.core.split.Split;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.net.URL;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Initialize config and execute split operator
+ */
+@Slf4j
+public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
+
+
+ public SplitExecutor(List<URL> jarPaths, Config config) {
+ super(jarPaths, config);
+ }
+
+ @Override
+ protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ Map<String, SplitConfig> routeConfigMap = Maps.newHashMap();
+ if (operatorConfig.hasPath(Constants.SPLITS)) {
+ Config routes = operatorConfig.getConfig(Constants.SPLITS);
+ routes.root().unwrapped().forEach((key, value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key),
+ RouteConfigOptions.TYPE.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "split: %s, Message: %s",
+ key, result.getMsg()));
+ }
+ SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+ splitConfig.setName(key);
+ routeConfigMap.put(key, splitConfig);
+ });
+ }
+
+ return routeConfigMap;
+ }
+
+ @Override
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+ SplitConfig splitConfig = operatorMap.get(node.getName());
+ String className = splitConfig.getType();
+ Split split;
+ if (splitMap.containsKey(splitConfig.getType())) {
+
+ split = splitMap.get(splitConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(className);
+ split = (Split) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get split instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ splitConfig.setParallelism(node.getParallelism());
+ }
+ for (RuleContext ruleContext : splitConfig.getRules()) {
+ jobRuntimeEnvironment.getSplitSet().add(ruleContext.getName());
+ }
+ try {
+ dataStream =
+ split.splitFunction(
+ dataStream, splitConfig);
+ } catch (Exception e) {
+ throw new JobExecuteException("Create split instance failed!", e);
+ }
+ return dataStream;
+ }
+
+}
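The Split implementation invoked through split.splitFunction(dataStream, splitConfig) lives outside groot-bootstrap and is not part of this diff; the commit only shows the contract: SplitExecutor records each rule name in splitSet, and buildJobGraph later attaches downstream nodes via getSideOutput(new OutputTag<Event>(ruleName) {}). A minimal sketch of a splitFunction honoring that contract follows. It is an illustration under stated assumptions, not the project's code: Event, EventPredicate, and the rule map are simplified stand-ins, and rule evaluation is a plain predicate rather than the compiled expression strings (e.g. event.decoded_as == 'DNS') seen in the test YAMLs.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    import java.io.Serializable;
    import java.util.Map;
    import java.util.function.Predicate;

    public final class RuleBasedSplitSketch {

        /** Simplified stand-in for com.geedgenetworks.common.Event. */
        public static class Event implements Serializable {
            public Map<String, Object> fields;
        }

        /** Serializable predicate so Flink can ship the rules to task managers. */
        public interface EventPredicate extends Predicate<Event>, Serializable {}

        /**
         * Routes each event to the side output named after the first matching rule;
         * unmatched events stay on the main stream. Pass a LinkedHashMap to keep
         * rule order. Downstream wiring can then read
         * getSideOutput(new OutputTag<Event>(ruleName) {}), as buildJobGraph does.
         */
        public static SingleOutputStreamOperator<Event> splitByRules(
                DataStream<Event> in, Map<String, EventPredicate> rules) {
            return in.process(new ProcessFunction<Event, Event>() {
                @Override
                public void processElement(Event event, Context ctx, Collector<Event> out) {
                    for (Map.Entry<String, EventPredicate> rule : rules.entrySet()) {
                        if (rule.getValue().test(event)) {
                            // The anonymous subclass reifies the generic type, matching
                            // the `new OutputTag<Event>(name) {}` pattern in buildJobGraph.
                            ctx.output(new OutputTag<Event>(rule.getKey()) {}, event);
                            return; // first matching rule wins (an assumption)
                        }
                    }
                    out.collect(event); // no rule matched: stays on the main output
                }
            });
        }
    }

The per-element OutputTag construction above works because tags compare by id, but a production operator would likely pre-build one tag per rule.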
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
index 8b299df..7e995cf 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
@@ -16,7 +16,9 @@ import com.typesafe.config.ConfigUtil;
import com.typesafe.config.ConfigValueFactory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
import java.io.File;
import java.net.MalformedURLException;
@@ -33,12 +35,13 @@ import java.util.stream.Stream;
public class JobExecutionTest {
protected final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
private final List<Node> nodes;
@@ -66,21 +69,22 @@ public class JobExecutionTest {
}
registerPlugin(config.getConfig(Constants.APPLICATION));
- this.sourceExecuteProcessor = new SourceExecutor(jarPaths, config);
- this.sinkExecuteProcessor = new SinkExecutor(jarPaths, config);
- this.filterExecuteProcessor = new FilterExecutor(jarPaths, config);
- this.preprocessingExecuteProcessor = new PreprocessingExecutor(jarPaths, config);
- this.processingExecuteProcessor = new ProcessingExecutor(jarPaths, config);
- this.postprocessingExecuteProcessor = new PostprocessingExecutor(jarPaths, config);
+ this.sourceExecutor = new SourceExecutor(jarPaths, config);
+ this.sinkExecutor = new SinkExecutor(jarPaths, config);
+ this.splitExecutor = new SplitExecutor(jarPaths, config);
+ this.filterExecutor = new FilterExecutor(jarPaths, config);
+ this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
+ this.processingExecutor = new ProcessingExecutor(jarPaths, config);
+ this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
this.jobRuntimeEnvironment =
JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
-
- this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.nodes = buildJobNode(config);
}
@@ -173,6 +177,7 @@ public class JobExecutionTest {
Map<String, Object> sources = Maps.newHashMap();
Map<String, Object> sinks =Maps.newHashMap();
Map<String, Object> filters = Maps.newHashMap();
+ Map<String, Object> splits = Maps.newHashMap();
Map<String, Object> preprocessingPipelines = Maps.newHashMap();
Map<String, Object> processingPipelines = Maps.newHashMap();
Map<String, Object> postprocessingPipelines = Maps.newHashMap();
@@ -186,6 +191,9 @@ public class JobExecutionTest {
if (config.hasPath(Constants.FILTERS)) {
filters = config.getConfig(Constants.FILTERS).root().unwrapped();
}
+ if (config.hasPath(Constants.SPLITS)) {
+ splits = config.getConfig(Constants.SPLITS).root().unwrapped();
+ }
if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
}
@@ -210,6 +218,8 @@ public class JobExecutionTest {
node.setType(ProcessorType.SOURCE);
} else if (sinks.containsKey(node.getName())) {
node.setType(ProcessorType.SINK);
+ } else if (splits.containsKey(node.getName())) {
+ node.setType(ProcessorType.SPLIT);
} else if (filters.containsKey(node.getName())) {
node.setType(ProcessorType.FILTER);
} else if (preprocessingPipelines.containsKey(node.getName())) {
@@ -228,15 +238,15 @@ public class JobExecutionTest {
}
- public SingleOutputStreamOperator<Event> getSingleOutputStreamOperator() throws JobExecuteException {
+ public DataStream<Event> getSingleOutputStreamOperator() throws JobExecuteException {
List<Node> sourceNodes = nodes
.stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
- SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+ DataStream<Event> singleOutputStreamOperator = null;
for(Node sourceNode : sourceNodes) {
- singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode);
+ singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
for (String nodeName : sourceNode.getDownstream()) {
buildJobGraph(singleOutputStreamOperator, nodeName);
}
@@ -247,27 +257,55 @@ public class JobExecutionTest {
}
- private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+ private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
Node node = getNode(downstreamNodeName).orElseGet(() -> {
throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = filterExecutor.execute(dataStream, node);
+ }
+ } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+ dataStream = splitExecutor.execute(dataStream, node);
+
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = preprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = processingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = postprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node);
- } else {
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = sinkExecutor.execute(dataStream, node);
+ }
+ } else {
throw new JobExecuteException("unsupported process type " + node.getType().name());
}
for (String nodeName : node.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
+ buildJobGraph(dataStream, nodeName);
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
new file mode 100644
index 0000000..dfe0600
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
@@ -0,0 +1,107 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JobSplitTest {
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @Test
+ public void testSplit() {
+
+ CollectSink.values.clear();
+ String[] args = {"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check config file exist
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+ jobExecution.getSingleOutputStreamOperator();
+
+ try {
+ jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+ } catch (Exception e) {
+ throw new JobExecuteException("Job executed error", e);
+ }
+ Assert.assertEquals(7, CollectSink.values.size());
+ }
+ @Test
+ public void testSplitForAgg() {
+
+ CollectSink.values.clear();
+ String[] args = {"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check config file exist
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+ jobExecution.getSingleOutputStreamOperator();
+
+ try {
+ jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+ } catch (Exception e) {
+ throw new JobExecuteException("Job executed error", e);
+ }
+ Assert.assertEquals(1, CollectSink.values.size());
+ Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
+
+ }
+}
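A note on the expected values, derived from the test YAMLs added later in this diff: the inline source emits four events, one BASE, one HTTP, and two DNS. In grootstream_job_split_test.yaml the source fans out to both test_split and collect_sink, so the sink collects the 4 direct events plus 1 HTTP event routed through table_processor and 2 DNS events routed through pre_etl_processor: 4 + 1 + 2 = 7, matching the first assertion; the unmatched BASE event stays on the split's main output, which no topology node consumes. In grootstream_job_split_agg_test.yaml only the DNS rule exists and only aggregate_processor is wired downstream, so the two DNS events are summed by NUMBER_SUM over sessions (1 + 1 = 2), producing the single record with sessions == "2".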
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index c4f54a3..556c8c4 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -75,16 +75,11 @@ public class SimpleJobTest {
assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString()));
assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString()));
assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString()));
- Assert.assertEquals("印第安纳州", CollectSink.values.get(0).getExtractedFields().get("server_super_admin_area").toString());
- Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString());
- Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("server_country_region").toString());
- Assert.assertTrue(!CollectSink.values.get(0).getExtractedFields().containsKey("client_country_region"));
Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString());
Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString());
Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString());
List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list");
- Assert.assertEquals("6167", asn_list.get(0));
}
@@ -121,4 +116,5 @@ public class SimpleJobTest {
}
Assert.assertEquals(4, CollectSink.values.size());
}
+
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
index dfcb459..c5806ed 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
@@ -3,9 +3,7 @@ package com.geedgenetworks.bootstrap.main.simple.collect;
import com.geedgenetworks.common.Event;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
public class CollectSink implements SinkFunction<Event> {
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
new file mode 100644
index 0000000..732d0f6
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -0,0 +1,92 @@
+sources:
+ inline_source:
+ type: inline
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ interval.per.row: 1s # optional
+ repeat.count: 1 # optional
+ format: json
+ json.ignore.parse.errors: false
+sinks:
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+splits:
+ test_split:
+ type: split
+ rules:
+ - name: aggregate_processor
+ expression: event.decoded_as == 'DNS'
+
+postprocessing_pipelines:
+ pre_etl_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [fields,tags]
+ output_fields:
+ functions: # [array of object] Function List
+
+ - function: FLATTEN
+ lookup_fields: [ fields,tags ]
+ output_fields: [ ]
+ parameters:
+ #prefix: ""
+ depth: 3
+ # delimiter: "."
+
+ - function: RENAME
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter:
+ parameters:
+ # parent_fields: [tags]
+ # rename_fields:
+ # tags: tags
+ rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ interval: 300
+ #
+
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [decoded_as]
+ window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 3
+ window_timestamp_field: test_time
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+
+ table_processor:
+ type: table
+ functions:
+ - function: JSON_UNROLL
+ lookup_fields: [ encapsulation ]
+ output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. Used to build the data flow for the job DAG.
+ - name: inline_source # [string] Node Name, must be unique. Used as the name of the corresponding Flink operator, e.g. kafka_source for a node whose processor type is SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [test_split]
+ - name: test_split
+ parallelism: 1
+ downstream: [ aggregate_processor ]
+ - name: aggregate_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: collect_sink
+ parallelism: 1
+
+
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
new file mode 100644
index 0000000..f13d69e
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
@@ -0,0 +1,97 @@
+sources:
+ inline_source:
+ type: inline
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ interval.per.row: 1s # optional
+ repeat.count: 1 # optional
+ format: json
+ json.ignore.parse.errors: false
+sinks:
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+splits:
+ test_split:
+ type: split
+ rules:
+ - name: table_processor
+ expression: event.decoded_as == 'HTTP'
+ - name: pre_etl_processor
+ expression: event.decoded_as == 'DNS'
+
+postprocessing_pipelines:
+ pre_etl_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [fields,tags]
+ output_fields:
+ functions: # [array of object] Function List
+
+ - function: FLATTEN
+ lookup_fields: [ fields,tags ]
+ output_fields: [ ]
+ parameters:
+ #prefix: ""
+ depth: 3
+ # delimiter: "."
+
+ - function: RENAME
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter:
+ parameters:
+ # parent_fields: [tags]
+ # rename_fields:
+ # tags: tags
+ rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ interval: 300
+ #
+
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [decoded_as]
+ window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 3
+ window_timestamp_field: test_time
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+
+ table_processor:
+ type: table
+ functions:
+ - function: JSON_UNROLL
+ lookup_fields: [ encapsulation ]
+ output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. Used to build the data flow for the job DAG.
+ - name: inline_source # [string] Node Name, must be unique. Used as the name of the corresponding Flink operator, e.g. kafka_source for a node whose processor type is SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [test_split,collect_sink]
+ - name: test_split
+ parallelism: 1
+ downstream: [ table_processor,pre_etl_processor ]
+ - name: pre_etl_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: table_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: collect_sink
+ parallelism: 1
+
+
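One contract that both YAMLs rely on but never state: each rules[].name under splits must equal the name of a downstream node in the topology. SplitExecutor registers every rule name in the runtime environment's splitSet, and buildJobGraph reads a side output only for nodes found there, so a rule whose name matches no topology node sends its events nowhere, while a split downstream node with no matching rule silently falls back to the split's main output.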