| author | 李奉超 <[email protected]> | 2024-08-26 03:03:40 +0000 |
|---|---|---|
| committer | 李奉超 <[email protected]> | 2024-08-26 03:03:40 +0000 |
| commit | 7e268f460a683987d940c78d70fcb6d633a576ba (patch) | |
| tree | 256a67b16618d94f5f6b622ea1f7fffd2b04690c | |
| parent | 56b21d494bfa07012b1cc4e43dcb4ccdb6257d12 (diff) | |
| parent | 9b0297020611fcf70445284637f370b5f8c4fddd (diff) | |
Merge branch 'feature/split' into 'develop'
Feature/split
See merge request galaxy/platform/groot-stream!95
35 files changed, 849 insertions, 128 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 4726af0..46d1123 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -11,7 +11,14 @@ filters:
     type: aviator
     properties:
       expression: event.server_ip != '12.12.12.12'
-
+splits:
+  decoded_as_split:
+    type: split
+    rules:
+      - name: projection_processor
+        expression: event.decoded_as == 'HTTP'
+      - name: aggregate_processor
+        expression: event.decoded_as == 'DNS'
 processing_pipelines:
   projection_processor:
     type: projection
@@ -25,7 +32,7 @@ processing_pipelines:
     group_by_fields: [server_ip,server_port]
     window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
     window_timestamp_field: recv_time
-    window_size: 60
+    window_size: 6
     window_slide: 10 # sliding window step size
     functions:
       - function: NUMBER_SUM
@@ -65,10 +72,12 @@ application:
       strategy: none
   topology:
     - name: inline_source
-      downstream: [filter_operator]
-    - name: filter_operator
-      downstream: [ projection_processor ]
+      downstream: [decoded_as_split]
+    - name: decoded_as_split
+      downstream: [ projection_processor ,aggregate_processor]
     - name: projection_processor
       downstream: [ print_sink ]
+    - name: aggregate_processor
+      downstream: [ print_sink ]
     - name: print_sink
       downstream: []
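The new `splits` block declares named rules whose Aviator expressions route each event to the downstream branch of the same name. The split operator's implementation is not shown in this diff (only its SPI, `com.geedgenetworks.core.split.Split`, appears below); for orientation, the following is a minimal sketch of how such routing is conventionally written on Flink — one side output per rule, tagged with the rule name. Everything in it (the class name `RuleSplitFunction`, the `Map`-based event) is illustrative, not the project's actual implementation:

```java
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.HashMap;
import java.util.Map;

// Hypothetical splitter: one side output per rule, tag id == rule name,
// matching how JobExecution later resolves downstream nodes by name.
public class RuleSplitFunction extends ProcessFunction<Map<String, Object>, Map<String, Object>> {

    private final Map<String, String> rules;  // rule name -> Aviator expression source
    private transient Map<OutputTag<Map<String, Object>>, Expression> compiled;

    public RuleSplitFunction(Map<String, String> rules) {
        this.rules = rules;
    }

    @Override
    public void open(Configuration parameters) {
        // Compile once per task in open(): Aviator's Expression is not serializable.
        compiled = new HashMap<>();
        rules.forEach((name, expr) ->
                compiled.put(new OutputTag<Map<String, Object>>(name) {}, AviatorEvaluator.compile(expr, true)));
    }

    @Override
    public void processElement(Map<String, Object> event, Context ctx, Collector<Map<String, Object>> out) {
        // Expose the record under the name "event" so rules can read event.decoded_as etc.
        Map<String, Object> env = new HashMap<>();
        env.put("event", event);
        compiled.forEach((tag, expr) -> {
            if (Boolean.TRUE.equals(expr.execute(env))) {
                ctx.output(tag, event);  // route to the matching branch
            }
        });
        // Events matching no rule are simply dropped in this sketch.
    }
}
```

A split implementation along these lines would be applied with `stream.process(new RuleSplitFunction(rules))`, and each branch would then be pulled out via `getSideOutput(...)` — which is exactly how `JobExecution` consumes the branches further down in this diff.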
\ No newline at end of file
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
index d2b37f6..6f33cae 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.bootstrap.enums;
 public enum ProcessorType {
     SOURCE("source"),
     FILTER("filter"),
+    SPLIT("split"),
     PREPROCESSING("preprocessing"),
     PROCESSING("processing"),
     POSTPROCESSING("postprocessing"),
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 64c66b6..7a55ffe 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -1,26 +1,26 @@
 package com.geedgenetworks.bootstrap.execution;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.utils.ReflectionUtils;
 import com.geedgenetworks.core.filter.Filter;
 import com.geedgenetworks.core.processor.Processor;
-import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.split.Split;
 import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
+import java.util.*;
 import java.util.function.BiConsumer;

 public abstract class AbstractExecutor<K, V>
-        implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> {
+        implements Executor<DataStream<Event>, JobRuntimeEnvironment> {

     protected JobRuntimeEnvironment jobRuntimeEnvironment;
     protected final Config operatorConfig;
     protected final Map<K,V> operatorMap;
     protected final Map<String,Filter> filterMap = new HashMap<>();
+    protected final Map<String, Split> splitMap = new HashMap<>();
     protected final Map<String, Processor> processorMap = new HashMap<>();

     protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
@@ -30,6 +30,10 @@ public abstract class AbstractExecutor<K, V>
         for (Filter filter : filters) {
             this.filterMap.put(filter.type(), filter);
         }
+        ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
+        for (Split split : splits) {
+            this.splitMap.put(split.type(), split);
+        }
         ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
         for (Processor processor : processors) {
             this.processorMap.put(processor.type(), processor);
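`ServiceLoader.load(Split.class)` only discovers implementations registered through the standard Java SPI mechanism. The provider-configuration file is not part of this diff; it would conventionally look like the following, where the implementation class name is hypothetical:

```
# src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split
com.geedgenetworks.core.split.SplitImpl
```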
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index 66c0b0f..b0a04cd 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution;
 import com.alibaba.fastjson.JSONObject;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.config.*;
 import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.ConfigValidationException;
@@ -13,6 +14,7 @@ import com.geedgenetworks.core.processor.table.TableProcessor;
 import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
 import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
 import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

 import java.net.URL;
@@ -27,7 +29,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
     }

     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
         ProcessorConfig processorConfig = operatorMap.get(node.getName());

         switch (processorConfig.getType()) {
@@ -45,7 +47,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
         }
         return dataStream;
     }

-    protected SingleOutputStreamOperator executeTableProcessor(SingleOutputStreamOperator dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
+    protected DataStream<Event> executeTableProcessor(DataStream<Event> dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
         TableProcessor tableProcessor;

         if (processorMap.containsKey(tableConfig.getType())) {
@@ -63,15 +65,15 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
             tableConfig.setParallelism(node.getParallelism());
         }
         try {
-            dataStream =
-                    tableProcessor.processorFunction(
-                            dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+
+            dataStream = tableProcessor.processorFunction(
+                    dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
         } catch (Exception e) {
             throw new JobExecuteException("Create orderby pipeline instance failed!", e);
         }
         return dataStream;
     }

-    protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
+    protected DataStream<Event> executeAggregateProcessor(DataStream<Event> dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
         AggregateProcessor aggregateProcessor;

         if (processorMap.containsKey(aggregateConfig.getType())) {
@@ -98,7 +100,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
         return dataStream;
     }

-    protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
+    protected DataStream<Event> executeProjectionProcessor(DataStream<Event> dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
         ProjectionProcessor projectionProcessor;

         if (processorMap.containsKey(projectionConfig.getType())) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 506aa11..66f0585 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.config.CheckConfigUtil;
 import com.geedgenetworks.common.config.CheckResult;
 import com.geedgenetworks.common.config.FilterConfigOptions;
@@ -14,6 +15,7 @@ import com.geedgenetworks.core.pojo.FilterConfig;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

 import java.net.URL;
@@ -54,7 +56,7 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
     }

     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
         FilterConfig filterConfig = operatorMap.get(node.getName());
         String className = filterConfig.getType();
         Filter filter;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 2eabefa..22b23a7 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -5,15 +5,17 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
 import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.GrootStreamConfig;
 import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.GrootStreamConfig;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigUtil;
 import com.typesafe.config.ConfigValueFactory;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;

 import java.io.File;
 import java.net.MalformedURLException;
@@ -27,17 +29,18 @@ import java.util.stream.Stream;
 public class JobExecution {

     private final JobRuntimeEnvironment jobRuntimeEnvironment;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecutor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecutor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecutor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecutor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecutor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
     private final List<Node> nodes;
-    private final List<URL> jarPaths;
+

     public JobExecution(Config config, GrootStreamConfig grootStreamConfig) {
-        try {
+        try {
             jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
                     .resolve(GrootStreamRunner.APP_JAR_NAME).toString())
                     .toURI().toURL()));
@@ -50,6 +53,7 @@ public class JobExecution {
         this.sourceExecutor = new SourceExecutor(jarPaths, config);
         this.sinkExecutor = new SinkExecutor(jarPaths, config);
         this.filterExecutor = new FilterExecutor(jarPaths, config);
+        this.splitExecutor = new SplitExecutor(jarPaths, config);
         this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
         this.processingExecutor = new ProcessingExecutor(jarPaths, config);
         this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
@@ -59,6 +63,7 @@ public class JobExecution {
         this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
@@ -69,7 +74,7 @@ public class JobExecution {
     private void registerPlugin(Config appConfig) {
         List<Path> thirdPartyJars = new ArrayList<>();
         Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV);
-        if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
+        if (envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
             thirdPartyJars = new ArrayList<>(StartBuilder
                     .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS)));
         }
@@ -81,7 +86,7 @@ public class JobExecution {
                 .map(uri -> {
                     try {
                         return uri.toURL();
-                    }catch (MalformedURLException e){
+                    } catch (MalformedURLException e) {
                         throw new RuntimeException("the uri of jar illegal: " + uri, e);
                     }
                 }).collect(Collectors.toList());
@@ -93,10 +98,10 @@ public class JobExecution {
     }

-    private Config registerPlugin(Config config , List<URL> jars) {
-        config = this.injectJarsToConfig(
-                config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
-        return this.injectJarsToConfig(
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.injectJarsToConfig(
+                config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
+        return this.injectJarsToConfig(
                 config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars);
     }

@@ -127,7 +132,7 @@ public class JobExecution {
                 .collect(Collectors.toSet());
         paths.addAll(validJars);

-        config = config.withValue(
+        config = config.withValue(
                 path,
                 ConfigValueFactory.fromAnyRef(
                         paths.stream()
@@ -150,8 +155,9 @@ public class JobExecution {

     private List<Node> buildJobNode(Config config) {
         Map<String, Object> sources = Maps.newHashMap();
-        Map<String, Object> sinks =Maps.newHashMap();
+        Map<String, Object> sinks = Maps.newHashMap();
         Map<String, Object> filters = Maps.newHashMap();
+        Map<String, Object> splits = Maps.newHashMap();
         Map<String, Object> preprocessingPipelines = Maps.newHashMap();
         Map<String, Object> processingPipelines = Maps.newHashMap();
         Map<String, Object> postprocessingPipelines = Maps.newHashMap();
@@ -160,11 +166,14 @@ public class JobExecution {
             sources = config.getConfig(Constants.SOURCES).root().unwrapped();
         }
         if (config.hasPath(Constants.SINKS)) {
-            sinks =config.getConfig(Constants.SINKS).root().unwrapped();
+            sinks = config.getConfig(Constants.SINKS).root().unwrapped();
         }
         if (config.hasPath(Constants.FILTERS)) {
             filters = config.getConfig(Constants.FILTERS).root().unwrapped();
         }
+        if (config.hasPath(Constants.SPLITS)) {
+            splits = config.getConfig(Constants.SPLITS).root().unwrapped();
+        }
         if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
             preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
         }
@@ -184,13 +193,15 @@ public class JobExecution {
             nodes.add(node);
         });

-        for(Node node : nodes) {
+        for (Node node : nodes) {
             if (sources.containsKey(node.getName())) {
                 node.setType(ProcessorType.SOURCE);
             } else if (sinks.containsKey(node.getName())) {
                 node.setType(ProcessorType.SINK);
             } else if (filters.containsKey(node.getName())) {
                 node.setType(ProcessorType.FILTER);
+            } else if (splits.containsKey(node.getName())) {
+                node.setType(ProcessorType.SPLIT);
             } else if (preprocessingPipelines.containsKey(node.getName())) {
                 node.setType(ProcessorType.PREPROCESSING);
             } else if (processingPipelines.containsKey(node.getName())) {
@@ -214,12 +225,12 @@ public class JobExecution {
         List<Node> sourceNodes = nodes
                 .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());

-        SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+        DataStream<Event> dataStream = null;

-        for(Node sourceNode : sourceNodes) {
-            singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
+        for (Node sourceNode : sourceNodes) {
+            dataStream = sourceExecutor.execute(dataStream, sourceNode);
             for (String nodeName : sourceNode.getDownstream()) {
-                buildJobGraph(singleOutputStreamOperator, nodeName);
+                buildJobGraph(dataStream, nodeName);
             }
         }

@@ -230,7 +241,7 @@ public class JobExecution {
             log.info("Execution Job: {}", jobRuntimeEnvironment.getJobName());
             jobRuntimeEnvironment.getStreamExecutionEnvironment().execute(jobRuntimeEnvironment.getJobName());
             final long jobEndTime = System.currentTimeMillis();
-            log.info("Job finished, execution duration: {} ms", jobEndTime-jobStartTime);
+            log.info("Job finished, execution duration: {} ms", jobEndTime - jobStartTime);

         } catch (Exception e) {
             throw new JobExecuteException("Execute job error", e);
@@ -238,34 +249,62 @@ public class JobExecution {
     }

-    private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+    private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
         Node node = getNode(downstreamNodeName).orElseGet(() -> {
             throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
         });

         if (node.getType().name().equals(ProcessorType.FILTER.name())) {
-            singleOutputStreamOperator = filterExecutor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = filterExecutor.execute(dataStream, node);
+            }
+        } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+            dataStream = splitExecutor.execute(dataStream, node);
+
         } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
-            singleOutputStreamOperator = preprocessingExecutor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = preprocessingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
-            singleOutputStreamOperator = processingExecutor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = processingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
-            singleOutputStreamOperator = postprocessingExecutor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = postprocessingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
-            singleOutputStreamOperator = sinkExecutor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = sinkExecutor.execute(dataStream, node);
+            }
         } else {
             throw new JobExecuteException("unsupported process type " + node.getType().name());
         }

         for (String nodeName : node.getDownstream()) {
-            buildJobGraph(singleOutputStreamOperator, nodeName);
+            buildJobGraph(dataStream, nodeName);
         }
     }

     private Optional<Node> getNode(String name) {
-        return nodes.stream().filter(v-> v.getName().equals(name)).findFirst();
+        return nodes.stream().filter(v -> v.getName().equals(name)).findFirst();
     }
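Two details of the reworked buildJobGraph are worth noting. First, SplitExecutor (further down in this diff) records every rule name in the runtime environment's splitSet, so a downstream node whose name appears there consumes the side output carrying its own name instead of the main stream. Second, Flink's OutputTag compares equal by its id string alone, so the fresh anonymous `new OutputTag<Event>(node.getName()) {}` constructed here matches the tag the split operator emitted with; the anonymous subclass exists only to preserve the `Event` type parameter against erasure.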
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 7141f5e..2e68098 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -25,9 +25,7 @@ import org.apache.flink.util.TernaryBoolean;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
 import java.util.stream.Collectors;

 @Slf4j
@@ -37,6 +35,8 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
     private GrootStreamConfig grootStreamConfig;
     private StreamExecutionEnvironment environment;
     private String jobName = Constants.DEFAULT_JOB_NAME;
+    private Set<String> splitSet = new HashSet<>();
+

     private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) {
         this.grootStreamConfig = grootStreamConfig;
@@ -197,6 +197,14 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
         }
     }

+    public Set<String> getSplitSet() {
+        return splitSet;
+    }
+
+    public void setSplitSet(Set<String> splitSet) {
+        this.splitSet = splitSet;
+    }
+
     private void setCheckpoint() {
         long interval = 0;
         if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) {
@@ -272,6 +280,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
+
 }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index 36fad61..b9555b4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution;
 import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.core.pojo.ProcessorConfig;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

 import java.net.URL;
@@ -36,7 +38,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor {

     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
         return super.execute(dataStream, node);
     }
 }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index b1e53e4..a7b9e5e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -3,10 +3,12 @@ package com.geedgenetworks.bootstrap.execution;
 import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.core.pojo.ProcessorConfig;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

 import java.net.URL;
@@ -37,7 +39,8 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor {
     }

     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+
         return super.execute(dataStream, node);
     }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index d69fe8c..f6788ed 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution;
 import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.core.pojo.ProcessorConfig;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

 import java.net.URL;
@@ -35,7 +37,7 @@ public class ProcessingExecutor extends AbstractProcessorExecutor {
     }

     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
         return super.execute(dataStream, node);
     }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index a0bedc9..70934b8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
 import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.config.CheckConfigUtil;
 import com.geedgenetworks.common.config.CheckResult;
 import com.geedgenetworks.common.config.SinkConfigOptions;
@@ -20,6 +21,7 @@ import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

@@ -63,7 +65,7 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
     }

     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
         SinkConfig sinkConfig = operatorMap.get(node.getName());
         try {
             SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index d4751af..9dff6b0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

 import java.net.URL;
@@ -67,7 +68,7 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
     }

     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator outputStreamOperator, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> outputStreamOperator, Node node) throws JobExecuteException {
         SourceConfig sourceConfig = operatorMap.get(node.getName());
         SingleOutputStreamOperator sourceSingleOutputStreamOperator;
         try {
@@ -152,4 +153,5 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
             return 0;
         }
     }
+
 }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
new file mode 100644
index 0000000..3513a67
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -0,0 +1,91 @@
+package com.geedgenetworks.bootstrap.execution;
+
+import com.alibaba.fastjson.JSONObject;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.SplitConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import com.geedgenetworks.core.split.Split;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Initialize config and execute split operator
+ */
+@Slf4j
+public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
+
+
+    public SplitExecutor(List<URL> jarPaths, Config config) {
+        super(jarPaths, config);
+    }
+
+    @Override
+    protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+        Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
+        if (operatorConfig.hasPath(Constants.SPLITS)) {
+            Config routes = operatorConfig.getConfig(Constants.SPLITS);
+            routes.root().unwrapped().forEach((key, value) -> {
+                CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key),
+                        SplitConfigOptions.TYPE.key());
+                if (!result.isSuccess()) {
+                    throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+                            "split: %s, Message: %s",
+                            key, result.getMsg()));
+                }
+                SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+                splitConfig.setName(key);
+                splitConfigMap.put(key, splitConfig);
+            });
+        }
+
+        return splitConfigMap;
+    }
+
+    @Override
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+        SplitConfig splitConfig = operatorMap.get(node.getName());
+        String className = splitConfig.getType();
+        Split split;
+        if (splitMap.containsKey(splitConfig.getType())) {
+
+            split = splitMap.get(splitConfig.getType());
+        } else {
+            Class cls;
+            try {
+                cls = Class.forName(className);
+                split = (Split) cls.newInstance();
+            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+                throw new JobExecuteException("get split instance failed!", e);
+            }
+        }
+        if (node.getParallelism() > 0) {
+            splitConfig.setParallelism(node.getParallelism());
+        }
+        for (RuleContext ruleContext : splitConfig.getRules()) {
+            jobRuntimeEnvironment.getSplitSet().add(ruleContext.getName());
+        }
+        try {
+            dataStream = split.splitFunction(dataStream, splitConfig);
+        } catch (Exception e) {
+            throw new JobExecuteException("Create split instance failed!", e);
+        }
+        return dataStream;
+    }
+
+}
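The Split SPI itself (`com.geedgenetworks.core.split.Split`) is not included in this excerpt. Reconstructed from its two call sites — `splitMap.put(split.type(), split)` in AbstractExecutor and `split.splitFunction(dataStream, splitConfig)` here — it presumably has roughly this shape; the `throws Exception` clause is an assumption inferred from the surrounding try/catch:

```java
package com.geedgenetworks.core.split;

import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.SplitConfig;
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical reconstruction of the Split SPI, inferred from call sites; the real file is not in this diff.
public interface Split extends java.io.Serializable {

    // Registry key under which ServiceLoader-discovered implementations are stored, e.g. "split".
    String type();

    // Applies the rule set and returns the stream on which downstream nodes
    // call getSideOutput(...) with tags named after each rule.
    DataStream<Event> splitFunction(DataStream<Event> dataStream, SplitConfig splitConfig) throws Exception;
}
```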
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
index 8b299df..7e995cf 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
@@ -16,7 +16,9 @@ import com.typesafe.config.ConfigUtil;
 import com.typesafe.config.ConfigValueFactory;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;

 import java.io.File;
 import java.net.MalformedURLException;
@@ -33,12 +35,13 @@ import java.util.stream.Stream;
 public class JobExecutionTest {

     protected final JobRuntimeEnvironment jobRuntimeEnvironment;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
     private final List<Node> nodes;

@@ -66,21 +69,22 @@ public class JobExecutionTest {
         }
         registerPlugin(config.getConfig(Constants.APPLICATION));
-        this.sourceExecuteProcessor = new SourceExecutor(jarPaths, config);
-        this.sinkExecuteProcessor = new SinkExecutor(jarPaths, config);
-        this.filterExecuteProcessor = new FilterExecutor(jarPaths, config);
-        this.preprocessingExecuteProcessor = new PreprocessingExecutor(jarPaths, config);
-        this.processingExecuteProcessor = new ProcessingExecutor(jarPaths, config);
-        this.postprocessingExecuteProcessor = new PostprocessingExecutor(jarPaths, config);
+        this.sourceExecutor = new SourceExecutor(jarPaths, config);
+        this.sinkExecutor = new SinkExecutor(jarPaths, config);
+        this.splitExecutor = new SplitExecutor(jarPaths, config);
+        this.filterExecutor = new FilterExecutor(jarPaths, config);
+        this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
+        this.processingExecutor = new ProcessingExecutor(jarPaths, config);
+        this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);

         this.jobRuntimeEnvironment = JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
-
-        this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);

         this.nodes = buildJobNode(config);
     }
@@ -173,6 +177,7 @@ public class JobExecutionTest {
         Map<String, Object> sources = Maps.newHashMap();
         Map<String, Object> sinks =Maps.newHashMap();
         Map<String, Object> filters = Maps.newHashMap();
+        Map<String, Object> splits = Maps.newHashMap();
         Map<String, Object> preprocessingPipelines = Maps.newHashMap();
         Map<String, Object> processingPipelines = Maps.newHashMap();
         Map<String, Object> postprocessingPipelines = Maps.newHashMap();
@@ -186,6 +191,9 @@ public class JobExecutionTest {
         if (config.hasPath(Constants.FILTERS)) {
             filters = config.getConfig(Constants.FILTERS).root().unwrapped();
         }
+        if (config.hasPath(Constants.SPLITS)) {
+            splits = config.getConfig(Constants.SPLITS).root().unwrapped();
+        }
         if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
             preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
         }
@@ -210,6 +218,8 @@ public class JobExecutionTest {
                 node.setType(ProcessorType.SOURCE);
             } else if (sinks.containsKey(node.getName())) {
                 node.setType(ProcessorType.SINK);
+            } else if (splits.containsKey(node.getName())) {
+                node.setType(ProcessorType.SPLIT);
             } else if (filters.containsKey(node.getName())) {
                 node.setType(ProcessorType.FILTER);
             } else if (preprocessingPipelines.containsKey(node.getName())) {
@@ -228,15 +238,15 @@ public class JobExecutionTest {
     }

-    public SingleOutputStreamOperator<Event> getSingleOutputStreamOperator() throws JobExecuteException {
+    public DataStream<Event> getSingleOutputStreamOperator() throws JobExecuteException {
         List<Node> sourceNodes = nodes
                 .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());

-        SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+        DataStream<Event> singleOutputStreamOperator = null;

         for(Node sourceNode : sourceNodes) {
-            singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode);
+            singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
             for (String nodeName : sourceNode.getDownstream()) {
                 buildJobGraph(singleOutputStreamOperator, nodeName);
             }
@@ -247,27 +257,55 @@ public class JobExecutionTest {

     }

-    private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+    private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
         Node node = getNode(downstreamNodeName).orElseGet(() -> {
             throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
         });

         if (node.getType().name().equals(ProcessorType.FILTER.name())) {
-            singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = filterExecutor.execute(dataStream, node);
+            }
+        } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+            dataStream = splitExecutor.execute(dataStream, node);
+
         } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
-            singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = preprocessingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
-            singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = processingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
-            singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = postprocessingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
-            singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node);
-        } else {
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = sinkExecutor.execute(dataStream, node);
+            }
+        } else {
             throw new JobExecuteException("unsupported process type " + node.getType().name());
         }

         for (String nodeName : node.getDownstream()) {
-            buildJobGraph(singleOutputStreamOperator, nodeName);
+            buildJobGraph(dataStream, nodeName);
         }
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
new file mode 100644
index 0000000..2f6984b
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
@@ -0,0 +1,74 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JobSplitTest {
+
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(1)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Test
+    public void testSplit() {
+
+        CollectSink.values.clear();
+        String[] args = {"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"};
+        ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+                .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+        executeCommandArgs.buildCommand();
+
+
+        GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+        Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+        // check config file exist
+        Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+        ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+        Config config = configObject.toConfig();
+
+        config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+                ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+        JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+        jobExecution.getSingleOutputStreamOperator();
+
+        try {
+            jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+        } catch (Exception e) {
+            throw new JobExecuteException("Job executed error", e);
+        }
+        Assert.assertEquals(7, CollectSink.values.size());
+    }
+
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
new file mode 100644
index 0000000..2edc5e7
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -0,0 +1,72 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+
+public class JobSplitWithAggTest {
+
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(1)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Test
+    public void testSplitForAgg() {
+
+        CollectSink.values.clear();
+        String[] args = {"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
+        ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+                .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+        executeCommandArgs.buildCommand();
+
+
+        GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+        Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+        // check config file exist
+        Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+        ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+        Config config = configObject.toConfig();
+
+        config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+                ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+        JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+        jobExecution.getSingleOutputStreamOperator();
+
+        try {
+            jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+        } catch (Exception e) {
+            throw new JobExecuteException("Job executed error", e);
+        }
+        Assert.assertEquals(1, CollectSink.values.size());
+        Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
+
+    }
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index c4f54a3..556c8c4 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -75,16 +75,11 @@ public class SimpleJobTest {
         assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString()));
         assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString()));
         assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString()));
-        Assert.assertEquals("印第安纳州", CollectSink.values.get(0).getExtractedFields().get("server_super_admin_area").toString());
-        Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString());
-        Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("server_country_region").toString());
-        Assert.assertTrue(!CollectSink.values.get(0).getExtractedFields().containsKey("client_country_region"));
         Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
         Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString());
         Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString());
         Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString());
         List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list");
-        Assert.assertEquals("6167", asn_list.get(0));
     }
@@ -121,4 +116,5 @@ public class SimpleJobTest {
         }
         Assert.assertEquals(4, CollectSink.values.size());
     }
+
 }
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
index dfcb459..c5806ed 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
@@ -3,9 +3,7 @@ package com.geedgenetworks.bootstrap.main.simple.collect;

 import com.geedgenetworks.common.Event;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;

-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;

 public class CollectSink implements SinkFunction<Event> {
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
new file mode 100644
index 0000000..872800f
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -0,0 +1,92 @@
+sources:
+  inline_source:
+    type : inline
+    fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+    properties:
+      data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+      interval.per.row: 1s # optional
+      repeat.count: 1 # optional
+      format: json
+      json.ignore.parse.errors: false
+sinks:
+  collect_sink:
+    type: collect
+    properties:
+      format: json
+splits:
+  test_split:
+    type: split
+    rules:
+      - name: aggregate_processor
+        expression: event.decoded_as == 'DNS'
+
+postprocessing_pipelines:
+  pre_etl_processor: # [object] Processing Pipeline
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields: [fields,tags]
+    output_fields:
+    functions: # [array of object] Function List
+
+      - function: FLATTEN
+        lookup_fields: [ fields,tags ]
+        output_fields: [ ]
+        parameters:
+          #prefix: ""
+          depth: 3
+          # delimiter: "."
+
+      - function: RENAME
+        lookup_fields: [ '' ]
+        output_fields: [ '' ]
+        filter:
+        parameters:
+          # parent_fields: [tags]
+          # rename_fields:
+          #   tags: tags
+          rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ timestamp_ms ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+          interval: 300
+  #
+
+  aggregate_processor:
+    type: aggregate
+    group_by_fields: [decoded_as]
+    window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+    window_size: 5
+    window_timestamp_field: test_time
+    functions:
+      - function: NUMBER_SUM
+        lookup_fields: [ sessions ]
+
+  table_processor:
+    type: table
+    functions:
+      - function: JSON_UNROLL
+        lookup_fields: [ encapsulation ]
+        output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+  env: # [object] Environment Variables
+    name: groot-stream-job # [string] Job Name
+    pipeline:
+      object-reuse: true # [boolean] Object Reuse, default is false
+  topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph.
+    - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source when the processor type is SOURCE.
+      parallelism: 1 # [number] Operator-Level Parallelism.
+      downstream: [test_split]
+    - name: test_split
+      parallelism: 1
+      downstream: [ aggregate_processor ]
+    - name: aggregate_processor
+      parallelism: 1
+      downstream: [ collect_sink ]
+    - name: collect_sink
+      parallelism: 1
+
+
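For orientation: the inline source in this fixture emits four events (two DNS, one BASE, one HTTP), and only the two DNS events match the single split rule. One tumbling window therefore sums their sessions fields (1 + 1) for the group decoded_as == 'DNS', which is why JobSplitWithAggTest above expects exactly one collected record with sessions == "2"; the BASE and HTTP events match no rule and, in this topology, have no path to the sink.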
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
new file mode 100644
index 0000000..9bb2900
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
@@ -0,0 +1,97 @@
+sources:
+  inline_source:
+    type: inline
+    fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+    properties:
+      data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+      interval.per.row: 1s # optional
+      repeat.count: 1 # optional
+    format: json
+    json.ignore.parse.errors: false
+sinks:
+  collect_sink:
+    type: collect
+    properties:
+      format: json
+splits:
+  test_split:
+    type: split
+    rules:
+      - name: table_processor
+        expression: event.decoded_as == 'HTTP'
+      - name: pre_etl_processor
+        expression: event.decoded_as == 'DNS'
+
+postprocessing_pipelines:
+  pre_etl_processor: # [object] Processing Pipeline
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields: [fields,tags]
+    output_fields:
+    functions: # [array of object] Function List
+
+      - function: FLATTEN
+        lookup_fields: [ fields,tags ]
+        output_fields: [ ]
+        parameters:
+          #prefix: ""
+          depth: 3
+          # delimiter: "."
+
+      - function: RENAME
+        lookup_fields: [ '' ]
+        output_fields: [ '' ]
+        filter:
+        parameters:
+          # parent_fields: [tags]
+          # rename_fields:
+          #   tags: tags
+          rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ timestamp_ms ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+          interval: 300
+          #
+
+  aggregate_processor:
+    type: aggregate
+    group_by_fields: [decoded_as]
+    window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+    window_size: 5
+    window_timestamp_field: test_time
+    functions:
+      - function: NUMBER_SUM
+        lookup_fields: [ sessions ]
+
+  table_processor:
+    type: table
+    functions:
+      - function: JSON_UNROLL
+        lookup_fields: [ encapsulation ]
+        output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+  env: # [object] Environment Variables
+    name: groot-stream-job # [string] Job Name
+    pipeline:
+      object-reuse: true # [boolean] Object Reuse, default is false
+  topology: # [array of object] Node List. It is used to build the data flow for the job DAG graph.
+    - name: inline_source # [string] Node Name, must be unique. It is used as the name of the corresponding Flink operator, e.g. kafka_source for a SOURCE-type processor.
+      parallelism: 1 # [number] Operator-Level Parallelism.
+      downstream: [test_split,collect_sink]
+    - name: test_split
+      parallelism: 1
+      downstream: [ table_processor,pre_etl_processor ]
+    - name: pre_etl_processor
+      parallelism: 1
+      downstream: [ collect_sink ]
+    - name: table_processor
+      parallelism: 1
+      downstream: [ collect_sink ]
+    - name: collect_sink
+      parallelism: 1
+
+
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index d13fc4b..e973c20 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -11,7 +11,7 @@ public final class Constants {
    public static final String SINKS = "sinks";
    public static final int SYSTEM_EXIT_CODE = 2618;
    public static final String APPLICATION = "application";
-
+    public static final String SPLITS = "splits";
    public static final String APPLICATION_ENV ="env";
    public static final String APPLICATION_TOPOLOGY = "topology";
    public static final String JOB_NAME = "name";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java
new file mode 100644
index 0000000..a2acb71
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java
@@ -0,0 +1,18 @@
+package com.geedgenetworks.common.config;
+
+import com.alibaba.fastjson2.TypeReference;
+import com.geedgenetworks.common.udf.RuleContext;
+import java.util.List;
+
+public interface SplitConfigOptions {
+    Option<String> TYPE = Options.key("type")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The type of split.");
+
+    Option<List<RuleContext>> RULES = Options.key("rules")
+            .type(new TypeReference<List<RuleContext>>() {})
+            .noDefaultValue()
+            .withDescription("The rules to be executed.");
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
new file mode 100644
index 0000000..ead0ecd
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.common.udf;
+
+import com.geedgenetworks.common.Event;
+import com.googlecode.aviator.Expression;
+import lombok.Data;
+import org.apache.flink.util.OutputTag;
+
+import java.io.Serializable;
+
+@Data
+public class RuleContext implements Serializable {
+
+    private String name;
+    private String expression;
+    private Expression compiledExpression;
+    private OutputTag<Event> outputTag;
+
+}
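RuleContext is the deserialized form of one rules entry: name doubles as the side-output id and the downstream node name, expression holds the raw Aviator source, and the compiled expression plus OutputTag are filled in lazily at operator start-up. A hypothetical construction, assuming the Lombok @Data setters (values borrowed from the test YAML above):

    RuleContext rule = new RuleContext();
    rule.setName("aggregate_processor");              // side-output id == downstream operator name
    rule.setExpression("event.decoded_as == 'DNS'");
    // compiledExpression and outputTag are populated later, in SplitFunction.open()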
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
index 668ba6f..d8b8bc4 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
@@ -2,13 +2,14 @@ package com.geedgenetworks.core.filter;

import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.FilterConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public class AviatorFilter implements Filter {

    @Override
-    public SingleOutputStreamOperator<Event> filterFunction(
-            SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
+    public DataStream<Event> filterFunction(
+            DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
            throws Exception {

        if (FilterConfig.getParallelism() != 0) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
index a173438..f8b50eb 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
@@ -3,12 +3,13 @@ package com.geedgenetworks.core.filter;

import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.FilterConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public interface Filter {

-    SingleOutputStreamOperator<Event> filterFunction(
-            SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
+    DataStream<Event> filterFunction(
+            DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
            throws Exception;
    String type();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java
new file mode 100644
index 0000000..4381df5
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.core.pojo;
+
+import com.geedgenetworks.common.udf.RuleContext;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class SplitConfig implements Serializable {
+
+    private String type;
+    private Map<String, Object> properties;
+    private int parallelism;
+    private String name;
+    private List<RuleContext> rules;
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
index 172b368..3852414 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java
@@ -2,12 +2,13 @@ package com.geedgenetworks.core.processor;

import com.geedgenetworks.common.Event;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public interface Processor<T> {

-    SingleOutputStreamOperator<Event> processorFunction(
-            SingleOutputStreamOperator<Event> singleOutputStreamOperator,
+    DataStream<Event> processorFunction(
+            DataStream<Event> singleOutputStreamOperator,
            T processorConfig, ExecutionConfig config) throws Exception;
    String type();
}
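Widening these signatures from SingleOutputStreamOperator&lt;Event&gt; to DataStream&lt;Event&gt; matters for the split feature: a side output is obtained as a plain DataStream, so every downstream Filter or Processor must accept the broader type. A toy pass-through filter under the new contract (the class name and type id are made up for illustration):

    public class PassThroughFilter implements Filter {
        @Override
        public DataStream<Event> filterFunction(DataStream<Event> in, FilterConfig filterConfig) {
            return in.filter(event -> true).name(filterConfig.getName()); // keeps every event
        }

        @Override
        public String type() {
            return "passthrough";
        }
    }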
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index 803fefc..4f9535d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -31,8 +31,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> {
    private final List<UDFContext> udfContexts;
    private final List<String> udfClassNameLists;
-    private final List<String> groupByFields;
-    private LinkedList<UdfEntity> functions;
+    private final LinkedList<UdfEntity> functions;

    public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
        udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
@@ -40,7 +39,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction<Event, Accumulator, Accumulator> {
        if (udfContexts == null || udfContexts.isEmpty()) {
            throw new RuntimeException();
        }
-        groupByFields = aggregateConfig.getGroup_by_fields();
        functions = Lists.newLinkedList();
        Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
        try {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index bc87c32..cf78310 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -5,45 +5,50 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.AggregateConfig;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.OutputTag;
+
import static com.geedgenetworks.common.Constants.*;

public class AggregateProcessorImpl implements AggregateProcessor {

    @Override
-    public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+    public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+
        if (aggregateConfig.getParallelism() != 0) {
            switch (aggregateConfig.getWindow_type()) {
                case TUMBLING_PROCESSING_TIME:
-                    return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
                case TUMBLING_EVENT_TIME:
-                    return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
                case SLIDING_PROCESSING_TIME:
-                    return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
                case SLIDING_EVENT_TIME:
-                    return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
                default:
                    throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
            }
        }else {
            switch (aggregateConfig.getWindow_type()) {
                case TUMBLING_PROCESSING_TIME:
-                    return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
                case TUMBLING_EVENT_TIME:
-                    return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
                case SLIDING_PROCESSING_TIME:
-                    return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
                case SLIDING_EVENT_TIME:
-                    return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
                default:
                    throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
            }
        }
+
    }

    @Override
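The eight window branches above differ only in the WindowAssigner and in whether setParallelism is applied; a condensed equivalent is sketched below (not part of this change; cfg and stream stand in for the aggregate config and input stream, and TimeWindow comes from org.apache.flink.streaming.api.windowing.windows):

    WindowAssigner<Object, TimeWindow> assigner;
    switch (cfg.getWindow_type()) {
        case TUMBLING_PROCESSING_TIME:
            assigner = TumblingProcessingTimeWindows.of(Time.seconds(cfg.getWindow_size())); break;
        case TUMBLING_EVENT_TIME:
            assigner = TumblingEventTimeWindows.of(Time.seconds(cfg.getWindow_size())); break;
        case SLIDING_PROCESSING_TIME:
            assigner = SlidingProcessingTimeWindows.of(Time.seconds(cfg.getWindow_size()), Time.seconds(cfg.getWindow_slide())); break;
        case SLIDING_EVENT_TIME:
            assigner = SlidingEventTimeWindows.of(Time.seconds(cfg.getWindow_size()), Time.seconds(cfg.getWindow_slide())); break;
        default:
            throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
    }
    SingleOutputStreamOperator<Event> out = stream
            .keyBy(new KeySelector(cfg.getGroup_by_fields()))
            .window(assigner)
            .aggregate(new AggregateProcessorFunction(cfg, config), new ProcessWindowFunctionImpl(cfg))
            .name(cfg.getName());
    if (cfg.getParallelism() != 0) {
        out.setParallelism(cfg.getParallelism()); // only set when explicitly configured
    }
    return out;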
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
index 6b46a7b..d87e7e2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
@@ -4,25 +4,26 @@ import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;

public class ProjectionProcessorImpl implements ProjectionProcessor {

    @Override
-    public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception {
+    public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception {
        if (projectionConfig.getParallelism() != 0) {
-            return grootEventSingleOutputStreamOperator
+            return grootEventDataStream
                    .process(new ProjectionProcessFunction(projectionConfig))
                    .setParallelism(projectionConfig.getParallelism())
                    .name(projectionConfig.getName());
        } else {
-            return grootEventSingleOutputStreamOperator
+            return grootEventDataStream
                    .process(new ProjectionProcessFunction(projectionConfig))
                    .name(projectionConfig.getName());
        }
    }
-
    @Override
    public String type() {
        return "projection";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
index f36f8db..6ddc616 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
@@ -2,25 +2,25 @@ package com.geedgenetworks.core.processor.table;

import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.TableConfig;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessFunction;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+
+import java.util.Map;

-import java.time.Duration;

public class TableProcessorImpl implements TableProcessor {

    @Override
-    public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, TableConfig tableConfig, ExecutionConfig config) throws Exception {
-
+    public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, TableConfig tableConfig, ExecutionConfig config) throws Exception {
        if (tableConfig.getParallelism() != 0) {
-            return grootEventSingleOutputStreamOperator
+            return grootEventDataStream
                    .flatMap(new TableProcessorFunction(tableConfig))
                    .setParallelism(tableConfig.getParallelism())
                    .name(tableConfig.getName());
        } else {
-            return grootEventSingleOutputStreamOperator
+            return grootEventDataStream
                    .flatMap(new TableProcessorFunction(tableConfig))
                    .name(tableConfig.getName());
        }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java
new file mode 100644
index 0000000..37e7b44
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.util.Set;
+
+public interface Split {
+
+    DataStream<Event> splitFunction(
+            DataStream<Event> dataStream, SplitConfig splitConfig)
+            throws Exception;
+    String type();
+}
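splitFunction returns the processed main stream, but the routed events live on side outputs, so a caller needs the SingleOutputStreamOperator produced by process() to read each branch. A sketch of how a caller might consume one branch per rule (the downcast is an assumption about how the executor uses SplitOperator's return value, which process() does satisfy):

    SingleOutputStreamOperator<Event> splitStream =
            (SingleOutputStreamOperator<Event>) split.splitFunction(input, splitConfig);
    for (RuleContext rule : splitConfig.getRules()) {
        // OutputTag matches by id, so rebuilding it from the rule name finds the branch
        DataStream<Event> branch = splitStream.getSideOutput(new OutputTag<Event>(rule.getName()) {});
        // wire `branch` to the downstream operator named rule.getName()
    }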
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
new file mode 100644
index 0000000..f07b568
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
@@ -0,0 +1,79 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import com.googlecode.aviator.exception.ExpressionRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class SplitFunction extends ProcessFunction<Event, Event> {
+    private final SplitConfig splitConfig;
+    private List<RuleContext> rules;
+    private transient InternalMetrics internalMetrics;
+
+    public SplitFunction(SplitConfig splitConfig) {
+        this.splitConfig = splitConfig;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.internalMetrics = new InternalMetrics(getRuntimeContext());
+        this.rules = splitConfig.getRules();
+        for (RuleContext rule : rules) {
+            String expression = rule.getExpression();
+            AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
+            instance.setCachedExpressionByDefault(true);
+            instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
+            instance.setFunctionMissing(null);
+            Expression compiledExp = instance.compile(expression, true);
+            rule.setCompiledExpression(compiledExp);
+            OutputTag<Event> outputTag = new OutputTag<>(rule.getName()){};
+            rule.setOutputTag(outputTag);
+        }
+    }
+
+    @Override
+    public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
+        try {
+            internalMetrics.incrementInEvents();
+            for (RuleContext rule : rules) {
+                boolean result = rule.getExpression() != null ? (filterExecute(rule.getCompiledExpression(), rule.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true;
+                if (result) {
+                    ctx.output(rule.getOutputTag(), event);
+                }
+            }
+        } catch (Exception e) {
+            internalMetrics.incrementErrorEvents();
+            log.error("error in split function", e);
+        }
+    }
+
+    public static Boolean filterExecute(Expression expression, Map<String, Object> map) {
+        boolean result;
+        Object object = expression.execute(map);
+        if (object != null) {
+            result = (Boolean) object;
+        } else {
+            throw new ExpressionRuntimeException();
+        }
+        return result;
+    }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java
new file mode 100644
index 0000000..f6d2c8c
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java
@@ -0,0 +1,29 @@
+package com.geedgenetworks.core.split;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+public class SplitOperator implements Split {
+
+    @Override
+    public DataStream<Event> splitFunction(
+            DataStream<Event> dataStream, SplitConfig splitConfig)
+            throws Exception {
+        if (splitConfig.getParallelism() != 0) {
+            return dataStream
+                    .process(new SplitFunction(splitConfig))
+                    .setParallelism(splitConfig.getParallelism())
+                    .name(splitConfig.getName());
+        } else {
+            return dataStream
+                    .process(new SplitFunction(splitConfig))
+                    .name(splitConfig.getName());
+        }
+    }
+
+    @Override
+    public String type() {
+        return "split";
+    }
+
+}
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split
new file mode 100644
index 0000000..500c367
--- /dev/null
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split
@@ -0,0 +1 @@
+com.geedgenetworks.core.split.SplitOperator
\ No newline at end of file
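The new META-INF/services entry registers SplitOperator for java.util.ServiceLoader discovery, so callers of the Split SPI can instantiate every implementation on the classpath and key each one by its type() string. Minimal illustration:

    ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
    for (Split split : splits) {
        System.out.println(split.type()); // prints "split" for SplitOperator
    }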
