| author | wangkuan <[email protected]> | 2024-08-23 17:58:22 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-08-23 17:58:22 +0800 |
| commit | 215dd9aa1e4ec6a509d64c78ec414a8196dace3c (patch) | |
| tree | 5c038000b9de72684a1f8b8f1832394aed30cf13 /groot-bootstrap | |
| parent | 07332297c1306aa0dac649c7d15bf131e8edbc7e (diff) | |
[feature][core][common] GAL-646 Groot Stream: support a Split operator for dynamic stream splitting
Diffstat (limited to 'groot-bootstrap')
18 files changed, 584 insertions, 93 deletions
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
index d2b37f6..6f33cae 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.bootstrap.enums;
 public enum ProcessorType {
     SOURCE("source"),
     FILTER("filter"),
+    SPLIT("split"),
     PREPROCESSING("preprocessing"),
     PROCESSING("processing"),
     POSTPROCESSING("postprocessing"),
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 64c66b6..7a55ffe 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -1,26 +1,26 @@
 package com.geedgenetworks.bootstrap.execution;
 
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.utils.ReflectionUtils;
 import com.geedgenetworks.core.filter.Filter;
 import com.geedgenetworks.core.processor.Processor;
-import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.split.Split;
 import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
+import java.util.*;
 import java.util.function.BiConsumer;
 
 public abstract class AbstractExecutor<K, V>
-        implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> {
+        implements Executor<DataStream<Event>, JobRuntimeEnvironment> {
 
     protected JobRuntimeEnvironment jobRuntimeEnvironment;
     protected final Config operatorConfig;
     protected final Map<K,V> operatorMap;
     protected final Map<String,Filter> filterMap = new HashMap<>();
+    protected final Map<String, Split> splitMap = new HashMap<>();
     protected final Map<String, Processor> processorMap = new HashMap<>();
 
     protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
@@ -30,6 +30,10 @@ public abstract class AbstractExecutor<K, V>
         for (Filter filter : filters) {
             this.filterMap.put(filter.type(), filter);
         }
+        ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
+        for (Split split : splits) {
+            this.splitMap.put(split.type(), split);
+        }
         ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
         for (Processor processor : processors) {
             this.processorMap.put(processor.type(), processor);
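For context, the `ServiceLoader.load(Split.class)` call added above uses Java's standard SPI mechanism: implementations are listed by fully qualified class name in a `META-INF/services/com.geedgenetworks.core.split.Split` resource on the classpath. A hypothetical implementation only needs to satisfy the `Split` contract implied by this commit — `type()` for registry lookup and `splitFunction(...)` for the actual stream surgery (the interface shape is inferred from the call sites, not shown in this diff):

```java
package com.example.split;

import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.SplitConfig;
import com.geedgenetworks.core.split.Split;
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical no-op Split, sketched from how AbstractExecutor and
// SplitExecutor use the interface in this commit.
public class NoOpSplit implements Split {

    @Override
    public String type() {
        // Key under which AbstractExecutor registers this instance in splitMap;
        // it must match the `type:` value of a splits block in the job YAML.
        return "noop-split";
    }

    @Override
    public DataStream<Event> splitFunction(DataStream<Event> dataStream, SplitConfig splitConfig) {
        // A real implementation fans events out to side outputs here
        // (see the ExpressionSplit sketch after SplitExecutor.java below).
        return dataStream;
    }
}
```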
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index 66c0b0f..b0a04cd 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution;
 
 import com.alibaba.fastjson.JSONObject;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.config.*;
 import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.ConfigValidationException;
@@ -13,6 +14,7 @@ import com.geedgenetworks.core.processor.table.TableProcessor;
 import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
 import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
 import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
 import java.net.URL;
@@ -27,7 +29,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
     }
 
     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
         ProcessorConfig processorConfig = operatorMap.get(node.getName());
 
         switch (processorConfig.getType()) {
@@ -45,7 +47,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
         }
         return dataStream;
     }
 
-    protected SingleOutputStreamOperator executeTableProcessor(SingleOutputStreamOperator dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
+    protected DataStream<Event> executeTableProcessor(DataStream<Event> dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
         TableProcessor tableProcessor;
 
         if (processorMap.containsKey(tableConfig.getType())) {
@@ -63,15 +65,15 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
             tableConfig.setParallelism(node.getParallelism());
         }
         try {
-            dataStream =
-                    tableProcessor.processorFunction(
-                            dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+
+            dataStream = tableProcessor.processorFunction(
+                    dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
         } catch (Exception e) {
             throw new JobExecuteException("Create orderby pipeline instance failed!", e);
         }
         return dataStream;
     }
 
-    protected SingleOutputStreamOperator executeAggregateProcessor(SingleOutputStreamOperator dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
+    protected DataStream<Event> executeAggregateProcessor(DataStream<Event> dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
         AggregateProcessor aggregateProcessor;
 
         if (processorMap.containsKey(aggregateConfig.getType())) {
@@ -98,7 +100,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
         return dataStream;
     }
 
-    protected SingleOutputStreamOperator executeProjectionProcessor(SingleOutputStreamOperator dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
+    protected DataStream<Event> executeProjectionProcessor(DataStream<Event> dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
         ProjectionProcessor projectionProcessor;
 
         if (processorMap.containsKey(projectionConfig.getType())) {
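The signature change above is the heart of this refactoring: every executor now accepts and returns `DataStream<Event>`, the common supertype, instead of the concrete `SingleOutputStreamOperator`, so a side-output branch (a plain `DataStream`) can flow through the same executor chain. The `Executor` contract implied by these call sites looks roughly like this — a sketch only; the real interface lives elsewhere in the repo and may differ in detail:

```java
// Sketch of the Executor contract as implied by this commit's call sites.
public interface Executor<T, E extends RuntimeEnvironment> {

    // Consumes the upstream stream for one topology node and returns the
    // (possibly replaced) stream that downstream nodes should read from.
    T execute(T dataStream, Node node) throws JobExecuteException;

    void setRuntimeEnvironment(E runtimeEnvironment);
}
```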
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 506aa11..66f0585 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.config.CheckConfigUtil;
 import com.geedgenetworks.common.config.CheckResult;
 import com.geedgenetworks.common.config.FilterConfigOptions;
@@ -14,6 +15,7 @@ import com.geedgenetworks.core.pojo.FilterConfig;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
 import java.net.URL;
@@ -54,7 +56,7 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
     }
 
     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
         FilterConfig filterConfig = operatorMap.get(node.getName());
         String className = filterConfig.getType();
         Filter filter;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 2eabefa..22b23a7 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -5,15 +5,17 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
 import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.GrootStreamConfig;
 import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.GrootStreamConfig;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigUtil;
 import com.typesafe.config.ConfigValueFactory;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
 
 import java.io.File;
 import java.net.MalformedURLException;
@@ -27,17 +29,18 @@ import java.util.stream.Stream;
 public class JobExecution {
 
     private final JobRuntimeEnvironment jobRuntimeEnvironment;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecutor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecutor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecutor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecutor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecutor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
     private final List<Node> nodes;
-    private final List<URL> jarPaths;
+
     public JobExecution(Config config, GrootStreamConfig grootStreamConfig) {
-        try {
+        try {
             jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
                     .resolve(GrootStreamRunner.APP_JAR_NAME).toString())
                     .toURI().toURL()));
@@ -50,6 +53,7 @@ public class JobExecution {
         this.sourceExecutor = new SourceExecutor(jarPaths, config);
         this.sinkExecutor = new SinkExecutor(jarPaths, config);
         this.filterExecutor = new FilterExecutor(jarPaths, config);
+        this.splitExecutor = new SplitExecutor(jarPaths, config);
         this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
         this.processingExecutor = new ProcessingExecutor(jarPaths, config);
         this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
@@ -59,6 +63,7 @@ public class JobExecution {
         this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
@@ -69,7 +74,7 @@ public class JobExecution {
     private void registerPlugin(Config appConfig) {
         List<Path> thirdPartyJars = new ArrayList<>();
         Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV);
-        if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
+        if (envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
             thirdPartyJars = new ArrayList<>(StartBuilder
                     .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS)));
         }
@@ -81,7 +86,7 @@ public class JobExecution {
                 .map(uri -> {
                     try {
                         return uri.toURL();
-                    }catch (MalformedURLException e){
+                    } catch (MalformedURLException e) {
                         throw new RuntimeException("the uri of jar illegal: " + uri, e);
                     }
                 }).collect(Collectors.toList());
@@ -93,10 +98,10 @@ public class JobExecution {
     }
 
-    private Config registerPlugin(Config config , List<URL> jars) {
-        config = this.injectJarsToConfig(
-                config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
-        return this.injectJarsToConfig(
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.injectJarsToConfig(
+                config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
+        return this.injectJarsToConfig(
                 config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars);
     }
@@ -127,7 +132,7 @@ public class JobExecution {
                 .collect(Collectors.toSet());
         paths.addAll(validJars);
-        config = config.withValue(
+        config = config.withValue(
                 path,
                 ConfigValueFactory.fromAnyRef(
                         paths.stream()
@@ -150,8 +155,9 @@ public class JobExecution {
     private List<Node> buildJobNode(Config config) {
         Map<String, Object> sources = Maps.newHashMap();
-        Map<String, Object> sinks =Maps.newHashMap();
+        Map<String, Object> sinks = Maps.newHashMap();
         Map<String, Object> filters = Maps.newHashMap();
+        Map<String, Object> splits = Maps.newHashMap();
         Map<String, Object> preprocessingPipelines = Maps.newHashMap();
         Map<String, Object> processingPipelines = Maps.newHashMap();
         Map<String, Object> postprocessingPipelines = Maps.newHashMap();
@@ -160,11 +166,14 @@ public class JobExecution {
             sources = config.getConfig(Constants.SOURCES).root().unwrapped();
         }
         if (config.hasPath(Constants.SINKS)) {
-            sinks =config.getConfig(Constants.SINKS).root().unwrapped();
+            sinks = config.getConfig(Constants.SINKS).root().unwrapped();
         }
         if (config.hasPath(Constants.FILTERS)) {
             filters = config.getConfig(Constants.FILTERS).root().unwrapped();
         }
+        if (config.hasPath(Constants.SPLITS)) {
+            splits = config.getConfig(Constants.SPLITS).root().unwrapped();
+        }
         if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
             preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
         }
@@ -184,13 +193,15 @@ public class JobExecution {
             nodes.add(node);
         });
 
-        for(Node node : nodes) {
+        for (Node node : nodes) {
             if (sources.containsKey(node.getName())) {
                 node.setType(ProcessorType.SOURCE);
             } else if (sinks.containsKey(node.getName())) {
                 node.setType(ProcessorType.SINK);
             } else if (filters.containsKey(node.getName())) {
                 node.setType(ProcessorType.FILTER);
+            } else if (splits.containsKey(node.getName())) {
+                node.setType(ProcessorType.SPLIT);
             } else if (preprocessingPipelines.containsKey(node.getName())) {
                 node.setType(ProcessorType.PREPROCESSING);
             } else if (processingPipelines.containsKey(node.getName())) {
@@ -214,12 +225,12 @@ public class JobExecution {
         List<Node> sourceNodes = nodes
                 .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
 
-        SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+        DataStream<Event> dataStream = null;
 
-        for(Node sourceNode : sourceNodes) {
-            singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
+        for (Node sourceNode : sourceNodes) {
+            dataStream = sourceExecutor.execute(dataStream, sourceNode);
             for (String nodeName : sourceNode.getDownstream()) {
-                buildJobGraph(singleOutputStreamOperator, nodeName);
+                buildJobGraph(dataStream, nodeName);
             }
         }
@@ -230,7 +241,7 @@ public class JobExecution {
             log.info("Execution Job: {}", jobRuntimeEnvironment.getJobName());
             jobRuntimeEnvironment.getStreamExecutionEnvironment().execute(jobRuntimeEnvironment.getJobName());
             final long jobEndTime = System.currentTimeMillis();
-            log.info("Job finished, execution duration: {} ms", jobEndTime-jobStartTime);
+            log.info("Job finished, execution duration: {} ms", jobEndTime - jobStartTime);
 
         } catch (Exception e) {
             throw new JobExecuteException("Execute job error", e);
@@ -238,34 +249,62 @@ public class JobExecution {
     }
 
-    private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+    private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
         Node node = getNode(downstreamNodeName).orElseGet(() -> {
             throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
         });
         if (node.getType().name().equals(ProcessorType.FILTER.name())) {
-            singleOutputStreamOperator = filterExecutor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = filterExecutor.execute(dataStream, node);
+            }
+        } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+            dataStream = splitExecutor.execute(dataStream, node);
+
         } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
-            singleOutputStreamOperator = preprocessingExecutor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = preprocessingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
-            singleOutputStreamOperator = processingExecutor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = processingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
-            singleOutputStreamOperator = postprocessingExecutor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = postprocessingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
-            singleOutputStreamOperator = sinkExecutor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = sinkExecutor.execute(dataStream, node);
+            }
         } else {
             throw new JobExecuteException("unsupported process type " + node.getType().name());
         }
 
         for (String nodeName : node.getDownstream()) {
-            buildJobGraph(singleOutputStreamOperator, nodeName);
+            buildJobGraph(dataStream, nodeName);
         }
     }
 
     private Optional<Node> getNode(String name) {
-        return nodes.stream().filter(v-> v.getName().equals(name)).findFirst();
+        return nodes.stream().filter(v -> v.getName().equals(name)).findFirst();
     }
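A note on the recurring `new OutputTag<Event>(node.getName()) {}` pattern in `buildJobGraph` above: Flink requires the anonymous subclass so the generic type argument survives erasure, and two `OutputTag`s match as long as their ids (names) are equal. That is why the graph builder can recreate a tag from the node name instead of sharing the exact instance the split operator emitted with. A minimal standalone illustration, assuming `stream` was produced by an operator that emitted to a side output named `dns`:

```java
// Side-output retrieval as used in buildJobGraph; `stream` is assumed to
// come from an operator that called
// ctx.output(new OutputTag<Event>("dns") {}, event).
OutputTag<Event> dnsTag = new OutputTag<Event>("dns") {}; // anonymous subclass keeps <Event> despite erasure

// Only SingleOutputStreamOperator exposes getSideOutput, hence the cast in
// buildJobGraph; tags with the same id refer to the same side output.
DataStream<Event> dnsBranch =
        ((SingleOutputStreamOperator<Event>) stream).getSideOutput(dnsTag);
```

The cast is safe here only because a node registered in `splitSet` is always fed by the split operator's returned `SingleOutputStreamOperator`; a defensive `instanceof` check would guard against misconfigured topologies.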
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index 7141f5e..2e68098 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -25,9 +25,7 @@ import org.apache.flink.util.TernaryBoolean;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -37,6 +35,8 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
     private GrootStreamConfig grootStreamConfig;
     private StreamExecutionEnvironment environment;
     private String jobName = Constants.DEFAULT_JOB_NAME;
+    private Set<String> splitSet = new HashSet<>();
+
     private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) {
         this.grootStreamConfig = grootStreamConfig;
@@ -197,6 +197,14 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
         }
     }
 
+    public Set<String> getSplitSet() {
+        return splitSet;
+    }
+
+    public void setSplitSet(Set<String> splitSet) {
+        this.splitSet = splitSet;
+    }
+
     private void setCheckpoint() {
         long interval = 0;
         if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) {
@@ -272,6 +280,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
+
 }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index 36fad61..b9555b4 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution;
 import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.core.pojo.ProcessorConfig;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
 import java.net.URL;
@@ -36,7 +38,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor {
 
     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
         return super.execute(dataStream, node);
     }
 }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index b1e53e4..a7b9e5e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -3,10 +3,12 @@ package com.geedgenetworks.bootstrap.execution;
 import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.core.pojo.ProcessorConfig;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
 import java.net.URL;
@@ -37,7 +39,8 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor {
     }
 
     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+
         return super.execute(dataStream, node);
     }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index d69fe8c..f6788ed 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution;
 import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.core.pojo.ProcessorConfig;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
 import java.net.URL;
@@ -35,7 +37,7 @@ public class ProcessingExecutor extends AbstractProcessorExecutor {
     }
 
     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
         return super.execute(dataStream, node);
     }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index a0bedc9..70934b8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
 import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.config.CheckConfigUtil;
 import com.geedgenetworks.common.config.CheckResult;
 import com.geedgenetworks.common.config.SinkConfigOptions;
@@ -20,6 +21,7 @@ import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
@@ -63,7 +65,7 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
     }
 
     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
         SinkConfig sinkConfig = operatorMap.get(node.getName());
         try {
             SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index d4751af..9dff6b0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
 import java.net.URL;
@@ -67,7 +68,7 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
     }
 
     @Override
-    public SingleOutputStreamOperator execute(SingleOutputStreamOperator outputStreamOperator, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> outputStreamOperator, Node node) throws JobExecuteException {
         SourceConfig sourceConfig = operatorMap.get(node.getName());
         SingleOutputStreamOperator sourceSingleOutputStreamOperator;
         try {
@@ -152,4 +153,5 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
             return 0;
         }
     }
+
 }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
new file mode 100644
index 0000000..c0ac3a5
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -0,0 +1,95 @@
+package com.geedgenetworks.bootstrap.execution;
+
+import com.alibaba.fastjson.JSONObject;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckConfigUtil;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.RouteConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.ConfigValidationException;
+import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.pojo.SplitConfig;
+import com.geedgenetworks.core.split.Split;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.net.URL;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Initialize config and execute split operator
+ */
+@Slf4j
+public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
+
+
+    public SplitExecutor(List<URL> jarPaths, Config config) {
+        super(jarPaths, config);
+    }
+
+    @Override
+    protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+        Map<String, SplitConfig> routeConfigMap = Maps.newHashMap();
+        if (operatorConfig.hasPath(Constants.SPLITS)) {
+            Config routes = operatorConfig.getConfig(Constants.SPLITS);
+            routes.root().unwrapped().forEach((key, value) -> {
+                CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key),
+                        RouteConfigOptions.TYPE.key());
+                if (!result.isSuccess()) {
+                    throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
+                            "split: %s, Message: %s",
+                            key, result.getMsg()));
+                }
+                SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
+                splitConfig.setName(key);
+                routeConfigMap.put(key, splitConfig);
+            });
+        }
+
+        return routeConfigMap;
+    }
+
+    @Override
+    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+        SplitConfig splitConfig = operatorMap.get(node.getName());
+        String className = splitConfig.getType();
+        Split split;
+        if (splitMap.containsKey(splitConfig.getType())) {
+
+            split = splitMap.get(splitConfig.getType());
+        } else {
+            Class cls;
+            try {
+                cls = Class.forName(className);
+                split = (Split) cls.newInstance();
+            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+                throw new JobExecuteException("get split instance failed!", e);
+            }
+        }
+        if (node.getParallelism() > 0) {
+            splitConfig.setParallelism(node.getParallelism());
+        }
+        for (RuleContext ruleContext : splitConfig.getRules()) {
+            jobRuntimeEnvironment.getSplitSet().add(ruleContext.getName());
+        }
+        try {
+            dataStream =
+                    split.splitFunction(
+                            dataStream, splitConfig);
+        } catch (Exception e) {
+            throw new JobExecuteException("Create split instance failed!", e);
+        }
+        return dataStream;
+    }
+
+}
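`SplitExecutor` records each rule name in the shared `splitSet` and then delegates the actual splitting to the `Split` implementation, which this diff does not include. Below is a sketch of what `splitFunction` could look like, assuming the rule expressions (e.g. `event.decoded_as == 'DNS'`) compile to per-event predicates. The `matches` helper and the `getExpression()` accessor on `RuleContext` are assumptions inferred from the YAML schema; whether the real operator uses first-match or fan-out semantics is also not visible in the diff:

```java
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.RuleContext;
import com.geedgenetworks.core.pojo.SplitConfig;
import com.geedgenetworks.core.split.Split;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Sketch only: routes each event to the side output named after the first
// rule it matches. SplitConfig must be Serializable for the capture below
// to ship with the task; a production version would also precompute the tags.
public class ExpressionSplit implements Split {

    @Override
    public String type() {
        return "split"; // matches `type: split` in the YAML configs below
    }

    @Override
    public DataStream<Event> splitFunction(DataStream<Event> dataStream, SplitConfig splitConfig) {
        return dataStream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event event, Context ctx, Collector<Event> out) {
                for (RuleContext rule : splitConfig.getRules()) {
                    if (matches(rule.getExpression(), event)) {
                        // Tag name = rule name = downstream node name, so
                        // buildJobGraph can rebuild an equal tag by name.
                        ctx.output(new OutputTag<Event>(rule.getName()) {}, event);
                        return;
                    }
                }
                out.collect(event); // events matching no rule stay on the main stream
            }
        });
    }

    // Hypothetical: the project evaluates rule expressions such as
    // "event.decoded_as == 'DNS'" with its own rule engine.
    private static boolean matches(String expression, Event event) {
        throw new UnsupportedOperationException("rule evaluation is project-specific");
    }
}
```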
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
index 8b299df..7e995cf 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
@@ -16,7 +16,9 @@ import com.typesafe.config.ConfigUtil;
 import com.typesafe.config.ConfigValueFactory;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
 
 import java.io.File;
 import java.net.MalformedURLException;
@@ -33,12 +35,13 @@ import java.util.stream.Stream;
 public class JobExecutionTest {
 
     protected final JobRuntimeEnvironment jobRuntimeEnvironment;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor;
-    private final Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
+    private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
     private final List<Node> nodes;
 
@@ -66,21 +69,22 @@ public class JobExecutionTest {
         }
         registerPlugin(config.getConfig(Constants.APPLICATION));
 
-        this.sourceExecuteProcessor = new SourceExecutor(jarPaths, config);
-        this.sinkExecuteProcessor = new SinkExecutor(jarPaths, config);
-        this.filterExecuteProcessor = new FilterExecutor(jarPaths, config);
-        this.preprocessingExecuteProcessor = new PreprocessingExecutor(jarPaths, config);
-        this.processingExecuteProcessor = new ProcessingExecutor(jarPaths, config);
-        this.postprocessingExecuteProcessor = new PostprocessingExecutor(jarPaths, config);
+        this.sourceExecutor = new SourceExecutor(jarPaths, config);
+        this.sinkExecutor = new SinkExecutor(jarPaths, config);
+        this.splitExecutor = new SplitExecutor(jarPaths, config);
+        this.filterExecutor = new FilterExecutor(jarPaths, config);
+        this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
+        this.processingExecutor = new ProcessingExecutor(jarPaths, config);
+        this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
 
         this.jobRuntimeEnvironment = JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
-
-        this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
+        this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
 
         this.nodes = buildJobNode(config);
     }
@@ -173,6 +177,7 @@ public class JobExecutionTest {
         Map<String, Object> sources = Maps.newHashMap();
         Map<String, Object> sinks =Maps.newHashMap();
         Map<String, Object> filters = Maps.newHashMap();
+        Map<String, Object> splits = Maps.newHashMap();
         Map<String, Object> preprocessingPipelines = Maps.newHashMap();
         Map<String, Object> processingPipelines = Maps.newHashMap();
         Map<String, Object> postprocessingPipelines = Maps.newHashMap();
@@ -186,6 +191,9 @@ public class JobExecutionTest {
         if (config.hasPath(Constants.FILTERS)) {
             filters = config.getConfig(Constants.FILTERS).root().unwrapped();
         }
+        if (config.hasPath(Constants.SPLITS)) {
+            splits = config.getConfig(Constants.SPLITS).root().unwrapped();
+        }
         if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
             preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
         }
@@ -210,6 +218,8 @@ public class JobExecutionTest {
                 node.setType(ProcessorType.SOURCE);
             } else if (sinks.containsKey(node.getName())) {
                 node.setType(ProcessorType.SINK);
+            } else if (splits.containsKey(node.getName())) {
+                node.setType(ProcessorType.SPLIT);
             } else if (filters.containsKey(node.getName())) {
                 node.setType(ProcessorType.FILTER);
             } else if (preprocessingPipelines.containsKey(node.getName())) {
@@ -228,15 +238,15 @@ public class JobExecutionTest {
     }
 
-    public SingleOutputStreamOperator<Event> getSingleOutputStreamOperator() throws JobExecuteException {
+    public DataStream<Event> getSingleOutputStreamOperator() throws JobExecuteException {
         List<Node> sourceNodes = nodes
                 .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
 
-        SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+        DataStream<Event> singleOutputStreamOperator = null;
 
         for(Node sourceNode : sourceNodes) {
-            singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode);
+            singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
             for (String nodeName : sourceNode.getDownstream()) {
                 buildJobGraph(singleOutputStreamOperator, nodeName);
             }
@@ -247,27 +257,55 @@ public class JobExecutionTest {
     }
 
-    private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+    private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
         Node node = getNode(downstreamNodeName).orElseGet(() -> {
             throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
         });
         if (node.getType().name().equals(ProcessorType.FILTER.name())) {
-            singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = filterExecutor.execute(dataStream, node);
+            }
+        } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+            dataStream = splitExecutor.execute(dataStream, node);
+
         } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
-            singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = preprocessingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
-            singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = processingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
-            singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = postprocessingExecutor.execute(dataStream, node);
+            }
         } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
-            singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node);
-        } else {
+            if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) {
+                dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+                }), node);
+            } else {
+                dataStream = sinkExecutor.execute(dataStream, node);
+            }
+        } else {
             throw new JobExecuteException("unsupported process type " + node.getType().name());
         }
 
         for (String nodeName : node.getDownstream()) {
-            buildJobGraph(singleOutputStreamOperator, nodeName);
+            buildJobGraph(dataStream, nodeName);
         }
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
new file mode 100644
index 0000000..dfe0600
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
@@ -0,0 +1,107 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class JobSplitTest {
+
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(1)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Test
+    public void testSplit() {
+
+        CollectSink.values.clear();
+        String[] args = {"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"};
+        ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+                .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+        executeCommandArgs.buildCommand();
+
+        GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+        Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+        // check config file exist
+        Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+        ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+        Config config = configObject.toConfig();
+
+        config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+                ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+        JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+        jobExecution.getSingleOutputStreamOperator();
+
+        try {
+            jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+        } catch (Exception e) {
+            throw new JobExecuteException("Job executed error", e);
+        }
+        Assert.assertEquals(7, CollectSink.values.size());
+    }
+
+    @Test
+    public void testSplitForAgg() {
+
+        CollectSink.values.clear();
+        String[] args = {"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
+        ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+                .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+        executeCommandArgs.buildCommand();
+
+        GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+        Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+        // check config file exist
+        Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+        ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+        Config config = configObject.toConfig();
+
+        config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+                ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+        JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+        jobExecution.getSingleOutputStreamOperator();
+
+        try {
+            jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+        } catch (Exception e) {
+            throw new JobExecuteException("Job executed error", e);
+        }
+        Assert.assertEquals(1, CollectSink.values.size());
+        Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
+    }
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index c4f54a3..556c8c4 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -75,16 +75,11 @@ public class SimpleJobTest {
         assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString()));
         assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString()));
         assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString()));
-        Assert.assertEquals("印第安纳州", CollectSink.values.get(0).getExtractedFields().get("server_super_admin_area").toString());
-        Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString());
-        Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("server_country_region").toString());
-        Assert.assertTrue(!CollectSink.values.get(0).getExtractedFields().containsKey("client_country_region"));
         Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
         Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString());
         Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString());
         Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString());
         List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list");
-        Assert.assertEquals("6167", asn_list.get(0));
     }
@@ -121,4 +116,5 @@ public class SimpleJobTest {
         }
         Assert.assertEquals(4, CollectSink.values.size());
     }
+
 }
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
index dfcb459..c5806ed 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java
@@ -3,9 +3,7 @@ package com.geedgenetworks.bootstrap.main.simple.collect;
 import com.geedgenetworks.common.Event;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 public class CollectSink implements SinkFunction<Event> {
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
new file mode 100644
index 0000000..732d0f6
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -0,0 +1,92 @@
+sources:
+  inline_source:
+    type: inline
+    fields: # [array of object] Field List; if not set, all fields (Map<String, Object>) will be output.
+    properties:
+      data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+      interval.per.row: 1s # optional
+      repeat.count: 1 # optional
+      format: json
+      json.ignore.parse.errors: false
+sinks:
+  collect_sink:
+    type: collect
+    properties:
+      format: json
+splits:
+  test_split:
+    type: split
+    rules:
+      - name: aggregate_processor
+        expression: event.decoded_as == 'DNS'
+
+postprocessing_pipelines:
+  pre_etl_processor: # [object] Processing Pipeline
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields: [fields,tags]
+    output_fields:
+    functions: # [array of object] Function List
+
+      - function: FLATTEN
+        lookup_fields: [ fields,tags ]
+        output_fields: [ ]
+        parameters:
+          #prefix: ""
+          depth: 3
+          # delimiter: "."
+
+      - function: RENAME
+        lookup_fields: [ '' ]
+        output_fields: [ '' ]
+        filter:
+        parameters:
+          # parent_fields: [tags]
+          # rename_fields:
+          #   tags: tags
+          rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ timestamp_ms ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+          interval: 300
+
+  aggregate_processor:
+    type: aggregate
+    group_by_fields: [decoded_as]
+    window_type: tumbling_processing_time # tumbling_event_time, sliding_processing_time, sliding_event_time
+    window_size: 3
+    window_timestamp_field: test_time
+    functions:
+      - function: NUMBER_SUM
+        lookup_fields: [ sessions ]
+
+  table_processor:
+    type: table
+    functions:
+      - function: JSON_UNROLL
+        lookup_fields: [ encapsulation ]
+        output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+  env: # [object] Environment Variables
+    name: groot-stream-job # [string] Job Name
+    pipeline:
+      object-reuse: true # [boolean] Object Reuse, default is false
+    topology: # [array of object] Node List. It is used to build the data flow of the job DAG graph.
+      - name: inline_source # [string] Node Name, must be unique. It is used as the name of the corresponding Flink operator, e.g. kafka_source for a node of processor type SOURCE.
+        parallelism: 1 # [number] Operator-Level Parallelism.
+        downstream: [test_split]
+      - name: test_split
+        parallelism: 1
+        downstream: [ aggregate_processor ]
+      - name: aggregate_processor
+        parallelism: 1
+        downstream: [ collect_sink ]
+      - name: collect_sink
+        parallelism: 1
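Tracing `testSplitForAgg` through this config: the inline source emits four events with `sessions: 1` each (one BASE, one HTTP, two DNS). The split's single rule routes only the two DNS events to `aggregate_processor`, whose tumbling window sums `sessions` per `decoded_as` group; the BASE and HTTP events match no rule and, with nothing wired to the split's main output, never reach the sink. That is why the test expects exactly one record at the sink with `sessions` equal to `"2"`.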
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
new file mode 100644
index 0000000..f13d69e
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
@@ -0,0 +1,97 @@
+sources:
+  inline_source:
+    type: inline
+    fields: # [array of object] Field List; if not set, all fields (Map<String, Object>) will be output.
+    properties:
+      data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+      interval.per.row: 1s # optional
+      repeat.count: 1 # optional
+      format: json
+      json.ignore.parse.errors: false
+sinks:
+  collect_sink:
+    type: collect
+    properties:
+      format: json
+splits:
+  test_split:
+    type: split
+    rules:
+      - name: table_processor
+        expression: event.decoded_as == 'HTTP'
+      - name: pre_etl_processor
+        expression: event.decoded_as == 'DNS'
+
+postprocessing_pipelines:
+  pre_etl_processor: # [object] Processing Pipeline
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields: [fields,tags]
+    output_fields:
+    functions: # [array of object] Function List
+
+      - function: FLATTEN
+        lookup_fields: [ fields,tags ]
+        output_fields: [ ]
+        parameters:
+          #prefix: ""
+          depth: 3
+          # delimiter: "."
+
+      - function: RENAME
+        lookup_fields: [ '' ]
+        output_fields: [ '' ]
+        filter:
+        parameters:
+          # parent_fields: [tags]
+          # rename_fields:
+          #   tags: tags
+          rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ timestamp_ms ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+          interval: 300
+
+  aggregate_processor:
+    type: aggregate
+    group_by_fields: [decoded_as]
+    window_type: tumbling_processing_time # tumbling_event_time, sliding_processing_time, sliding_event_time
+    window_size: 3
+    window_timestamp_field: test_time
+    functions:
+      - function: NUMBER_SUM
+        lookup_fields: [ sessions ]
+
+  table_processor:
+    type: table
+    functions:
+      - function: JSON_UNROLL
+        lookup_fields: [ encapsulation ]
+        output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+  env: # [object] Environment Variables
+    name: groot-stream-job # [string] Job Name
+    pipeline:
+      object-reuse: true # [boolean] Object Reuse, default is false
+    topology: # [array of object] Node List. It is used to build the data flow of the job DAG graph.
+      - name: inline_source # [string] Node Name, must be unique. It is used as the name of the corresponding Flink operator, e.g. kafka_source for a node of processor type SOURCE.
+        parallelism: 1 # [number] Operator-Level Parallelism.
+        downstream: [test_split,collect_sink]
+      - name: test_split
+        parallelism: 1
+        downstream: [ table_processor,pre_etl_processor ]
+      - name: pre_etl_processor
+        parallelism: 1
+        downstream: [ collect_sink ]
+      - name: table_processor
+        parallelism: 1
+        downstream: [ collect_sink ]
+      - name: collect_sink
+        parallelism: 1
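And for `testSplit`'s assertion of seven records: this topology wires `inline_source` both directly to `collect_sink` (all four events) and into `test_split`, which side-outputs the one HTTP event to `table_processor` and the two DNS events to `pre_etl_processor`, each of which feeds `collect_sink`; 4 + 1 + 2 = 7. The BASE event reaches the sink only through the direct edge, since it matches neither rule.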
