summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李奉超 <[email protected]>2024-08-26 03:03:40 +0000
committer李奉超 <[email protected]>2024-08-26 03:03:40 +0000
commit7e268f460a683987d940c78d70fcb6d633a576ba (patch)
tree256a67b16618d94f5f6b622ea1f7fffd2b04690c
parent56b21d494bfa07012b1cc4e43dcb4ccdb6257d12 (diff)
parent9b0297020611fcf70445284637f370b5f8c4fddd (diff)
Merge branch 'feature/split' into 'develop'
Feature/split See merge request galaxy/platform/groot-stream!95
-rw-r--r--config/grootstream_job_example.yaml19
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java1
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java18
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java16
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java103
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java15
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java5
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java91
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java98
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java74
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java72
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java6
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java4
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml92
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml97
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Constants.java2
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java18
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java19
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java9
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/split/Split.java16
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java79
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java29
-rw-r--r--groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split1
35 files changed, 849 insertions, 128 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 4726af0..46d1123 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -11,7 +11,14 @@ filters:
type: aviator
properties:
expression: event.server_ip != '12.12.12.12'
-
+splits:
+ decoded_as_split:
+ type: split
+ rules:
+ - name: projection_processor
+ expression: event.decoded_as == 'HTTP'
+ - name: aggregate_processor
+ expression: event.decoded_as == 'DNS'
processing_pipelines:
projection_processor:
type: projection
@@ -25,7 +32,7 @@ processing_pipelines:
group_by_fields: [server_ip,server_port]
window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
window_timestamp_field: recv_time
- window_size: 60
+ window_size: 6
window_slide: 10 #滑动窗口步长
functions:
- function: NUMBER_SUM
@@ -65,10 +72,12 @@ application:
strategy: none
topology:
- name: inline_source
- downstream: [filter_operator]
- - name: filter_operator
- downstream: [ projection_processor ]
+ downstream: [decoded_as_split]
+ - name: decoded_as_split
+ downstream: [ projection_processor ,aggregate_processor]
- name: projection_processor
downstream: [ print_sink ]
+ - name: aggregate_processor
+ downstream: [ print_sink ]
- name: print_sink
downstream: [] \ No newline at end of file
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
index d2b37f6..6f33cae 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.bootstrap.enums;
public enum ProcessorType {
SOURCE("source"),
FILTER("filter"),
+ SPLIT("split"),
PREPROCESSING("preprocessing"),
PROCESSING("processing"),
POSTPROCESSING("postprocessing"),
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 64c66b6..7a55ffe 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -1,26 +1,26 @@
package com.geedgenetworks.bootstrap.execution;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.utils.ReflectionUtils;
import com.geedgenetworks.core.filter.Filter;
import com.geedgenetworks.core.processor.Processor;
-import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.split.Split;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
+import java.util.*;
import java.util.function.BiConsumer;
public abstract class AbstractExecutor<K, V>
- implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> {
+ implements Executor<DataStream<Event>, JobRuntimeEnvironment> {
protected JobRuntimeEnvironment jobRuntimeEnvironment;
protected final Config operatorConfig;
protected final Map<K,V> operatorMap;
protected final Map<String,Filter> filterMap = new HashMap<>();
+ protected final Map<String, Split> splitMap = new HashMap<>();
protected final Map<String, Processor> processorMap = new HashMap<>();
protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
@@ -30,6 +30,10 @@ public abstract class AbstractExecutor<K, V>
for (Filter filter : filters) {
this.filterMap.put(filter.type(), filter);
}
+ ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
+ for (Split split : splits) {
+ this.splitMap.put(split.type(), split);
+ }
ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
for (Processor processor : processors) {
this.processorMap.put(processor.type(), processor);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index 66c0b0f..b0a04cd 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
@@ -13,6 +14,7 @@ import com.geedgenetworks.core.processor.table.TableProcessor;
import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -27,7 +29,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
ProcessorConfig processorConfig = operatorMap.get(node.getName());
switch (processorConfig.getType()) {
@@ -45,7 +47,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
return dataStream;
}
- protected SingleOutputStreamOperator executeTableProcessor(SingleOutputStreamOperator dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
+ protected DataStream<Event> executeTableProcessor(DataStream<Event> dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
TableProcessor tableProcessor;
if (processorMap.containsKey(tableConfig.getType())) {
@@ -63,15 +65,15 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
tableConfig.setParallelism(node.getParallelism());
}
try {
- dataStream =
- tableProcessor.processorFunction(
- dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+
+ dataStream = tableProcessor.processorFunction(
+ dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
} catch (Exception e) {
throw new JobExecuteException("Create orderby pipeline instance failed!", e);
}
return dataStream;
}
- protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
+ protected DataStream<Event> executeAggregateProcessor(DataStream<Event> dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
AggregateProcessor aggregateProcessor;
if (processorMap.containsKey(aggregateConfig.getType())) {
@@ -98,7 +100,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
return dataStream;
}
- protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
+ protected DataStream<Event> executeProjectionProcessor(DataStream<Event> dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
ProjectionProcessor projectionProcessor;
if (processorMap.containsKey(projectionConfig.getType())) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 506aa11..66f0585 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.FilterConfigOptions;
@@ -14,6 +15,7 @@ import com.geedgenetworks.core.pojo.FilterConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -54,7 +56,7 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
FilterConfig filterConfig = operatorMap.get(node.getName());
String className = filterConfig.getType();
Filter filter;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 2eabefa..22b23a7 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -5,15 +5,17 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.GrootStreamConfig;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.GrootStreamConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigUtil;
import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
import java.io.File;
import java.net.MalformedURLException;
@@ -27,17 +29,18 @@ import java.util.stream.Stream;
public class JobExecution {
private final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecutor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
private final List<Node> nodes;
-
private final List<URL> jarPaths;
+
public JobExecution(Config config, GrootStreamConfig grootStreamConfig) {
- try {
+ try {
jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
.resolve(GrootStreamRunner.APP_JAR_NAME).toString())
.toURI().toURL()));
@@ -50,6 +53,7 @@ public class JobExecution {
this.sourceExecutor = new SourceExecutor(jarPaths, config);
this.sinkExecutor = new SinkExecutor(jarPaths, config);
this.filterExecutor = new FilterExecutor(jarPaths, config);
+ this.splitExecutor = new SplitExecutor(jarPaths, config);
this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
this.processingExecutor = new ProcessingExecutor(jarPaths, config);
this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
@@ -59,6 +63,7 @@ public class JobExecution {
this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
@@ -69,7 +74,7 @@ public class JobExecution {
private void registerPlugin(Config appConfig) {
List<Path> thirdPartyJars = new ArrayList<>();
Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV);
- if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
+ if (envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
thirdPartyJars = new ArrayList<>(StartBuilder
.getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS)));
}
@@ -81,7 +86,7 @@ public class JobExecution {
.map(uri -> {
try {
return uri.toURL();
- }catch (MalformedURLException e){
+ } catch (MalformedURLException e) {
throw new RuntimeException("the uri of jar illegal: " + uri, e);
}
}).collect(Collectors.toList());
@@ -93,10 +98,10 @@ public class JobExecution {
}
- private Config registerPlugin(Config config , List<URL> jars) {
- config = this.injectJarsToConfig(
- config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
- return this.injectJarsToConfig(
+ private Config registerPlugin(Config config, List<URL> jars) {
+ config = this.injectJarsToConfig(
+ config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
+ return this.injectJarsToConfig(
config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars);
}
@@ -127,7 +132,7 @@ public class JobExecution {
.collect(Collectors.toSet());
paths.addAll(validJars);
- config = config.withValue(
+ config = config.withValue(
path,
ConfigValueFactory.fromAnyRef(
paths.stream()
@@ -150,8 +155,9 @@ public class JobExecution {
private List<Node> buildJobNode(Config config) {
Map<String, Object> sources = Maps.newHashMap();
- Map<String, Object> sinks =Maps.newHashMap();
+ Map<String, Object> sinks = Maps.newHashMap();
Map<String, Object> filters = Maps.newHashMap();
+ Map<String, Object> splits = Maps.newHashMap();
Map<String, Object> preprocessingPipelines = Maps.newHashMap();
Map<String, Object> processingPipelines = Maps.newHashMap();
Map<String, Object> postprocessingPipelines = Maps.newHashMap();
@@ -160,11 +166,14 @@ public class JobExecution {
sources = config.getConfig(Constants.SOURCES).root().unwrapped();
}
if (config.hasPath(Constants.SINKS)) {
- sinks =config.getConfig(Constants.SINKS).root().unwrapped();
+ sinks = config.getConfig(Constants.SINKS).root().unwrapped();
}
if (config.hasPath(Constants.FILTERS)) {
filters = config.getConfig(Constants.FILTERS).root().unwrapped();
}
+ if (config.hasPath(Constants.SPLITS)) {
+ splits = config.getConfig(Constants.SPLITS).root().unwrapped();
+ }
if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
}
@@ -184,13 +193,15 @@ public class JobExecution {
nodes.add(node);
});
- for(Node node : nodes) {
+ for (Node node : nodes) {
if (sources.containsKey(node.getName())) {
node.setType(ProcessorType.SOURCE);
} else if (sinks.containsKey(node.getName())) {
node.setType(ProcessorType.SINK);
} else if (filters.containsKey(node.getName())) {
node.setType(ProcessorType.FILTER);
+ } else if (splits.containsKey(node.getName())) {
+ node.setType(ProcessorType.SPLIT);
} else if (preprocessingPipelines.containsKey(node.getName())) {
node.setType(ProcessorType.PREPROCESSING);
} else if (processingPipelines.containsKey(node.getName())) {
@@ -214,12 +225,12 @@ public class JobExecution {
List<Node> sourceNodes = nodes
.stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
- SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+ DataStream<Event> dataStream = null;
- for(Node sourceNode : sourceNodes) {
- singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
+ for (Node sourceNode : sourceNodes) {
+ dataStream = sourceExecutor.execute(dataStream, sourceNode);
for (String nodeName : sourceNode.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
+ buildJobGraph(dataStream, nodeName);
}
}
@@ -230,7 +241,7 @@ public class JobExecution {
log.info("Execution Job: {}", jobRuntimeEnvironment.getJobName());
jobRuntimeEnvironment.getStreamExecutionEnvironment().execute(jobRuntimeEnvironment.getJobName());
final long jobEndTime = System.currentTimeMillis();
- log.info("Job finished, execution duration: {} ms", jobEndTime-jobStartTime);
+ log.info("Job finished, execution duration: {} ms", jobEndTime - jobStartTime);
} catch (Exception e) {
throw new JobExecuteException("Execute job error", e);
@@ -238,34 +249,62 @@ public class JobExecution {
}
- private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+ private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
Node node = getNode(downstreamNodeName).orElseGet(() -> {
throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- singleOutputStreamOperator = filterExecutor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = filterExecutor.execute(dataStream, node);
+ }
+ } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+ dataStream = splitExecutor.execute(dataStream, node);
+
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- singleOutputStreamOperator = preprocessingExecutor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = preprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- singleOutputStreamOperator = processingExecutor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = processingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- singleOutputStreamOperator = postprocessingExecutor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = postprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- singleOutputStreamOperator = sinkExecutor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = sinkExecutor.execute(dataStream, node);
+ }
} else {
throw new JobExecuteException("unsupported process type " + node.getType().name());
}
for (String nodeName : node.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
+ buildJobGraph(dataStream, nodeName);
}
}
private Optional<Node> getNode(String name) {
- return nodes.stream().filter(v-> v.getName().equals(name)).findFirst();
+ return nodes.stream().filter(v -> v.getName().equals(name)).findFirst();
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 7141f5e..2e68098 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -25,9 +25,7 @@ import org.apache.flink.util.TernaryBoolean;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@@ -37,6 +35,8 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
private GrootStreamConfig grootStreamConfig;
private StreamExecutionEnvironment environment;
private String jobName = Constants.DEFAULT_JOB_NAME;
+ private Set<String> splitSet = new HashSet<>();
+
private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) {
this.grootStreamConfig = grootStreamConfig;
@@ -197,6 +197,14 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
}
}
+ public Set<String> getSplitSet() {
+ return splitSet;
+ }
+
+ public void setSplitSet(Set<String> splitSet) {
+ this.splitSet = splitSet;
+ }
+
private void setCheckpoint() {
long interval = 0;
if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) {
@@ -272,6 +280,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
+
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index 36fad61..b9555b4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -36,7 +38,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor {
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
return super.execute(dataStream, node);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index b1e53e4..a7b9e5e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -3,10 +3,12 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -37,7 +39,8 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+
return super.execute(dataStream, node);
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index d69fe8c..f6788ed 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProcessorConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -35,7 +37,7 @@ public class ProcessingExecutor extends AbstractProcessorExecutor {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
return super.execute(dataStream, node);
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index a0bedc9..70934b8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.SinkConfigOptions;
@@ -20,6 +21,7 @@ import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -63,7 +65,7 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
SinkConfig sinkConfig = operatorMap.get(node.getName());
try {
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index d4751af..9dff6b0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
@@ -67,7 +68,7 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
}
@Override
- public SingleOutputStreamOperator execute(SingleOutputStreamOperator outputStreamOperator, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> outputStreamOperator, Node node) throws JobExecuteException {
SourceConfig sourceConfig = operatorMap.get(node.getName());
SingleOutputStreamOperator sourceSingleOutputStreamOperator;
try {
@@ -152,4 +153,5 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
return 0;
}
}
+
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
new file mode 100644
index 0000000..3513a67
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -0,0 +1,91 @@
+package com.geedgenetworks.bootstrap.execution;
+
+import com.alibaba.fastjson.JSONObject;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.SplitConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import com.geedgenetworks.core.split.Split;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Initialize config and execute filter operator
+ */
+@Slf4j
+public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
+
+
+ public SplitExecutor(List<URL> jarPaths, Config config) {
+ super(jarPaths, config);
+ }
+
+ @Override
+ protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
+ if (operatorConfig.hasPath(Constants.SPLITS)) {
+ Config routes = operatorConfig.getConfig(Constants.SPLITS);
+ routes.root().unwrapped().forEach((key, value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key),
+ SplitConfigOptions.TYPE.key());
+ if (!result.isSuccess()) {
+ throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+ "split: %s, Message: %s",
+ key, result.getMsg()));
+ }
+ SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+ splitConfig.setName(key);
+ splitConfigMap.put(key, splitConfig);
+ });
+ }
+
+ return splitConfigMap;
+ }
+
+ @Override
+ public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+ SplitConfig splitConfig = operatorMap.get(node.getName());
+ String className = splitConfig.getType();
+ Split split;
+ if (splitMap.containsKey(splitConfig.getType())) {
+
+ split = splitMap.get(splitConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(className);
+ split = (Split) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get split instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ splitConfig.setParallelism(node.getParallelism());
+ }
+ for(RuleContext ruleContext:splitConfig.getRules()) {
+ jobRuntimeEnvironment.getSplitSet().add(ruleContext.getName());
+ }
+ try {
+ dataStream =
+ split.splitFunction(
+ dataStream, splitConfig);
+ } catch (Exception e) {
+ throw new JobExecuteException("Create split instance failed!", e);
+ }
+ return dataStream;
+ }
+
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
index 8b299df..7e995cf 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
@@ -16,7 +16,9 @@ import com.typesafe.config.ConfigUtil;
import com.typesafe.config.ConfigValueFactory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
import java.io.File;
import java.net.MalformedURLException;
@@ -33,12 +35,13 @@ import java.util.stream.Stream;
public class JobExecutionTest {
protected final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor;
- private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
+ private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
private final List<Node> nodes;
@@ -66,21 +69,22 @@ public class JobExecutionTest {
}
registerPlugin(config.getConfig(Constants.APPLICATION));
- this.sourceExecuteProcessor = new SourceExecutor(jarPaths, config);
- this.sinkExecuteProcessor = new SinkExecutor(jarPaths, config);
- this.filterExecuteProcessor = new FilterExecutor(jarPaths, config);
- this.preprocessingExecuteProcessor = new PreprocessingExecutor(jarPaths, config);
- this.processingExecuteProcessor = new ProcessingExecutor(jarPaths, config);
- this.postprocessingExecuteProcessor = new PostprocessingExecutor(jarPaths, config);
+ this.sourceExecutor = new SourceExecutor(jarPaths, config);
+ this.sinkExecutor = new SinkExecutor(jarPaths, config);
+ this.splitExecutor = new SplitExecutor(jarPaths, config);
+ this.filterExecutor = new FilterExecutor(jarPaths, config);
+ this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
+ this.processingExecutor = new ProcessingExecutor(jarPaths, config);
+ this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
this.jobRuntimeEnvironment =
JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
-
- this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.nodes = buildJobNode(config);
}
@@ -173,6 +177,7 @@ public class JobExecutionTest {
Map<String, Object> sources = Maps.newHashMap();
Map<String, Object> sinks =Maps.newHashMap();
Map<String, Object> filters = Maps.newHashMap();
+ Map<String, Object> splits = Maps.newHashMap();
Map<String, Object> preprocessingPipelines = Maps.newHashMap();
Map<String, Object> processingPipelines = Maps.newHashMap();
Map<String, Object> postprocessingPipelines = Maps.newHashMap();
@@ -186,6 +191,9 @@ public class JobExecutionTest {
if (config.hasPath(Constants.FILTERS)) {
filters = config.getConfig(Constants.FILTERS).root().unwrapped();
}
+ if (config.hasPath(Constants.SPLITS)) {
+ splits = config.getConfig(Constants.SPLITS).root().unwrapped();
+ }
if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
}
@@ -210,6 +218,8 @@ public class JobExecutionTest {
node.setType(ProcessorType.SOURCE);
} else if (sinks.containsKey(node.getName())) {
node.setType(ProcessorType.SINK);
+ } else if (splits.containsKey(node.getName())) {
+ node.setType(ProcessorType.SPLIT);
} else if (filters.containsKey(node.getName())) {
node.setType(ProcessorType.FILTER);
} else if (preprocessingPipelines.containsKey(node.getName())) {
@@ -228,15 +238,15 @@ public class JobExecutionTest {
}
- public SingleOutputStreamOperator<Event> getSingleOutputStreamOperator() throws JobExecuteException {
+ public DataStream<Event> getSingleOutputStreamOperator() throws JobExecuteException {
List<Node> sourceNodes = nodes
.stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
- SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+ DataStream<Event> singleOutputStreamOperator = null;
for(Node sourceNode : sourceNodes) {
- singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode);
+ singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
for (String nodeName : sourceNode.getDownstream()) {
buildJobGraph(singleOutputStreamOperator, nodeName);
}
@@ -247,27 +257,55 @@ public class JobExecutionTest {
}
- private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+ private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
Node node = getNode(downstreamNodeName).orElseGet(() -> {
throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = filterExecutor.execute(dataStream, node);
+ }
+ } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+ dataStream = splitExecutor.execute(dataStream, node);
+
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = preprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = processingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = postprocessingExecutor.execute(dataStream, node);
+ }
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node);
- } else {
+ if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ }), node);
+ } else {
+ dataStream = sinkExecutor.execute(dataStream, node);
+ }
+ } else {
throw new JobExecuteException("unsupported process type " + node.getType().name());
}
for (String nodeName : node.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
+ buildJobGraph(dataStream, nodeName);
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
new file mode 100644
index 0000000..2f6984b
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
@@ -0,0 +1,74 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JobSplitTest {
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @Test
+ public void testSplit() {
+
+ CollectSink.values.clear();
+ String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check config file exist
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+ jobExecution.getSingleOutputStreamOperator();
+
+ try {
+ jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+ } catch (Exception e) {
+ throw new JobExecuteException("Job executed error", e);
+ }
+ Assert.assertEquals(7, CollectSink.values.size());
+ }
+
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
new file mode 100644
index 0000000..2edc5e7
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -0,0 +1,72 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+
+public class JobSplitWithAggTest {
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @Test
+ public void testSplitForAgg() {
+
+ CollectSink.values.clear();
+ String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check config file exist
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+ jobExecution.getSingleOutputStreamOperator();
+
+ try {
+ jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+ } catch (Exception e) {
+ throw new JobExecuteException("Job executed error", e);
+ }
+ Assert.assertEquals(1, CollectSink.values.size());
+ Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
+
+ }
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index c4f54a3..556c8c4 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -75,16 +75,11 @@ public class SimpleJobTest {
assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString()));
assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString()));
assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString()));
- Assert.assertEquals("印第安纳州", CollectSink.values.get(0).getExtractedFields().get("server_super_admin_area").toString());
- Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString());
- Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("server_country_region").toString());
- Assert.assertTrue(!CollectSink.values.get(0).getExtractedFields().containsKey("client_country_region"));
Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString());
Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString());
Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString());
List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list");
- Assert.assertEquals("6167", asn_list.get(0));
}
@@ -121,4 +116,5 @@ public class SimpleJobTest {
}
Assert.assertEquals(4, CollectSink.values.size());
}
+
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
index dfcb459..c5806ed 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
@@ -3,9 +3,7 @@ package com.geedgenetworks.bootstrap.main.simple.collect;
import com.geedgenetworks.common.Event;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
public class CollectSink implements SinkFunction<Event> {
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
new file mode 100644
index 0000000..872800f
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -0,0 +1,92 @@
+sources:
+ inline_source:
+ type : inline
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ interval.per.row: 1s # 可选
+ repeat.count: 1 # 可选
+ format: json
+ json.ignore.parse.errors: false
+sinks:
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+splits:
+ test_split:
+ type: split
+ rules:
+ - name: aggregate_processor
+ expression: event.decoded_as == 'DNS'
+
+postprocessing_pipelines:
+ pre_etl_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [fields,tags]
+ output_fields:
+ functions: # [array of object] Function List
+
+ - function: FLATTEN
+ lookup_fields: [ fields,tags ]
+ output_fields: [ ]
+ parameters:
+ #prefix: ""
+ depth: 3
+ # delimiter: "."
+
+ - function: RENAME
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter:
+ parameters:
+ # parent_fields: [tags]
+ # rename_fields:
+ # tags: tags
+ rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ interval: 300
+ #
+
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [decoded_as]
+ window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 5
+ window_timestamp_field: test_time
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+
+ table_processor:
+ type: table
+ functions:
+ - function: JSON_UNROLL
+ lookup_fields: [ encapsulation ]
+ output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. It will be used build data flow for job dag graph.
+ - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [test_split]
+ - name: test_split
+ parallelism: 1
+ downstream: [ aggregate_processor ]
+ - name: aggregate_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: collect_sink
+ parallelism: 1
+
+
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
new file mode 100644
index 0000000..9bb2900
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
@@ -0,0 +1,97 @@
+sources:
+ inline_source:
+ type : inline
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ interval.per.row: 1s # 可选
+ repeat.count: 1 # 可选
+ format: json
+ json.ignore.parse.errors: false
+sinks:
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+splits:
+ test_split:
+ type: split
+ rules:
+ - name: table_processor
+ expression: event.decoded_as == 'HTTP'
+ - name: pre_etl_processor
+ expression: event.decoded_as == 'DNS'
+
+postprocessing_pipelines:
+ pre_etl_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [fields,tags]
+ output_fields:
+ functions: # [array of object] Function List
+
+ - function: FLATTEN
+ lookup_fields: [ fields,tags ]
+ output_fields: [ ]
+ parameters:
+ #prefix: ""
+ depth: 3
+ # delimiter: "."
+
+ - function: RENAME
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter:
+ parameters:
+ # parent_fields: [tags]
+ # rename_fields:
+ # tags: tags
+ rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ interval: 300
+ #
+
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [decoded_as]
+ window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 5
+ window_timestamp_field: test_time
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+
+ table_processor:
+ type: table
+ functions:
+ - function: JSON_UNROLL
+ lookup_fields: [ encapsulation ]
+ output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. It will be used build data flow for job dag graph.
+ - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [test_split,collect_sink]
+ - name: test_split
+ parallelism: 1
+ downstream: [ table_processor,pre_etl_processor ]
+ - name: pre_etl_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: table_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: collect_sink
+ parallelism: 1
+
+
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index d13fc4b..e973c20 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -11,7 +11,7 @@ public final class Constants {
public static final String SINKS = "sinks";
public static final int SYSTEM_EXIT_CODE = 2618;
public static final String APPLICATION = "application";
-
+ public static final String SPLITS = "splits";
public static final String APPLICATION_ENV ="env";
public static final String APPLICATION_TOPOLOGY = "topology";
public static final String JOB_NAME = "name";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java
new file mode 100644
index 0000000..a2acb71
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java
@@ -0,0 +1,18 @@
+package com.geedgenetworks.common.config;
+
+import com.alibaba.fastjson2.TypeReference;
+import com.geedgenetworks.common.udf.RuleContext;
+import java.util.List;
+
+public interface SplitConfigOptions {
+ Option<String> TYPE = Options.key("type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The type of route .");
+
+ Option<List<RuleContext>> RULES = Options.key("rules")
+ .type(new TypeReference<List<RuleContext>>() {})
+ .noDefaultValue()
+ .withDescription("The rules to be executed.");
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
new file mode 100644
index 0000000..ead0ecd
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.common.udf;
+
+import com.geedgenetworks.common.Event;
+import com.googlecode.aviator.Expression;
+import lombok.Data;
+import org.apache.flink.util.OutputTag;
+
+import java.io.Serializable;
+
+@Data
+public class RuleContext implements Serializable {
+
+ private String name;
+ private String expression;
+ private Expression compiledExpression;
+ private OutputTag<Event> outputTag ;
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
index 668ba6f..d8b8bc4 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
@@ -2,13 +2,14 @@ package com.geedgenetworks.core.filter;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.FilterConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public class AviatorFilter implements Filter {
@Override
- public SingleOutputStreamOperator<Event> filterFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
+ public DataStream<Event> filterFunction(
+ DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
throws Exception {
if (FilterConfig.getParallelism() != 0) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
index a173438..f8b50eb 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
@@ -3,12 +3,13 @@ package com.geedgenetworks.core.filter;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.FilterConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public interface Filter {
- SingleOutputStreamOperator<Event> filterFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
+ DataStream<Event> filterFunction(
+ DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
throws Exception;
String type();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java
new file mode 100644
index 0000000..4381df5
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.core.pojo;
+
+import com.geedgenetworks.common.udf.RuleContext;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+@Data
+public class SplitConfig implements Serializable {
+
+ private String type;
+ private Map<String, Object> properties;
+ private int parallelism;
+ private String name;
+ private List<RuleContext> rules;
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
index 172b368..3852414 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
@@ -2,12 +2,13 @@ package com.geedgenetworks.core.processor;
import com.geedgenetworks.common.Event;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
public interface Processor<T> {
- SingleOutputStreamOperator<Event> processorFunction(
- SingleOutputStreamOperator<Event> singleOutputStreamOperator,
+ DataStream<Event> processorFunction(
+ DataStream<Event> singleOutputStreamOperator,
T processorConfig, ExecutionConfig config)
throws Exception;
String type();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index 803fefc..4f9535d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -31,8 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect;
public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> {
private final List<UDFContext> udfContexts;
private final List<String> udfClassNameLists;
- private final List<String> groupByFields;
- private LinkedList<UdfEntity> functions;
+ private final LinkedList<UdfEntity> functions;
public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
@@ -40,7 +39,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
if (udfContexts == null || udfContexts.isEmpty()) {
throw new RuntimeException();
}
- groupByFields = aggregateConfig.getGroup_by_fields();
functions = Lists.newLinkedList();
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
try {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index bc87c32..cf78310 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -5,45 +5,50 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.AggregateConfig;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.OutputTag;
+
import static com.geedgenetworks.common.Constants.*;
public class AggregateProcessorImpl implements AggregateProcessor {
@Override
- public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+
if (aggregateConfig.getParallelism() != 0) {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
case TUMBLING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
case SLIDING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
case SLIDING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
}
}else {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
case TUMBLING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
case SLIDING_PROCESSING_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
case SLIDING_EVENT_TIME:
- return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
}
}
+
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
index 6b46a7b..d87e7e2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
@@ -4,25 +4,26 @@ import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
public class ProjectionProcessorImpl implements ProjectionProcessor {
@Override
- public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception {
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception {
if (projectionConfig.getParallelism() != 0) {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.process(new ProjectionProcessFunction(projectionConfig))
.setParallelism(projectionConfig.getParallelism())
.name(projectionConfig.getName());
} else {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.process(new ProjectionProcessFunction(projectionConfig))
.name(projectionConfig.getName());
}
}
-
@Override
public String type() {
return "projection";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
index f36f8db..6ddc616 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
@@ -2,25 +2,25 @@ package com.geedgenetworks.core.processor.table;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.TableConfig;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessFunction;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+
+import java.util.Map;
-import java.time.Duration;
public class TableProcessorImpl implements TableProcessor {
@Override
- public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, TableConfig tableConfig, ExecutionConfig config) throws Exception {
-
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, TableConfig tableConfig, ExecutionConfig config) throws Exception {
if (tableConfig.getParallelism() != 0) {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.flatMap(new TableProcessorFunction(tableConfig))
.setParallelism(tableConfig.getParallelism())
.name(tableConfig.getName());
} else {
- return grootEventSingleOutputStreamOperator
+ return grootEventDataStream
.flatMap(new TableProcessorFunction(tableConfig))
.name(tableConfig.getName());
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java
new file mode 100644
index 0000000..37e7b44
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.util.Set;
+
+public interface Split {
+
+ DataStream<Event> splitFunction(
+ DataStream<Event> dataStream, SplitConfig splitConfig)
+ throws Exception;
+ String type();
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
new file mode 100644
index 0000000..f07b568
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
@@ -0,0 +1,79 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+
+@Slf4j
+public class SplitFunction extends ProcessFunction<Event, Event> {
+ private final SplitConfig splitConfig;
+ private List<RuleContext> rules;
+ private transient InternalMetrics internalMetrics;
+
+ public SplitFunction(SplitConfig splitConfig) {
+ this.splitConfig = splitConfig;
+
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+
+ this.internalMetrics = new InternalMetrics(getRuntimeContext());
+ this.rules = splitConfig.getRules();
+ for(RuleContext rule : rules){
+ String expression = rule.getExpression();
+ AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+ instance.setCachedExpressionByDefault(true);
+ instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+ instance.setFunctionMissing(null);
+ Expression compiledExp = instance.compile(expression, true);
+ rule.setCompiledExpression(compiledExp);
+ OutputTag<Event> outputTag = new OutputTag<>(rule.getName()){};
+ rule.setOutputTag(outputTag);
+ }
+ }
+
+
+ @Override
+ public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
+ try {
+ internalMetrics.incrementInEvents();
+ for (RuleContext route : rules){
+ boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true;
+ if (result) {
+ ctx.output(route.getOutputTag(), event);
+ }
+ }
+ }catch (Exception e) {
+ internalMetrics.incrementErrorEvents();
+ log.error("error in split function", e);
+ }
+ }
+
+ public static Boolean filterExecute(Expression expression, Map<String, Object> map) {
+
+ boolean result;
+ Object object = expression.execute(map);
+ if (object != null) {
+ result = (Boolean) object;
+ } else {
+ throw new ExpressionRuntimeException();
+ }
+ return result;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java
new file mode 100644
index 0000000..f6d2c8c
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java
@@ -0,0 +1,29 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+public class SplitOperator implements Split {
+
+ @Override
+ public DataStream<Event> splitFunction(
+ DataStream<Event> dataStream, SplitConfig splitConfig)
+ throws Exception {
+ if (splitConfig.getParallelism() != 0) {
+ return dataStream
+ .process(new SplitFunction(splitConfig))
+ .setParallelism(splitConfig.getParallelism())
+ .name(splitConfig.getName());
+ } else {
+ return dataStream
+ .process(new SplitFunction(splitConfig))
+ .name(splitConfig.getName());
+ }
+ }
+ @Override
+ public String type() {
+ return "split";
+ }
+
+}
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split
new file mode 100644
index 0000000..500c367
--- /dev/null
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split
@@ -0,0 +1 @@
+com.geedgenetworks.core.split.SplitOperator \ No newline at end of file