From 215dd9aa1e4ec6a509d64c78ec414a8196dace3c Mon Sep 17 00:00:00 2001 From: wangkuan Date: Fri, 23 Aug 2024 17:58:22 +0800 Subject: [feature][core][common] GAL-646 Groot Stream supports Split Operator for dynamic stream splitting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/grootstream_job_example.yaml | 19 +++- .../bootstrap/enums/ProcessorType.java | 1 + .../bootstrap/execution/AbstractExecutor.java | 18 ++-- .../execution/AbstractProcessorExecutor.java | 16 +-- .../bootstrap/execution/FilterExecutor.java | 4 +- .../bootstrap/execution/JobExecution.java | 103 ++++++++++++++------ .../bootstrap/execution/JobRuntimeEnvironment.java | 15 ++- .../execution/PostprocessingExecutor.java | 4 +- .../bootstrap/execution/PreprocessingExecutor.java | 5 +- .../bootstrap/execution/ProcessingExecutor.java | 4 +- .../bootstrap/execution/SinkExecutor.java | 4 +- .../bootstrap/execution/SourceExecutor.java | 4 +- .../bootstrap/execution/SplitExecutor.java | 95 ++++++++++++++++++ .../bootstrap/main/simple/JobExecutionTest.java | 98 +++++++++++++------ .../bootstrap/main/simple/JobSplitTest.java | 107 +++++++++++++++++++++ .../bootstrap/main/simple/SimpleJobTest.java | 6 +- .../bootstrap/main/simple/collect/CollectSink.java | 4 +- .../resources/grootstream_job_split_agg_test.yaml | 92 ++++++++++++++++++ .../test/resources/grootstream_job_split_test.yaml | 97 +++++++++++++++++++ .../java/com/geedgenetworks/common/Constants.java | 2 +- .../common/config/RouteConfigOptions.java | 18 ++++ .../com/geedgenetworks/common/udf/RuleContext.java | 19 ++++ .../geedgenetworks/core/filter/AviatorFilter.java | 5 +- .../com/geedgenetworks/core/filter/Filter.java | 5 +- .../com/geedgenetworks/core/pojo/SplitConfig.java | 17 ++++ .../geedgenetworks/core/processor/Processor.java | 5 +- .../aggregate/AggregateProcessorImpl.java | 23 +++-- .../projection/ProjectionProcessorImpl.java | 9 +- .../core/processor/table/TableProcessorImpl.java | 14 +-- .../java/com/geedgenetworks/core/split/Split.java | 16 +++ .../geedgenetworks/core/split/SplitFunction.java | 79 +++++++++++++++ .../geedgenetworks/core/split/SplitOperator.java | 29 ++++++ .../services/com.geedgenetworks.core.split.Split | 1 + 33 files changed, 813 insertions(+), 125 deletions(-) create mode 100644 groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java create mode 100644 groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java create mode 100644 groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml create mode 100644 groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml create mode 100644 groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java create mode 100644 groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/split/Split.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java create mode 100644 groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 4726af0..46d1123 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -11,7 +11,14 @@ filters: type:
aviator properties: expression: event.server_ip != '12.12.12.12' - +splits: + decoded_as_split: + type: split + rules: + - name: projection_processor + expression: event.decoded_as == 'HTTP' + - name: aggregate_processor + expression: event.decoded_as == 'DNS' processing_pipelines: projection_processor: type: projection @@ -25,7 +32,7 @@ processing_pipelines: group_by_fields: [server_ip,server_port] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time window_timestamp_field: recv_time - window_size: 60 + window_size: 6 window_slide: 10 # sliding window step size functions: - function: NUMBER_SUM @@ -65,10 +72,12 @@ application: strategy: none topology: - name: inline_source - downstream: [filter_operator] - - name: filter_operator - downstream: [ projection_processor ] + downstream: [decoded_as_split] + - name: decoded_as_split + downstream: [ projection_processor, aggregate_processor ] - name: projection_processor downstream: [ print_sink ] + - name: aggregate_processor + downstream: [ print_sink ] - name: print_sink downstream: [] \ No newline at end of file diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java index d2b37f6..6f33cae 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java @@ -3,6 +3,7 @@ package com.geedgenetworks.bootstrap.enums; public enum ProcessorType { SOURCE("source"), FILTER("filter"), + SPLIT("split"), PREPROCESSING("preprocessing"), PROCESSING("processing"), POSTPROCESSING("postprocessing"), diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index 64c66b6..7a55ffe 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -1,26 +1,26 @@ package com.geedgenetworks.bootstrap.execution; +import com.geedgenetworks.common.Event; import com.geedgenetworks.common.utils.ReflectionUtils; import com.geedgenetworks.core.filter.Filter; import com.geedgenetworks.core.processor.Processor; -import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; -import com.geedgenetworks.core.processor.projection.ProjectionProcessor; +import com.geedgenetworks.core.split.Split; import com.typesafe.config.Config; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + import java.net.URL; import java.net.URLClassLoader; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; +import java.util.*; import java.util.function.BiConsumer; public abstract class AbstractExecutor - implements Executor { + implements Executor, JobRuntimeEnvironment> { protected JobRuntimeEnvironment jobRuntimeEnvironment; protected final Config operatorConfig; protected final Map operatorMap; protected final Map filterMap = new HashMap<>(); + protected final Map splitMap = new HashMap<>(); protected final Map processorMap = new HashMap<>(); protected AbstractExecutor(List jarPaths, Config operatorConfig) { @@ -30,6 +30,10 @@ public abstract class AbstractExecutor for (Filter filter : filters) {
this.filterMap.put(filter.type(), filter); } + ServiceLoader splits = ServiceLoader.load(Split.class); + for (Split split : splits) { + this.splitMap.put(split.type(), split); + } ServiceLoader processors = ServiceLoader.load(Processor.class); for (Processor processor : processors) { this.processorMap.put(processor.type(), processor); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index 66c0b0f..b0a04cd 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -2,6 +2,7 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; @@ -13,6 +14,7 @@ import com.geedgenetworks.core.processor.table.TableProcessor; import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -27,7 +29,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor execute(DataStream dataStream, Node node) throws JobExecuteException { ProcessorConfig processorConfig = operatorMap.get(node.getName()); switch (processorConfig.getType()) { @@ -45,7 +47,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor executeTableProcessor(DataStream dataStream, Node node, TableConfig tableConfig) throws JobExecuteException { TableProcessor tableProcessor; if (processorMap.containsKey(tableConfig.getType())) { @@ -63,15 +65,15 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor executeAggregateProcessor(DataStream dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException { AggregateProcessor aggregateProcessor; if (processorMap.containsKey(aggregateConfig.getType())) { @@ -98,7 +100,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor executeProjectionProcessor(DataStream dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException { ProjectionProcessor projectionProcessor; if (processorMap.containsKey(projectionConfig.getType())) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java index 506aa11..66f0585 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CheckConfigUtil; import 
com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.FilterConfigOptions; @@ -14,6 +15,7 @@ import com.geedgenetworks.core.pojo.FilterConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -54,7 +56,7 @@ public class FilterExecutor extends AbstractExecutor { } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream execute(DataStream dataStream, Node node) throws JobExecuteException { FilterConfig filterConfig = operatorMap.get(node.getName()); String className = filterConfig.getType(); Filter filter; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index 2eabefa..22b23a7 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -5,15 +5,17 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.GrootStreamConfig; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.GrootStreamConfig; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigUtil; import com.typesafe.config.ConfigValueFactory; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; import java.io.File; import java.net.MalformedURLException; @@ -27,17 +29,18 @@ import java.util.stream.Stream; public class JobExecution { private final JobRuntimeEnvironment jobRuntimeEnvironment; - private final Executor sourceExecutor; - private final Executor sinkExecutor; - private final Executor filterExecutor; - private final Executor preprocessingExecutor; - private final Executor processingExecutor; - private final Executor postprocessingExecutor; + private final Executor, JobRuntimeEnvironment> sourceExecutor; + private final Executor, JobRuntimeEnvironment> sinkExecutor; + private final Executor, JobRuntimeEnvironment> filterExecutor; + private final Executor, JobRuntimeEnvironment> splitExecutor; + private final Executor, JobRuntimeEnvironment> preprocessingExecutor; + private final Executor, JobRuntimeEnvironment> processingExecutor; + private final Executor, JobRuntimeEnvironment> postprocessingExecutor; private final List nodes; - private final List jarPaths; + public JobExecution(Config config, GrootStreamConfig grootStreamConfig) { - try { + try { jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir() .resolve(GrootStreamRunner.APP_JAR_NAME).toString()) .toURI().toURL())); @@ -50,6 +53,7 @@ public class JobExecution { this.sourceExecutor = new SourceExecutor(jarPaths, config); this.sinkExecutor = new SinkExecutor(jarPaths, config); this.filterExecutor = new FilterExecutor(jarPaths, 
config); + this.splitExecutor = new SplitExecutor(jarPaths, config); this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config); this.processingExecutor = new ProcessingExecutor(jarPaths, config); this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config); @@ -59,6 +63,7 @@ public class JobExecution { this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); @@ -69,7 +74,7 @@ public class JobExecution { private void registerPlugin(Config appConfig) { List thirdPartyJars = new ArrayList<>(); Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV); - if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) { + if (envConfig.hasPath(ExecutionConfigKeyName.JARS)) { thirdPartyJars = new ArrayList<>(StartBuilder .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS))); } @@ -81,7 +86,7 @@ public class JobExecution { .map(uri -> { try { return uri.toURL(); - }catch (MalformedURLException e){ + } catch (MalformedURLException e) { throw new RuntimeException("the uri of jar illegal: " + uri, e); } }).collect(Collectors.toList()); @@ -93,10 +98,10 @@ public class JobExecution { } - private Config registerPlugin(Config config , List jars) { - config = this.injectJarsToConfig( - config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars); - return this.injectJarsToConfig( + private Config registerPlugin(Config config, List jars) { + config = this.injectJarsToConfig( + config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars); + return this.injectJarsToConfig( config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars); } @@ -127,7 +132,7 @@ public class JobExecution { .collect(Collectors.toSet()); paths.addAll(validJars); - config = config.withValue( + config = config.withValue( path, ConfigValueFactory.fromAnyRef( paths.stream() @@ -150,8 +155,9 @@ public class JobExecution { private List buildJobNode(Config config) { Map sources = Maps.newHashMap(); - Map sinks =Maps.newHashMap(); + Map sinks = Maps.newHashMap(); Map filters = Maps.newHashMap(); + Map splits = Maps.newHashMap(); Map preprocessingPipelines = Maps.newHashMap(); Map processingPipelines = Maps.newHashMap(); Map postprocessingPipelines = Maps.newHashMap(); @@ -160,11 +166,14 @@ public class JobExecution { sources = config.getConfig(Constants.SOURCES).root().unwrapped(); } if (config.hasPath(Constants.SINKS)) { - sinks =config.getConfig(Constants.SINKS).root().unwrapped(); + sinks = config.getConfig(Constants.SINKS).root().unwrapped(); } if (config.hasPath(Constants.FILTERS)) { filters = config.getConfig(Constants.FILTERS).root().unwrapped(); } + if (config.hasPath(Constants.SPLITS)) { + splits = config.getConfig(Constants.SPLITS).root().unwrapped(); + } if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) { preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped(); } @@ -184,13 +193,15 @@ public class JobExecution { nodes.add(node); }); - 
for(Node node : nodes) { + for (Node node : nodes) { if (sources.containsKey(node.getName())) { node.setType(ProcessorType.SOURCE); } else if (sinks.containsKey(node.getName())) { node.setType(ProcessorType.SINK); } else if (filters.containsKey(node.getName())) { node.setType(ProcessorType.FILTER); + } else if (splits.containsKey(node.getName())) { + node.setType(ProcessorType.SPLIT); } else if (preprocessingPipelines.containsKey(node.getName())) { node.setType(ProcessorType.PREPROCESSING); } else if (processingPipelines.containsKey(node.getName())) { @@ -214,12 +225,12 @@ public class JobExecution { List sourceNodes = nodes .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList()); - SingleOutputStreamOperator singleOutputStreamOperator = null; + DataStream dataStream = null; - for(Node sourceNode : sourceNodes) { - singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode); + for (Node sourceNode : sourceNodes) { + dataStream = sourceExecutor.execute(dataStream, sourceNode); for (String nodeName : sourceNode.getDownstream()) { - buildJobGraph(singleOutputStreamOperator, nodeName); + buildJobGraph(dataStream, nodeName); } } @@ -230,7 +241,7 @@ public class JobExecution { log.info("Execution Job: {}", jobRuntimeEnvironment.getJobName()); jobRuntimeEnvironment.getStreamExecutionEnvironment().execute(jobRuntimeEnvironment.getJobName()); final long jobEndTime = System.currentTimeMillis(); - log.info("Job finished, execution duration: {} ms", jobEndTime-jobStartTime); + log.info("Job finished, execution duration: {} ms", jobEndTime - jobStartTime); } catch (Exception e) { throw new JobExecuteException("Execute job error", e); @@ -238,34 +249,62 @@ public class JobExecution { } - private void buildJobGraph(SingleOutputStreamOperator singleOutputStreamOperator, String downstreamNodeName) { + private void buildJobGraph(DataStream dataStream, String downstreamNodeName) { Node node = getNode(downstreamNodeName).orElseGet(() -> { throw new JobExecuteException("can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - singleOutputStreamOperator = filterExecutor.execute(singleOutputStreamOperator, node); + if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + dataStream = filterExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { + }), node); + } else { + dataStream = filterExecutor.execute(dataStream, node); + } + } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) { + dataStream = splitExecutor.execute(dataStream, node); + } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - singleOutputStreamOperator = preprocessingExecutor.execute(singleOutputStreamOperator, node); + if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { + }), node); + } else { + dataStream = preprocessingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - singleOutputStreamOperator = processingExecutor.execute(singleOutputStreamOperator, node); + if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + dataStream = processingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { + }), node); + } else { + 
dataStream = processingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - singleOutputStreamOperator = postprocessingExecutor.execute(singleOutputStreamOperator, node); + if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { + }), node); + } else { + dataStream = postprocessingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - singleOutputStreamOperator = sinkExecutor.execute(singleOutputStreamOperator, node); + if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + dataStream = sinkExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { + }), node); + } else { + dataStream = sinkExecutor.execute(dataStream, node); + } } else { throw new JobExecuteException("unsupported process type " + node.getType().name()); } for (String nodeName : node.getDownstream()) { - buildJobGraph(singleOutputStreamOperator, nodeName); + buildJobGraph(dataStream, nodeName); } } private Optional getNode(String name) { - return nodes.stream().filter(v-> v.getName().equals(name)).findFirst(); + return nodes.stream().filter(v -> v.getName().equals(name)).findFirst(); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 7141f5e..2e68098 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -25,9 +25,7 @@ import org.apache.flink.util.TernaryBoolean; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URL; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; +import java.util.*; import java.util.stream.Collectors; @Slf4j @@ -37,6 +35,8 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private GrootStreamConfig grootStreamConfig; private StreamExecutionEnvironment environment; private String jobName = Constants.DEFAULT_JOB_NAME; + private Set splitSet = new HashSet<>(); + private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; @@ -197,6 +197,14 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } } + public Set getSplitSet() { + return splitSet; + } + + public void setSplitSet(Set splitSet) { + this.splitSet = splitSet; + } + private void setCheckpoint() { long interval = 0; if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) { @@ -272,6 +280,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ + } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java index 36fad61..b9555b4 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java @@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution; import 
com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -36,7 +38,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor { @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream execute(DataStream dataStream, Node node) throws JobExecuteException { return super.execute(dataStream, node); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java index b1e53e4..a7b9e5e 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java @@ -3,10 +3,12 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -37,7 +39,8 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor { } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream execute(DataStream dataStream, Node node) throws JobExecuteException { + return super.execute(dataStream, node); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java index d69fe8c..f6788ed 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java @@ -3,9 +3,11 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProcessorConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -35,7 +37,7 @@ public class ProcessingExecutor extends AbstractProcessorExecutor { } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream execute(DataStream dataStream, Node node) throws JobExecuteException { return 
super.execute(dataStream, node); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java index a0bedc9..70934b8 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java @@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.enums.ProcessorType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.utils.SchemaConfigParse; import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.SinkConfigOptions; @@ -20,6 +21,7 @@ import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -63,7 +65,7 @@ public class SinkExecutor extends AbstractExecutor { } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { + public DataStream execute(DataStream dataStream, Node node) throws JobExecuteException { SinkConfig sinkConfig = operatorMap.get(node.getName()); try { SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType()); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java index d4751af..9dff6b0 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; @@ -67,7 +68,7 @@ public class SourceExecutor extends AbstractExecutor { } @Override - public SingleOutputStreamOperator execute(SingleOutputStreamOperator outputStreamOperator, Node node) throws JobExecuteException { + public DataStream execute(DataStream outputStreamOperator, Node node) throws JobExecuteException { SourceConfig sourceConfig = operatorMap.get(node.getName()); SingleOutputStreamOperator sourceSingleOutputStreamOperator; try { @@ -152,4 +153,5 @@ public class SourceExecutor extends AbstractExecutor { return 0; } } + } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java new file mode 100644 index 0000000..c0ac3a5 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java @@ -0,0 +1,95 @@ +package com.geedgenetworks.bootstrap.execution; + +import 
com.alibaba.fastjson.JSONObject; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.RouteConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.common.udf.RuleContext; +import com.geedgenetworks.core.pojo.SplitConfig; +import com.geedgenetworks.core.split.Split; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +import java.net.URL; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +/** + * Initialize config and execute split operator + */ +@Slf4j +public class SplitExecutor extends AbstractExecutor { + + + public SplitExecutor(List jarPaths, Config config) { + super(jarPaths, config); + } + + @Override + protected Map initialize(List jarPaths, Config operatorConfig) { + Map routeConfigMap = Maps.newHashMap(); + if (operatorConfig.hasPath(Constants.SPLITS)) { + Config routes = operatorConfig.getConfig(Constants.SPLITS); + routes.root().unwrapped().forEach((key, value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key), + RouteConfigOptions.TYPE.key()); + if (!result.isSuccess()) { + throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( + "split: %s, Message: %s", + key, result.getMsg())); + } + SplitConfig splitConfig = new JSONObject((Map) value).toJavaObject(SplitConfig.class); + splitConfig.setName(key); + routeConfigMap.put(key, splitConfig); + }); + } + + return routeConfigMap; + } + + @Override + public DataStream execute(DataStream dataStream, Node node) throws JobExecuteException { + SplitConfig splitConfig = operatorMap.get(node.getName()); + String className = splitConfig.getType(); + Split split; + if (splitMap.containsKey(splitConfig.getType())) { + + split = splitMap.get(splitConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + split = (Split) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get split instance failed!", e); + } + } + if (node.getParallelism() > 0) { + splitConfig.setParallelism(node.getParallelism()); + } + for (RuleContext ruleContext : splitConfig.getRules()) { + jobRuntimeEnvironment.getSplitSet().add(ruleContext.getName()); + } + try { + dataStream = + split.splitFunction( + dataStream, splitConfig); + } catch (Exception e) { + throw new JobExecuteException("Execute split function failed!", e); + } + return dataStream; + } + +} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java index 8b299df..7e995cf 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java @@ -16,7 +16,9 @@ import com.typesafe.config.ConfigUtil; import
com.typesafe.config.ConfigValueFactory; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; import java.io.File; import java.net.MalformedURLException; @@ -33,12 +35,13 @@ import java.util.stream.Stream; public class JobExecutionTest { protected final JobRuntimeEnvironment jobRuntimeEnvironment; - private final Executor sourceExecuteProcessor; - private final Executor filterExecuteProcessor; - private final Executor preprocessingExecuteProcessor; - private final Executor processingExecuteProcessor; - private final Executor postprocessingExecuteProcessor; - private final Executor sinkExecuteProcessor; + private final Executor, JobRuntimeEnvironment> sourceExecutor; + private final Executor, JobRuntimeEnvironment> filterExecutor; + private final Executor, JobRuntimeEnvironment> preprocessingExecutor; + private final Executor, JobRuntimeEnvironment> splitExecutor; + private final Executor, JobRuntimeEnvironment> processingExecutor; + private final Executor, JobRuntimeEnvironment> postprocessingExecutor; + private final Executor, JobRuntimeEnvironment> sinkExecutor; private final List nodes; @@ -66,21 +69,22 @@ public class JobExecutionTest { } registerPlugin(config.getConfig(Constants.APPLICATION)); - this.sourceExecuteProcessor = new SourceExecutor(jarPaths, config); - this.sinkExecuteProcessor = new SinkExecutor(jarPaths, config); - this.filterExecuteProcessor = new FilterExecutor(jarPaths, config); - this.preprocessingExecuteProcessor = new PreprocessingExecutor(jarPaths, config); - this.processingExecuteProcessor = new ProcessingExecutor(jarPaths, config); - this.postprocessingExecuteProcessor = new PostprocessingExecutor(jarPaths, config); + this.sourceExecutor = new SourceExecutor(jarPaths, config); + this.sinkExecutor = new SinkExecutor(jarPaths, config); + this.splitExecutor = new SplitExecutor(jarPaths, config); + this.filterExecutor = new FilterExecutor(jarPaths, config); + this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config); + this.processingExecutor = new ProcessingExecutor(jarPaths, config); + this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config); this.jobRuntimeEnvironment = JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig); - - this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.nodes = buildJobNode(config); } @@ -173,6 +177,7 @@ public class JobExecutionTest { Map sources = Maps.newHashMap(); Map sinks 
=Maps.newHashMap(); Map filters = Maps.newHashMap(); + Map splits = Maps.newHashMap(); Map preprocessingPipelines = Maps.newHashMap(); Map processingPipelines = Maps.newHashMap(); Map postprocessingPipelines = Maps.newHashMap(); @@ -186,6 +191,9 @@ public class JobExecutionTest { if (config.hasPath(Constants.FILTERS)) { filters = config.getConfig(Constants.FILTERS).root().unwrapped(); } + if (config.hasPath(Constants.SPLITS)) { + splits = config.getConfig(Constants.SPLITS).root().unwrapped(); + } if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) { preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped(); } @@ -210,6 +218,8 @@ public class JobExecutionTest { node.setType(ProcessorType.SOURCE); } else if (sinks.containsKey(node.getName())) { node.setType(ProcessorType.SINK); + } else if (splits.containsKey(node.getName())) { + node.setType(ProcessorType.SPLIT); } else if (filters.containsKey(node.getName())) { node.setType(ProcessorType.FILTER); } else if (preprocessingPipelines.containsKey(node.getName())) { @@ -228,15 +238,15 @@ public class JobExecutionTest { } - public SingleOutputStreamOperator getSingleOutputStreamOperator() throws JobExecuteException { + public DataStream getSingleOutputStreamOperator() throws JobExecuteException { List sourceNodes = nodes .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList()); - SingleOutputStreamOperator singleOutputStreamOperator = null; + DataStream singleOutputStreamOperator = null; for(Node sourceNode : sourceNodes) { - singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode); + singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode); for (String nodeName : sourceNode.getDownstream()) { buildJobGraph(singleOutputStreamOperator, nodeName); } @@ -247,27 +257,55 @@ public class JobExecutionTest { } - private void buildJobGraph(SingleOutputStreamOperator singleOutputStreamOperator, String downstreamNodeName) { + private void buildJobGraph(DataStream dataStream, String downstreamNodeName) { Node node = getNode(downstreamNodeName).orElseGet(() -> { throw new JobExecuteException("can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node); + if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + dataStream = filterExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { + }), node); + } else { + dataStream = filterExecutor.execute(dataStream, node); + } + } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) { + dataStream = splitExecutor.execute(dataStream, node); + } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node); + if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { + }), node); + } else { + dataStream = preprocessingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node); + if 
(jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + dataStream = processingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { + }), node); + } else { + dataStream = processingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node); + if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { + }), node); + } else { + dataStream = postprocessingExecutor.execute(dataStream, node); + } } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node); - } else { + if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + dataStream = sinkExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { + }), node); + } else { + dataStream = sinkExecutor.execute(dataStream, node); + } + } else { throw new JobExecuteException("unsupported process type " + node.getType().name()); } for (String nodeName : node.getDownstream()) { - buildJobGraph(singleOutputStreamOperator, nodeName); + buildJobGraph(dataStream, nodeName); } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java new file mode 100644 index 0000000..dfe0600 --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java @@ -0,0 +1,107 @@ +package com.geedgenetworks.bootstrap.main.simple; + +import cn.hutool.setting.yaml.YamlUtil; +import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.EngineType; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; +import com.geedgenetworks.bootstrap.utils.CommandLineUtils; +import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.ConfigProvider; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigUtil; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertTrue; + + +public class JobSplitTest { + + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + @Test + public void testSplit() { + + CollectSink.values.clear(); + String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = CommandLineUtils + .parse(args, new 
ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); + jobExecution.getSingleOutputStreamOperator(); + + try { + jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); + } catch (Exception e) { + throw new JobExecuteException("Job executed error", e); + } + Assert.assertEquals(7, CollectSink.values.size()); + } + @Test + public void testSplitForAgg() { + + CollectSink.values.clear(); + String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = CommandLineUtils + .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); + jobExecution.getSingleOutputStreamOperator(); + + try { + jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); + } catch (Exception e) { + throw new JobExecuteException("Job executed error", e); + } + Assert.assertEquals(1, CollectSink.values.size()); + Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); + + } +} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java index c4f54a3..556c8c4 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java @@ -75,16 +75,11 @@ public class SimpleJobTest { assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString())); assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString())); assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString())); - Assert.assertEquals("印第安纳州", CollectSink.values.get(0).getExtractedFields().get("server_super_admin_area").toString()); - Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString()); - 
Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("server_country_region").toString()); - Assert.assertTrue(!CollectSink.values.get(0).getExtractedFields().containsKey("client_country_region")); Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString()); Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString()); Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString()); List asn_list = (List) CollectSink.values.get(0).getExtractedFields().get("asn_list"); - Assert.assertEquals("6167", asn_list.get(0)); } @@ -121,4 +116,5 @@ public class SimpleJobTest { } Assert.assertEquals(4, CollectSink.values.size()); } + } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java index dfcb459..c5806ed 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectSink.java @@ -3,9 +3,7 @@ package com.geedgenetworks.bootstrap.main.simple.collect; import com.geedgenetworks.common.Event; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; public class CollectSink implements SinkFunction { diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml new file mode 100644 index 0000000..732d0f6 --- /dev/null +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml @@ -0,0 +1,92 @@ +sources: + inline_source: + type : inline + fields: # [array of object] Field List, if not set, all fields(Map) will be output. 
+ properties: + data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + interval.per.row: 1s # optional + repeat.count: 1 # optional + format: json + json.ignore.parse.errors: false +sinks: + collect_sink: + type: collect + properties: + format: json +splits: + test_split: + type: split + rules: + - name: aggregate_processor + expression: event.decoded_as == 'DNS' + +postprocessing_pipelines: + pre_etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: [fields,tags] + output_fields: + functions: # [array of object] Function List + + - function: FLATTEN + lookup_fields: [ fields,tags ] + output_fields: [ ] + parameters: + #prefix: "" + depth: 3 + # delimiter: "." + + - function: RENAME + lookup_fields: [ '' ] + output_fields: [ '' ] + filter: + parameters: + # parent_fields: [tags] + # rename_fields: + # tags: tags + rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key; + + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] + parameters: + precision: seconds + interval: 300 + # + + aggregate_processor: + type: aggregate + group_by_fields: [decoded_as] + window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 3 + window_timestamp_field: test_time + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + + table_processor: + type: table + functions: + - function: JSON_UNROLL + lookup_fields: [ encapsulation ] + output_fields: [ new_name ] + +application: # [object] Application Configuration + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph. + - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source when the processor type is SOURCE. + parallelism: 1 # [number] Operator-Level Parallelism. + downstream: [test_split] + - name: test_split + parallelism: 1 + downstream: [ aggregate_processor ] + - name: aggregate_processor + parallelism: 1 + downstream: [ collect_sink ] + - name: collect_sink + parallelism: 1 + + diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml new file mode 100644 index 0000000..f13d69e --- /dev/null +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml @@ -0,0 +1,97 @@ +sources: + inline_source: + type : inline + fields: # [array of object] Field List, if not set, all fields(Map) will be output.
+ properties: + data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + interval.per.row: 1s # optional + repeat.count: 1 # optional + format: json + json.ignore.parse.errors: false +sinks: + collect_sink: + type: collect + properties: + format: json +splits: + test_split: + type: split + rules: + - name: table_processor + expression: event.decoded_as == 'HTTP' + - name: pre_etl_processor + expression: event.decoded_as == 'DNS' + +postprocessing_pipelines: + pre_etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: [fields,tags] + output_fields: + functions: # [array of object] Function List + + - function: FLATTEN + lookup_fields: [ fields,tags ] + output_fields: [ ] + parameters: + #prefix: "" + depth: 3 + # delimiter: "." + + - function: RENAME + lookup_fields: [ '' ] + output_fields: [ '' ] + filter: + parameters: + # parent_fields: [tags] + # rename_fields: + # tags: tags + rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key; + + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] + parameters: + precision: seconds + interval: 300 + # + + aggregate_processor: + type: aggregate + group_by_fields: [decoded_as] + window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 3 + window_timestamp_field: test_time + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + + table_processor: + type: table + functions: + - function: JSON_UNROLL + lookup_fields: [ encapsulation ] + output_fields: [ new_name ] + +application: # [object] Application Configuration + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph. + - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source when the processor type is SOURCE. + parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [test_split,collect_sink] + - name: test_split + parallelism: 1 + downstream: [ table_processor,pre_etl_processor ] + - name: pre_etl_processor + parallelism: 1 + downstream: [ collect_sink ] + - name: table_processor + parallelism: 1 + downstream: [ collect_sink ] + - name: collect_sink + parallelism: 1 + + diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index d13fc4b..e973c20 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -11,7 +11,7 @@ public final class Constants { public static final String SINKS = "sinks"; public static final int SYSTEM_EXIT_CODE = 2618; public static final String APPLICATION = "application"; - + public static final String SPLITS = "splits"; public static final String APPLICATION_ENV ="env"; public static final String APPLICATION_TOPOLOGY = "topology"; public static final String JOB_NAME = "name"; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java new file mode 100644 index 0000000..4d4ef12 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java @@ -0,0 +1,18 @@ +package com.geedgenetworks.common.config; + +import com.alibaba.fastjson2.TypeReference; +import com.geedgenetworks.common.udf.RuleContext; +import java.util.List; + +public interface RouteConfigOptions { + Option<String> TYPE = Options.key("type") + .stringType() + .noDefaultValue() + .withDescription("The type of route."); + + Option<List<RuleContext>> RULES = Options.key("rules") + .type(new TypeReference<List<RuleContext>>() {}) + .noDefaultValue() + .withDescription("The rules to be executed."); + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java new file mode 100644 index 0000000..ead0ecd --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java @@ -0,0 +1,19 @@ +package com.geedgenetworks.common.udf; + +import com.geedgenetworks.common.Event; +import com.googlecode.aviator.Expression; +import lombok.Data; +import org.apache.flink.util.OutputTag; + +import java.io.Serializable; + +@Data +public class RuleContext implements Serializable { + + private String name; + private String expression; + private Expression compiledExpression; + private OutputTag<Event> outputTag; + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java index 668ba6f..d8b8bc4 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java @@ -2,13 +2,14 @@ package com.geedgenetworks.core.filter; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.FilterConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public class AviatorFilter implements Filter { @Override - public SingleOutputStreamOperator<Event> filterFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig) + public DataStream<Event> filterFunction( + DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
throws Exception { if (FilterConfig.getParallelism() != 0) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java index a173438..f8b50eb 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java @@ -3,12 +3,13 @@ package com.geedgenetworks.core.filter; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.FilterConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public interface Filter { - SingleOutputStreamOperator<Event> filterFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig) + DataStream<Event> filterFunction( + DataStream<Event> singleOutputStreamOperator, FilterConfig FilterConfig) throws Exception; String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java new file mode 100644 index 0000000..4381df5 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/SplitConfig.java @@ -0,0 +1,17 @@ +package com.geedgenetworks.core.pojo; + +import com.geedgenetworks.common.udf.RuleContext; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +@Data +public class SplitConfig implements Serializable { + + private String type; + private Map<String, Object> properties; + private int parallelism; + private String name; + private List<RuleContext> rules; +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java index 172b368..3852414 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/Processor.java @@ -2,12 +2,13 @@ package com.geedgenetworks.core.processor; import com.geedgenetworks.common.Event; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; public interface Processor<T> { - SingleOutputStreamOperator<Event> processorFunction( - SingleOutputStreamOperator<Event> singleOutputStreamOperator, + DataStream<Event> processorFunction( + DataStream<Event> singleOutputStreamOperator, T processorConfig, ExecutionConfig config) throws Exception; String type(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index bc87c32..cf78310 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -5,45 +5,50 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.AggregateConfig; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.OutputTag; + import static com.geedgenetworks.common.Constants.*; public class AggregateProcessorImpl implements AggregateProcessor { @Override - public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { + public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception { + if (aggregateConfig.getParallelism() != 0) { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case TUMBLING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case SLIDING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); case SLIDING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new 
KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } }else { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case TUMBLING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case SLIDING_PROCESSING_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); case SLIDING_EVENT_TIME: - return grootEventSingleOutputStreamOperator.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new 
ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } } + } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java index 6b46a7b..d87e7e2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java @@ -4,25 +4,26 @@ import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.ProjectionConfig; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; public class ProjectionProcessorImpl implements ProjectionProcessor { @Override - public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception { + public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, ProjectionConfig projectionConfig, ExecutionConfig config) throws Exception { if (projectionConfig.getParallelism() != 0) { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .process(new ProjectionProcessFunction(projectionConfig)) .setParallelism(projectionConfig.getParallelism()) .name(projectionConfig.getName()); } else { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .process(new ProjectionProcessFunction(projectionConfig)) .name(projectionConfig.getName()); } } - @Override public String type() { return "projection"; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java index f36f8db..6ddc616 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java @@ -2,25 +2,25 @@ package com.geedgenetworks.core.processor.table; import com.geedgenetworks.common.Event; import com.geedgenetworks.core.pojo.TableConfig; -import com.geedgenetworks.core.processor.projection.ProjectionProcessFunction; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.util.OutputTag; + +import java.util.Map; -import java.time.Duration; public class TableProcessorImpl implements TableProcessor { @Override - public SingleOutputStreamOperator<Event> processorFunction(SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, TableConfig tableConfig, ExecutionConfig config) throws 
Exception { - + public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, TableConfig tableConfig, ExecutionConfig config) throws Exception { if (tableConfig.getParallelism() != 0) { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .flatMap(new TableProcessorFunction(tableConfig)) .setParallelism(tableConfig.getParallelism()) .name(tableConfig.getName()); } else { - return grootEventSingleOutputStreamOperator + return grootEventDataStream .flatMap(new TableProcessorFunction(tableConfig)) .name(tableConfig.getName()); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java new file mode 100644 index 0000000..37e7b44 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/Split.java @@ -0,0 +1,16 @@ +package com.geedgenetworks.core.split; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.SplitConfig; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +import java.util.Set; + +public interface Split { + + DataStream<Event> splitFunction( + DataStream<Event> dataStream, SplitConfig splitConfig) + throws Exception; + String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java new file mode 100644 index 0000000..7a129ef --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java @@ -0,0 +1,79 @@ +package com.geedgenetworks.core.split; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.RuleContext; +import com.geedgenetworks.core.metrics.InternalMetrics; +import com.geedgenetworks.core.pojo.SplitConfig; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import java.util.List; +import java.util.Map; + + +@Slf4j +public class SplitFunction extends ProcessFunction<Event, Event> { + private SplitConfig splitConfig; + private List<RuleContext> routes; + private transient InternalMetrics internalMetrics; + + public SplitFunction(SplitConfig splitConfig) { + this.splitConfig = splitConfig; + + } + + @Override + public void open(Configuration parameters) throws Exception { + + this.internalMetrics = new InternalMetrics(getRuntimeContext()); + this.routes = splitConfig.getRules(); + for(RuleContext rule :routes){ + String expression = rule.getExpression(); + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + Expression compiledExp = instance.compile(expression, true); + rule.setCompiledExpression(compiledExp); + OutputTag<Event> outputTag = new OutputTag<>(rule.getName()){}; + rule.setOutputTag(outputTag); + } + } + + + @Override + public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception { + try { + internalMetrics.incrementInEvents(); + for (RuleContext 
route :routes){ + boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true; + if (result) { + ctx.output(route.getOutputTag(), event); + } + } + }catch (Exception e) { + internalMetrics.incrementErrorEvents(); + log.error("error in split function", e); + } + } + + public static Boolean filterExecute(Expression expression, Map<String, Object> map) { + + boolean result; + Object object = expression.execute(map); + if (object != null) { + result = (Boolean) object; + } else { + throw new ExpressionRuntimeException(); + } + return result; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java new file mode 100644 index 0000000..f6d2c8c --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitOperator.java @@ -0,0 +1,29 @@ +package com.geedgenetworks.core.split; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.SplitConfig; +import org.apache.flink.streaming.api.datastream.DataStream; + +public class SplitOperator implements Split { + + @Override + public DataStream<Event> splitFunction( + DataStream<Event> dataStream, SplitConfig splitConfig) + throws Exception { + if (splitConfig.getParallelism() != 0) { + return dataStream + .process(new SplitFunction(splitConfig)) + .setParallelism(splitConfig.getParallelism()) + .name(splitConfig.getName()); + } else { + return dataStream + .process(new SplitFunction(splitConfig)) + .name(splitConfig.getName()); + } + } + @Override + public String type() { + return "split"; + } + +} diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split new file mode 100644 index 0000000..500c367 --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.split.Split @@ -0,0 +1 @@ +com.geedgenetworks.core.split.SplitOperator \ No newline at end of file -- cgit v1.2.3 From 9b0297020611fcf70445284637f370b5f8c4fddd Mon Sep 17 00:00:00 2001 From: wangkuan Date: Mon, 26 Aug 2024 10:48:59 +0800 Subject: [feature][core][common] Optimize unit tests and naming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bootstrap/execution/SplitExecutor.java | 14 ++--- .../bootstrap/main/simple/JobSplitTest.java | 33 ---------- .../bootstrap/main/simple/JobSplitWithAggTest.java | 72 ++++++++++++++++++++++ .../resources/grootstream_job_split_agg_test.yaml | 4 +- .../test/resources/grootstream_job_split_test.yaml | 2 +- .../common/config/RouteConfigOptions.java | 18 ------ .../common/config/SplitConfigOptions.java | 18 ++++++ .../aggregate/AggregateProcessorFunction.java | 4 +- .../geedgenetworks/core/split/SplitFunction.java | 10 +-- 9 files changed, 104 insertions(+), 71 deletions(-) create mode 100644 groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java delete mode 100644 groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java create mode 100644 groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java index c0ac3a5..3513a67 100644 --- 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java @@ -6,7 +6,7 @@ import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.RouteConfigOptions; +import com.geedgenetworks.common.config.SplitConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.common.udf.RuleContext; @@ -14,16 +14,12 @@ import com.geedgenetworks.core.pojo.SplitConfig; import com.geedgenetworks.core.split.Split; import com.google.common.collect.Maps; import com.typesafe.config.Config; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** @@ -39,12 +35,12 @@ public class SplitExecutor extends AbstractExecutor { @Override protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) { - Map<String, SplitConfig> routeConfigMap = Maps.newHashMap(); + Map<String, SplitConfig> splitConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.SPLITS)) { Config routes = operatorConfig.getConfig(Constants.SPLITS); routes.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key), - RouteConfigOptions.TYPE.key()); + SplitConfigOptions.TYPE.key()); if (!result.isSuccess()) { throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( "split: %s, Message: %s", } SplitConfig splitConfig = new JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); splitConfig.setName(key); - routeConfigMap.put(key, splitConfig); + splitConfigMap.put(key, splitConfig); }); } - return routeConfigMap; + return splitConfigMap; } @Override diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java index dfe0600..2f6984b 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java @@ -70,38 +70,5 @@ public class JobSplitTest { } Assert.assertEquals(7, CollectSink.values.size()); } - @Test - public void testSplitForAgg() { - - CollectSink.values.clear(); - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"}; - ExecuteCommandArgs executeCommandArgs = CommandLineUtils - .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); - - executeCommandArgs.buildCommand(); - - - GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); - Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); - // check config file exist - Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); - ConfigObject configObject = ConfigValueFactory.fromMap(configMap); - Config config = configObject.toConfig(); - config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, 
ExecutionConfigKeyName.ENV_TARGET_TYPE), - ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - - - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } - Assert.assertEquals(1, CollectSink.values.size()); - Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); - - } } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java new file mode 100644 index 0000000..2edc5e7 --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java @@ -0,0 +1,72 @@ +package com.geedgenetworks.bootstrap.main.simple; + +import cn.hutool.setting.yaml.YamlUtil; +import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.EngineType; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; +import com.geedgenetworks.bootstrap.utils.CommandLineUtils; +import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.ConfigProvider; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigUtil; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import java.nio.file.Path; +import java.util.Map; + + +public class JobSplitWithAggTest { + + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + @Test + public void testSplitForAgg() { + + CollectSink.values.clear(); + String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = CommandLineUtils + .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); + jobExecution.getSingleOutputStreamOperator(); + + try { 
jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); + } catch (Exception e) { + throw new JobExecuteException("Job executed error", e); + } + Assert.assertEquals(1, CollectSink.values.size()); + Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); + + } +} diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml index 732d0f6..872800f 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml @@ -3,7 +3,7 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map) will be output. properties: - data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # optional repeat.count: 1 # optional format: json @@ -58,7 +58,7 @@ postprocessing_pipelines: type: aggregate group_by_fields: [decoded_as] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time - window_size: 3 + window_size: 5 window_timestamp_field: test_time functions: - function: NUMBER_SUM lookup_fields: [ sessions ] diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml index f13d69e..9bb2900 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml @@ -60,7 +60,7 @@ postprocessing_pipelines: type: aggregate group_by_fields: [decoded_as] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time - window_size: 3 + window_size: 5 window_timestamp_field: test_time functions: - function: NUMBER_SUM diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java deleted file mode 100644 index 4d4ef12..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/RouteConfigOptions.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.geedgenetworks.common.config; - -import com.alibaba.fastjson2.TypeReference; 
-import com.geedgenetworks.common.udf.RuleContext; -import java.util.List; - -public interface RouteConfigOptions { - Option<String> TYPE = Options.key("type") - .stringType() - .noDefaultValue() - .withDescription("The type of route."); - - Option<List<RuleContext>> RULES = Options.key("rules") - .type(new TypeReference<List<RuleContext>>() {}) - .noDefaultValue() - .withDescription("The rules to be executed."); - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java new file mode 100644 index 0000000..a2acb71 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java @@ -0,0 +1,18 @@ +package com.geedgenetworks.common.config; + +import com.alibaba.fastjson2.TypeReference; +import com.geedgenetworks.common.udf.RuleContext; +import java.util.List; + +public interface SplitConfigOptions { + Option<String> TYPE = Options.key("type") + .stringType() + .noDefaultValue() + .withDescription("The type of route."); + + Option<List<RuleContext>> RULES = Options.key("rules") + .type(new TypeReference<List<RuleContext>>() {}) + .noDefaultValue() + .withDescription("The rules to be executed."); + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index 803fefc..4f9535d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -31,8 +31,7 @@ import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; public class AggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction { private final List udfContexts; private final List udfClassNameLists; - private final List<String> groupByFields; - private LinkedList functions; + private final LinkedList functions; public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) { udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); @@ -40,7 +39,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f if (udfContexts == null || udfContexts.isEmpty()) { throw new RuntimeException(); } - groupByFields = aggregateConfig.getGroup_by_fields(); functions = Lists.newLinkedList(); Map udfClassReflect = getClassReflect(udfClassNameLists); try { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java index 7a129ef..f07b568 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java @@ -21,8 +21,8 @@ import java.util.Map; @Slf4j public class SplitFunction extends ProcessFunction<Event, Event> { - private SplitConfig splitConfig; - private List<RuleContext> routes; + private final SplitConfig splitConfig; + private List<RuleContext> rules; private transient InternalMetrics internalMetrics; public SplitFunction(SplitConfig splitConfig) { @@ -34,8 +34,8 @@ public void open(Configuration parameters) throws Exception { this.internalMetrics = new InternalMetrics(getRuntimeContext()); - this.routes = splitConfig.getRules(); - for(RuleContext rule :routes){ + this.rules = 
splitConfig.getRules(); + for(RuleContext rule : rules){ String expression = rule.getExpression(); AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); instance.setCachedExpressionByDefault(true); @@ -53,7 +53,7 @@ public class SplitFunction extends ProcessFunction<Event, Event> { public void processElement(Event event, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception { try { internalMetrics.incrementInEvents(); - for (RuleContext route :routes){ + for (RuleContext route : rules){ boolean result = route.getExpression() != null ? (filterExecute(route.getCompiledExpression(), route.getCompiledExpression().newEnv("event", event.getExtractedFields()))) : true; if (result) { ctx.output(route.getOutputTag(), event); -- cgit v1.2.3 From 2e21d795d80f83b1f402eea98ed0b4835f15abb9 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Tue, 27 Aug 2024 19:09:43 +0800 Subject: [feature][bootstrap]Add the SM4 algorithm for encrypting sensitive configuration data --- config/template/grootstream_job_template.yaml | 2 +- .../bootstrap/command/SM4ConfigShade.java | 37 ++++++++++++++++++++++ .../com.geedgenetworks.common.config.ConfigShade | 3 +- .../bootstrap/utils/ConfigShadeTest.java | 12 +++++-- groot-common/pom.xml | 7 ++++ .../geedgenetworks/example/GrootStreamExample.java | 2 +- .../main/resources/examples/inline_to_kafka.yaml | 6 ++-- pom.xml | 7 ++++ 8 files changed, 68 insertions(+), 8 deletions(-) create mode 100644 groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml index 7cf50c8..0ca2d68 100644 --- a/config/template/grootstream_job_template.yaml +++ b/config/template/grootstream_job_template.yaml @@ -151,7 +151,7 @@ preprocessing_pipelines: # [object] Define Processors for preprocessing pipeline # It will accomplish the common processing for the event by the user-defined functions. # processing_pipelines: # [object] Define Processors for processing pipelines. - projection_processor: # [object] Define projection processor name, must be unique. + z: # [object] Define projection processor name, must be unique.
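Reading the split pieces in the preceding commits together: SplitFunction only ever emits to the per-rule OutputTag (the main collector stays empty), so whatever consumes a split has to pull each side output by the rule's name. A minimal sketch of that consumption, assuming a stream fed through SplitFunction as above; the helper and the rule name here are illustrative, since the real wiring lives in SplitExecutor/JobExecution:

import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.pojo.SplitConfig;
import com.geedgenetworks.core.split.SplitFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;

class SplitSideOutputSketch {
    // Hypothetical helper: applies the split and returns the branch for one rule.
    static DataStream<Event> branch(DataStream<Event> in, SplitConfig cfg, String ruleName) {
        SingleOutputStreamOperator<Event> tagged =
                in.process(new SplitFunction(cfg)).name(cfg.getName());
        // Must match the tag built in SplitFunction.open():
        // new OutputTag<>(rule.getName()){} with element type Event.
        OutputTag<Event> tag = new OutputTag<Event>(ruleName) {};
        return tagged.getSideOutput(tag);
    }
}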
type: projection # [string] Processor Type remove_fields: output_fields: diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java new file mode 100644 index 0000000..05d3e52 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java @@ -0,0 +1,37 @@ +package com.geedgenetworks.bootstrap.command; + +import cn.hutool.crypto.KeyUtil; +import cn.hutool.crypto.SmUtil; +import cn.hutool.crypto.symmetric.SM4; +import com.geedgenetworks.common.config.ConfigShade; + +import java.nio.charset.StandardCharsets; + +public class SM4ConfigShade implements ConfigShade { + private static final String IDENTIFIER = "sm4"; + + private static final String[] SENSITIVE_OPTIONS = + new String[] {"connection.user", "connection.password", "kafka.sasl.jaas.config","kafka.ssl.keystore.password","kafka.ssl.truststore.password","kafka.ssl.key.password"}; + + private static final byte[] SECURITY_KEY = KeyUtil.generateKey(SM4.ALGORITHM_NAME, ".geedgenetworks.".getBytes(StandardCharsets.UTF_8)).getEncoded(); + + @Override + public String[] sensitiveOptions() { + return SENSITIVE_OPTIONS; + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public String encrypt(String content) { + return SmUtil.sm4(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8); + } + + @Override + public String decrypt(String content) { + return SmUtil.sm4(SECURITY_KEY).decryptStr(content, StandardCharsets.UTF_8); + } +} diff --git a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade index 6654db5..f490f28 100644 --- a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade +++ b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade @@ -1,2 +1,3 @@ com.geedgenetworks.bootstrap.command.Base64ConfigShade -com.geedgenetworks.bootstrap.command.AESConfigShade \ No newline at end of file +com.geedgenetworks.bootstrap.command.AESConfigShade +com.geedgenetworks.bootstrap.command.SM4ConfigShade \ No newline at end of file diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java index c3746a4..17f56ce 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java @@ -65,8 +65,16 @@ public class ConfigShadeTest { Assertions.assertEquals("159c7da83d988a9ec041d10a6bfbe221bcbaed6b62d9cc1b04ff51e633ebd105", encryptPassword); Assertions.assertEquals(decryptUsername, USERNAME); Assertions.assertEquals(decryptPassword, PASSWORD); - System.out.println( ConfigShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";")); - System.out.println( ConfigShadeUtils.decryptOption("aes", "454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817")); + encryptUsername = ConfigShadeUtils.encryptOption("sm4", USERNAME); + 
decryptUsername = ConfigShadeUtils.decryptOption("sm4", encryptUsername); + Assertions.assertEquals("72ea74367a15cb96b0d1d42104149519", encryptUsername); + Assertions.assertEquals(decryptUsername, USERNAME); + encryptPassword = ConfigShadeUtils.encryptOption("sm4", PASSWORD); + decryptPassword = ConfigShadeUtils.decryptOption("sm4", encryptPassword); + Assertions.assertEquals("3876c7088d395bbbfa826e3648b6c9a022e7f80941c132313bde6dc8a7f2351f", encryptPassword); + Assertions.assertEquals(decryptPassword, PASSWORD); + System.out.println( ConfigShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";")); + System.out.println( ConfigShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6")); System.out.println( ConfigShadeUtils.encryptOption("aes", "testuser")); System.out.println( ConfigShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";")); } diff --git a/groot-common/pom.xml b/groot-common/pom.xml index 10e9ed4..37a4d25 100644 --- a/groot-common/pom.xml +++ b/groot-common/pom.xml @@ -41,6 +41,13 @@ hutool-all + + + org.bouncycastle + bcprov-jdk18on + + + org.apache.avro avro diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java index 0eba408..f435f59 100644 --- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java +++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java @@ -13,7 +13,7 @@ import java.nio.file.Paths; public class GrootStreamExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { - String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_with_aggregation.yaml"; + String configPath = args.length > 0 ? 
args[0] : "/examples/inline_to_kafka.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml index 517d29b..63159c5 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml @@ -46,7 +46,7 @@ sinks: kafka.compression.type: snappy kafka.security.protocol: SASL_PLAINTEXT kafka.sasl.mechanism: PLAIN - kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.sasl.jaas.config: f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6 format: json log.failures.only: true @@ -64,7 +64,7 @@ sinks: kafka.compression.type: snappy kafka.security.protocol: SASL_PLAINTEXT kafka.sasl.mechanism: PLAIN - kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.sasl.jaas.config: f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6 format: json log.failures.only: true @@ -72,7 +72,7 @@ application: # [object] Define job configuration env: name: example-inline-to-kafka parallelism: 3 - shade.identifier: aes + shade.identifier: sm4 pipeline: object-reuse: true topology: diff --git a/pom.xml b/pom.xml index 53bcf1f..73d45b8 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,7 @@ 2.4.0 2.0.32 5.8.22 + 1.78.1 2.0.2 2.0.0 5.3.3 @@ -391,6 +392,12 @@ ${hutool.version} + + org.bouncycastle + bcprov-jdk18on + ${bouncycastle.version} + + com.github.seancfoley ipaddress -- cgit v1.2.3 From d82e8100a1ea9297af71d1b2daa02f9b6358cc19 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Tue, 27 Aug 2024 19:55:20 +0800 Subject: add SM4 description (GAL-650) --- docs/connector/config-encryption-decryption.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/connector/config-encryption-decryption.md b/docs/connector/config-encryption-decryption.md index 3146569..c2b05f6 100644 --- a/docs/connector/config-encryption-decryption.md +++ b/docs/connector/config-encryption-decryption.md @@ -6,14 +6,14 @@ In production environments, sensitive configuration items such as passwords are ## How to use -Groot Stream default support base64 and AES encryption and decryption. +Groot Stream supports base64, AES and SM4 encryption and decryption. 
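As a concrete reference for the SM4 path, here is a sketch mirroring SM4ConfigShade and ConfigShadeTest above; the class name and plaintext are illustrative, and the hex ciphertexts in the YAML examples were produced the same way via ConfigShadeUtils.encryptOption("sm4", ...):

import cn.hutool.crypto.KeyUtil;
import cn.hutool.crypto.SmUtil;
import cn.hutool.crypto.symmetric.SM4;

import java.nio.charset.StandardCharsets;

public class Sm4RoundTrip {
    public static void main(String[] args) {
        // Same derivation as SM4ConfigShade: the fixed 16-byte seed acts as the
        // SM4 key, so the resulting ciphertext is deterministic across runs.
        byte[] key = KeyUtil.generateKey(SM4.ALGORITHM_NAME,
                ".geedgenetworks.".getBytes(StandardCharsets.UTF_8)).getEncoded();
        String cipherHex = SmUtil.sm4(key).encryptHex("testuser", StandardCharsets.UTF_8);
        String plain = SmUtil.sm4(key).decryptStr(cipherHex, StandardCharsets.UTF_8);
        System.out.println(cipherHex + " -> " + plain); // round-trips back to "testuser"
    }
}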
Base64 encryption supports encrypting the following parameters: - username - password - auth -AES encryption support encrypt the following parameters: +AES/SM4 encryption supports encrypting the following parameters: - username - password - auth -- cgit v1.2.3 From 0bfc0a2fb13409f816cd168f6cfbe353396bce1e Mon Sep 17 00:00:00 2001 From: doufenghu Date: Tue, 27 Aug 2024 21:22:58 +0800 Subject: rename version 1.6.0-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 73d45b8..b88bd3f 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ - 1.5.0-SNAPSHOT + 1.6.0-SNAPSHOT 11 UTF-8 ${java.version} -- cgit v1.2.3 From 9da15fc59204a79eef815e0a4cddbf7027d44274 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Wed, 28 Aug 2024 18:12:18 +0800 Subject: [feature][bootstrap][common] GAL-651 Groot Stream supports configuring task properties MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/grootstream_job_example.yaml | 7 ++++++- .../bootstrap/execution/JobRuntimeEnvironment.java | 12 ++++++++++-- .../bootstrap/execution/RuntimeEnvironment.java | 5 ++++- .../geedgenetworks/bootstrap/main/simple/SimpleJobTest.java | 3 +-- .../src/test/resources/grootstream_job_etl_test.yaml | 4 +++- .../src/main/java/com/geedgenetworks/common/Constants.java | 3 +++ .../main/java/com/geedgenetworks/core/udf/PathCombine.java | 6 +++++- 7 files changed, 32 insertions(+), 8 deletions(-) diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 46d1123..b77958d 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -80,4 +80,9 @@ application: - name: aggregate_processor downstream: [ print_sink ] - name: print_sink - downstream: [] \ No newline at end of file + downstream: [] + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket \ No newline at end of file diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 2e68098..1962ce8 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -22,8 +22,6 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.TernaryBoolean; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.net.URL; import java.util.*; import java.util.stream.Collectors; @@ -37,6 +35,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private String jobName = Constants.DEFAULT_JOB_NAME; private Set splitSet = new HashSet<>(); + private final Map<String, String> taskProperties = new HashMap<>(); private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; @@ -65,6 +64,14 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ return envConfig; } + @Override + public RuntimeEnvironment initTaskProperties(Config taskProperties) { + this.taskProperties.putAll(taskProperties.root().unwrapped().entrySet() + .stream() 
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())))); + return this; + } + @Override public CheckResult checkConfig() { return EnvironmentUtil.checkRestartStrategy(envConfig); @@ -137,6 +144,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig())); configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig())); + configuration.setString(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES, JSON.toJSONString(taskProperties)); environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); setCheckpoint(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java index bee1c0a..3b06c0c 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java @@ -13,12 +13,15 @@ import java.util.List; public interface RuntimeEnvironment { RuntimeEnvironment setEnvConfig(Config envConfig); Config getEnvConfig(); - + RuntimeEnvironment initTaskProperties(Config taskProperties); CheckResult checkConfig(); RuntimeEnvironment prepare(); void registerPlugin(List pluginPaths); default void initialize(Config config) { + if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) { + this.initTaskProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES))); + } this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare(); } } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java index 556c8c4..90ff95d 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java @@ -75,11 +75,10 @@ public class SimpleJobTest { assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString())); assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString())); assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString())); - Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); + Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString()); Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString()); Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString()); - List asn_list = (List) CollectSink.values.get(0).getExtractedFields().get("asn_list"); } diff --git 
a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index 888c94e..7716d33 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
@@ -111,7 +111,7 @@ processing_pipelines:
         lookup_fields: [ packet_capture_file ]
         output_fields: [ packet_capture_file ]
         parameters:
-          path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
+          path: [ props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file]
       - function: STRING_JOINER
         lookup_fields: [ server_ip,client_ip ]
         output_fields: [ ip_string ]
@@ -205,4 +205,6 @@ application: # [object] Application Configuration
     downstream: [collect_sink]
   - name: collect_sink
     parallelism: 1
+#  properties:
+#    hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket

diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index e973c20..ba572ef 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -11,6 +11,8 @@ public final class Constants {
     public static final String SINKS = "sinks";
     public static final int SYSTEM_EXIT_CODE = 2618;
     public static final String APPLICATION = "application";
+
+    public static final String PROPERTIES = "properties";
     public static final String SPLITS = "splits";
     public static final String APPLICATION_ENV ="env";
     public static final String APPLICATION_TOPOLOGY = "topology";
@@ -23,6 +25,7 @@ public final class Constants {
                     " \\____||_| \\___/ \\___/ \\__| |____/ \\__||_| \\___| \\__,_||_| |_| |_|\n" +
                     "                                                                      \n";

+    public static final String SYSPROP_GROOTSTREAM_TASK_PROPERTIES = "grootstream.task.properties";
     public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";

     public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 874735d..5a5393d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -26,8 +26,12 @@ public class PathCombine implements ScalarFunction {
         Configuration configuration = (Configuration) runtimeContext
                 .getExecutionConfig().getGlobalJobParameters();
+        Map taskProperties = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES), Map.class);
         CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
-        Map propertiesConfig =engineConfig.getPropertiesConfig();
+        Map propertiesConfig = new HashMap<>(engineConfig.getPropertiesConfig());
+        if(taskProperties!=null){
+            propertiesConfig.putAll(taskProperties);
+        }

         if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) {
             String paths = udfContext.getParameters().getOrDefault("path","").toString();
-- 
cgit v1.2.3


From 8d90c04d22a5df3ac5a6d4d12fc1b9fee03f38e8 Mon Sep 17 00:00:00 2001
From: wangkuan
Date: Thu, 29 Aug 2024 10:13:48 +0800
Subject: [improve][bootstrap] GAL-651 Groot Stream supports defining variables
 at the job level; rename them accordingly and store them uniformly in the
 Flink grootstream.config
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
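Note (below the fold, not part of the commit): with this change, values declared under application.properties in the job YAML are merged straight into CommonConfig#getPropertiesConfig(), so operators and UDFs resolve `props.`-prefixed parameters against a single map instead of the separate grootstream.task.properties entry introduced by the previous commit. A minimal sketch of that resolution, assuming a plain Map stands in for the merged propertiesConfig; the class name PropsResolutionSketch and the helper resolveSegment are illustrative, not part of the codebase:

import java.util.HashMap;
import java.util.Map;

public class PropsResolutionSketch {
    // Mirrors Constants.SYSPROP_GROOTSTREAM_PREFIX ("props.") from the diffs above.
    private static final String PROPS_PREFIX = "props.";

    // A path segment either starts with "props." and is resolved from the merged
    // properties map, or is passed through unchanged (e.g. an event field name).
    static String resolveSegment(String segment, Map<String, String> propertiesConfig) {
        if (segment.startsWith(PROPS_PREFIX)) {
            return propertiesConfig.getOrDefault(segment.substring(PROPS_PREFIX.length()), "");
        }
        return segment;
    }

    public static void main(String[] args) {
        Map<String, String> propertiesConfig = new HashMap<>();
        // Engine-level default (illustrative value inferred from the test assertion).
        propertiesConfig.put("hos.path", "http://192.168.44.12:9098/hos");
        // Job-level override, from application.properties in grootstream_job_etl_test.yaml.
        propertiesConfig.put("hos.bucket.name.policy_capture_file", "traffic_policy_capture_file_bucket");
        System.out.println(resolveSegment("props.hos.bucket.name.policy_capture_file", propertiesConfig));
        // prints: traffic_policy_capture_file_bucket
    }
}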
.../geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java | 7 ++----- .../com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java | 4 ++-- groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml | 4 ++-- .../src/main/java/com/geedgenetworks/common/Constants.java | 2 -- .../src/main/java/com/geedgenetworks/core/udf/PathCombine.java | 6 +----- 5 files changed, 7 insertions(+), 16 deletions(-) diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 1962ce8..07fde4d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -35,8 +35,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private String jobName = Constants.DEFAULT_JOB_NAME; private Set splitSet = new HashSet<>(); - private final Map taskProperties= new HashMap<>(); - private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; this.initialize(config); @@ -65,8 +63,8 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } @Override - public RuntimeEnvironment initTaskProperties(Config taskProperties) { - this.taskProperties.putAll(taskProperties.root().unwrapped().entrySet() + public RuntimeEnvironment loadJobProperties(Config jobProperties) { + this.grootStreamConfig.getCommonConfig().getPropertiesConfig().putAll(jobProperties.root().unwrapped().entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())))); return this; @@ -144,7 +142,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig())); configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig())); - configuration.setString(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES, JSON.toJSONString(taskProperties)); environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); setCheckpoint(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java index 3b06c0c..710e7f6 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java @@ -13,14 +13,14 @@ import java.util.List; public interface RuntimeEnvironment { RuntimeEnvironment setEnvConfig(Config envConfig); Config getEnvConfig(); - RuntimeEnvironment initTaskProperties(Config taskProperties); + RuntimeEnvironment loadJobProperties(Config jobProperties); CheckResult checkConfig(); RuntimeEnvironment prepare(); void registerPlugin(List pluginPaths); default void initialize(Config config) { if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) { - this.initTaskProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES))); + this.loadJobProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES))); } this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, 
Constants.APPLICATION_ENV))).prepare();
     }
 }
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index 7716d33..76dcf7f 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
@@ -205,6 +205,6 @@ application: # [object] Application Configuration
     downstream: [collect_sink]
   - name: collect_sink
     parallelism: 1
-#  properties:
-#    hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
+  properties:
+    hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket

diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index ba572ef..b523591 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -25,8 +25,6 @@ public final class Constants {
                     " \\____||_| \\___/ \\___/ \\__| |____/ \\__||_| \\___| \\__,_||_| |_| |_|\n" +
                     "                                                                      \n";

-    public static final String SYSPROP_GROOTSTREAM_TASK_PROPERTIES = "grootstream.task.properties";
-
     public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";
     public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
     public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 5a5393d..874735d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -26,12 +26,8 @@ public class PathCombine implements ScalarFunction {
         Configuration configuration = (Configuration) runtimeContext
                 .getExecutionConfig().getGlobalJobParameters();
-        Map taskProperties = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_TASK_PROPERTIES), Map.class);
         CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
-        Map propertiesConfig = new HashMap<>(engineConfig.getPropertiesConfig());
-        if(taskProperties!=null){
-            propertiesConfig.putAll(taskProperties);
-        }
+        Map propertiesConfig =engineConfig.getPropertiesConfig();

         if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) {
             String paths = udfContext.getParameters().getOrDefault("path","").toString();
-- 
cgit v1.2.3


From 0ea9b9d9db5f92e7afd7b86ddad1f8d69d5c0945 Mon Sep 17 00:00:00 2001
From: wangkuan
Date: Thu, 29 Aug 2024 18:24:42 +0800
Subject: [feature][bootstrap][core] Add pre-aggregation support; the related
 aggregate functions now support merge
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 config/grootstream_job_example.yaml                |  1 +
 .../bootstrap/main/simple/JobSplitWithAggTest.java |  4 +-
 .../resources/grootstream_job_split_agg_test.yaml  | 51 +++------------
 .../common/config/AggregateConfigOptions.java      |  7 ++-
 .../common/udf/AggregateFunction.java              |  7 +--
 .../geedgenetworks/core/pojo/AggregateConfig.java  |  8 ++-
 .../geedgenetworks/core/pojo/OnlineStatistics.java | 32 ++++++++++
 .../aggregate/AggregateProcessorImpl.java          | 68 +++++++++++++++-----
 .../core/processor/aggregate/KeySelector.java      | 14 +++--
 .../geedgenetworks/core/udf/udaf/CollectList.java  | 55 ++++++++-------
 .../geedgenetworks/core/udf/udaf/CollectSet.java   | 26 ++++++---
 .../geedgenetworks/core/udf/udaf/FirstValue.java   | 46 ++++++++-------
 .../HdrHistogram/HdrHistogramBaseAggregate.java    |  5 +-
 .../geedgenetworks/core/udf/udaf/LastValue.java    | 48 ++++++++-------
 .../geedgenetworks/core/udf/udaf/LongCount.java    | 23 ++++++--
 .../com/geedgenetworks/core/udf/udaf/Mean.java     | 42 ++++++++-----
 .../geedgenetworks/core/udf/udaf/NumberSum.java    | 57 ++++++++++++------
 .../udf/udaf/hlld/HlldApproxCountDistinct.java     |  5 +-
 .../core/udf/udaf/hlld/HlldBaseAggregate.java      |  5 +-
 .../core/udf/test/aggregate/CollectListTest.java   | 44 +++++++++++++-
 .../core/udf/test/aggregate/CollectSetTest.java    | 45 +++++++++++++-
 .../core/udf/test/aggregate/FirstValueTest.java    | 41 ++++++++++++-
 .../core/udf/test/aggregate/LastValueTest.java     | 41 ++++++++++++-
 .../core/udf/test/aggregate/LongCountTest.java     | 41 ++++++++++++-
 .../core/udf/test/aggregate/MeanTest.java          | 53 +++++++++++++++--
 .../core/udf/test/aggregate/NumberSumTest.java     | 37 ++++++++++++
 groot-tests/test-e2e-kafka/pom.xml                 | 12 ++++
 27 files changed, 614 insertions(+), 204 deletions(-)

diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index b77958d..f5c6610 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -34,6 +34,7 @@ processing_pipelines:
       window_timestamp_field: recv_time
       window_size: 6
       window_slide: 10 # sliding window step
+      mini_batch: true
       functions:
         - function: NUMBER_SUM
           lookup_fields: [ sent_pkts ]
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
index 2edc5e7..6ed3888 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -65,8 +65,10 @@ public class JobSplitWithAggTest {
         } catch (Exception e) {
             throw new JobExecuteException("Job executed error", e);
         }
-        Assert.assertEquals(1, CollectSink.values.size());
+
+        Assert.assertEquals(2, CollectSink.values.size());
         Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
+        Assert.assertEquals("1.5", CollectSink.values.get(0).getExtractedFields().get("pkts").toString());
     }

 }
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
index 872800f..6a7011a 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -3,66 +3,34 @@ sources:
     type : inline
     fields: # [array of object] Field List, if not set, all fields(Map) will be output.
    properties:
-      data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+      data: '[{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":0,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925799000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"}]'
       interval.per.row: 1s # optional
       repeat.count: 1 # optional
       format: json
       json.ignore.parse.errors: false
+      watermark_timestamp: recv_time
+      watermark_timestamp_unit: ms
+      watermark_lag: 10

 sinks:
   collect_sink:
     type: collect
     properties:
       format: json

-splits:
-  test_split:
-    type: split
-    rules:
-      - name: aggregate_processor
-        expression: event.decoded_as == 'DNS'

 postprocessing_pipelines:
-  pre_etl_processor: # [object] Processing Pipeline
-    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
-    remove_fields: [fields,tags]
-    output_fields:
-    functions: # [array of object] Function List
-      -
-        function: FLATTEN
-        lookup_fields: [ fields,tags ]
-        output_fields: [ ]
-        parameters:
-          #prefix: ""
-          depth: 3
-          # delimiter: "."
-      -
-        function: RENAME
-        lookup_fields: [ '' ]
-        output_fields: [ '' ]
-        filter:
-        parameters:
-          # parent_fields: [tags]
-          # rename_fields:
-          #   tags: tags
-          rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
-
-      -
-        function: UNIX_TIMESTAMP_CONVERTER
-        lookup_fields: [ timestamp_ms ]
-        output_fields: [ recv_time ]
-        parameters:
-          precision: seconds
-          interval: 300
-
   aggregate_processor:
     type: aggregate
     group_by_fields: [decoded_as]
-    window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+    window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time
     window_size: 5
     window_timestamp_field: test_time
+    mini_batch: true
     functions:
       - function: NUMBER_SUM
         lookup_fields: [ sessions ]
+      - function: MEAN
+        lookup_fields: [ pkts ]

   table_processor:
     type: table
@@ -79,9 +47,6 @@ application: # [object] Application Configuration
   topology: # [array of object] Node List. It will be used build data flow for job dag graph.
     - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
       parallelism: 1 # [number] Operator-Level Parallelism.
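# Note on mini_batch (the option this commit introduces): when true, the aggregate
# processor is assembled as a two-phase pipeline (see the AggregateProcessorImpl
# change below). A FirstAggregation{ProcessingTime,EventTime} process function
# pre-aggregates events before the keyed shuffle -- it is handed window_size
# (tumbling) or window_slide (sliding) seconds, presumably as its local flush
# interval -- and emits partial accumulators built via
# AggregateFunction#getMiddleResult(). After keyBy on group_by_fields via
# PreKeySelector, SecondAggregateProcessorFunction combines the partials with
# AggregateFunction#merge() inside the real window. The intent is to put one
# partial accumulator per key, subtask and batch on the wire instead of one
# record per event.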
-      downstream: [test_split]
-    - name: test_split
-      parallelism: 1
       downstream: [ aggregate_processor ]
     - name: aggregate_processor
       parallelism: 1
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
index 0b0379d..af94abf 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
@@ -5,8 +5,6 @@ import com.geedgenetworks.common.udf.UDFContext;

 import java.util.List;

-import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP;
-
 public interface AggregateConfigOptions {
     Option TYPE = Options.key("type")
             .stringType()
@@ -42,7 +40,10 @@ public interface AggregateConfigOptions {
             .intType()
             .noDefaultValue()
             .withDescription("The size of window.");
-
+    Option MINI_BATCH = Options.key("mini_batch")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Whether to enable pre-aggregation (mini-batch).");
     Option WINDOW_SLIDE = Options.key("window_slide")
             .intType()
             .noDefaultValue()
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
index 455073f..6e5ab80 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
@@ -8,14 +8,11 @@ import java.io.Serializable;
 public interface AggregateFunction extends Serializable {

     void open(UDFContext udfContext);
-
     Accumulator initAccumulator(Accumulator acc);
-
     Accumulator add(Event val, Accumulator acc);
-
     String functionName();
-
+    default Accumulator getMiddleResult(Accumulator acc){return acc;}
     Accumulator getResult(Accumulator acc);
-
+    Accumulator merge(Accumulator a, Accumulator b);
     default void close(){};
 }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
index d3cbaac..ebdb0bd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
@@ -1,15 +1,15 @@
 package com.geedgenetworks.core.pojo;

+import com.alibaba.fastjson2.annotation.JSONField;
 import com.geedgenetworks.common.udf.UDFContext;
 import lombok.Data;
 import lombok.EqualsAndHashCode;

-import java.io.Serializable;
 import java.util.List;
-import java.util.Map;
+
 @EqualsAndHashCode(callSuper = true)
 @Data
-public class AggregateConfig extends ProcessorConfig {
+public class AggregateConfig extends ProcessorConfig {

     private List group_by_fields;

@@ -18,5 +18,7 @@ public class AggregateConfig extends ProcessorConfig {
     private Integer window_size;
     private Integer window_slide;
     private List functions;
+    @JSONField(defaultValue = "false" )
+    private Boolean mini_batch;

 }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
index 416a7ea..2508730 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
@@ -13,18 +13,50 @@ public class OnlineStatistics {
         aggregate += (delta * (val.doubleValue() - mean));
         return this;
     }
+
+    public OnlineStatistics merge(OnlineStatistics other) {
+        if (other.n == 0) {
+            return this; // Nothing to merge
+        }
+        if (this.n == 0) {
+            this.n = other.n;
+            this.mean = other.mean;
+            this.aggregate = other.aggregate;
+            return this;
+        }
+
+        // Combine counts
+        long newN = this.n + other.n;
+
+        // Calculate the new mean
+        double delta = other.mean - this.mean;
+        this.mean += delta * (other.n / (double) newN);
+
+        // Update the aggregate (Chan et al.: M2 = M2_a + M2_b + delta^2 * n_a * n_b / n;
+        // the n_b factor is required for correct variance when merging multi-event batches)
+        this.aggregate += other.aggregate
+                + (delta * delta) * ((double) this.n * other.n / newN);
+
+        // Update the count
+        this.n = newN;
+
+        return this;
+    }
+
     // population standard deviation
     public double stddevp() {
         return Math.sqrt(variancep());
     }
+
     // population variance
     public double variancep() {
         return aggregate / n;
     }
+
     // sample standard deviation
     public double stddev() {
         return Math.sqrt(variance());
     }
+
     // sample variance
     public double variance() {
         return aggregate / (n - 1);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index cf78310..c261fb6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -12,42 +12,83 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.OutputTag;

 import static com.geedgenetworks.common.Constants.*;

-public class AggregateProcessorImpl implements AggregateProcessor {
+public class AggregateProcessorImpl implements AggregateProcessor {

     @Override
-    public DataStream processorFunction(DataStream grootEventDataStream, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+    public DataStream processorFunction(DataStream grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {

-        if (aggregateConfig.getParallelism() != 0) {
+        SingleOutputStreamOperator singleOutputStreamOperator;
+        if (aggregateConfig.getMini_batch()) {
             switch (aggregateConfig.getWindow_type()) {
                 case TUMBLING_PROCESSING_TIME:
-                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+                    singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+                            .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_size()))
+                            .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+                            .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+                            .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+                    break;
                 case TUMBLING_EVENT_TIME:
-                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new
ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_size())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_PROCESSING_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_slide())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_EVENT_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_slide())) + .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); + } - }else { + } else { switch (aggregateConfig.getWindow_type()) { case TUMBLING_PROCESSING_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case TUMBLING_EVENT_TIME: - return grootEventDataStream.keyBy(new 
KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_PROCESSING_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; case SLIDING_EVENT_TIME: - return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName()); + singleOutputStreamOperator = grootEventSingleOutputStreamOperator + .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())) + .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))) + .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)); + break; default: throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type"); } } + if (aggregateConfig.getParallelism() != 0) { + singleOutputStreamOperator.setParallelism(aggregateConfig.getParallelism()); + } + return singleOutputStreamOperator.name(aggregateConfig.getName()); } @@ -55,4 +96,5 @@ public class AggregateProcessorImpl implements AggregateProcessor { public String type() { return "aggregate"; } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java index 165ed1b..da09690 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java @@ -1,5 +1,6 @@ package com.geedgenetworks.core.processor.aggregate; +import cn.hutool.crypto.SecureUtil; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.KeybyEntity; @@ -20,16 +21,17 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); StringBuilder stringBuilder = new StringBuilder(); - for(String key: keys){ + for (String key : keys) { - 
if(value.getExtractedFields().containsKey(key)){ - keybyEntity.getKeys().put(key,value.getExtractedFields().get(key)); + if (value.getExtractedFields().containsKey(key)) { + keybyEntity.getKeys().put(key, value.getExtractedFields().get(key)); stringBuilder.append(value.getExtractedFields().get(key).toString()); - }else { + } else { stringBuilder.append(","); } } - keybyEntity.setKeysToString(stringBuilder.toString()); - return keybyEntity; + String hashedKey = SecureUtil.md5(stringBuilder.toString()); + keybyEntity.setKeysToString(hashedKey); + return keybyEntity; } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java index 423eff9..3921ee2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. **/ package com.geedgenetworks.core.udf.udaf; @@ -22,9 +22,9 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.processor.projection.UdfEntity; -import java.util.*; +import java.util.ArrayList; +import java.util.List; /** * Collects elements within a group and returns the list of aggregated objects @@ -36,18 +36,18 @@ public class CollectList implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField, new ArrayList<>()); @@ -56,7 +56,7 @@ public class CollectList implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { Object object = event.getExtractedFields().get(lookupField); List aggregate = (List) acc.getMetricsFields().get(outputField); aggregate.add(object); @@ -75,4 +75,17 @@ public class CollectList implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + List firstValue = (List) firstAcc.getMetricsFields().get(outputField); + List secondValue = (List) secondAcc.getMetricsFields().get(outputField); + firstValue.addAll(secondValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + List secondValue = (List) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java index b4dfb14..9ec9b09 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java @@ -8,7 +8,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; import java.util.HashSet; import java.util.Set; @@ -23,17 +22,17 @@ public class CollectSet 
implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField, new HashSet<>()); @@ -42,7 +41,7 @@ public class CollectSet implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { Object object = event.getExtractedFields().get(lookupField); Set aggregate = (Set) acc.getMetricsFields().get(outputField); aggregate.add(object); @@ -61,5 +60,16 @@ public class CollectSet implements AggregateFunction { return acc; } - + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Set firstValue = (Set) firstAcc.getMetricsFields().get(outputField); + Set secondValue = (Set) secondAcc.getMetricsFields().get(outputField); + firstValue.addAll(secondValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Set secondValue = (Set) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java index 6301a01..a1a35be 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. **/ package com.geedgenetworks.core.udf.udaf; @@ -23,8 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; - /** * Collects elements within a group and returns the list of aggregated objects */ @@ -36,14 +34,13 @@ public class FirstValue implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } @@ -55,7 +52,7 @@ public class FirstValue implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)){ + if (!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)) { acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); } return acc; @@ -71,4 +68,11 @@ public class FirstValue implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java index 1648fa5..6af0be3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java @@ -95,7 +95,10 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction { his.merge(h); } - + @Override + public Accumulator merge(Accumulator a, Accumulator b) { + return null; + } @Override public void close() {} } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java index f27a2e6..44b374e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. **/ package com.geedgenetworks.core.udf.udaf; @@ -23,9 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; -import java.util.List; - /** * Collects elements within a group and returns the list of aggregated objects */ @@ -37,17 +34,17 @@ public class LastValue implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { return acc; @@ -55,7 +52,7 @@ public class LastValue implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); } return acc; @@ -71,4 +68,11 @@ public class LastValue implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (secondAcc.getMetricsFields().containsKey(outputField)) { + firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java index ea33271..05de38c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java @@ -13,20 +13,22 @@ public class LongCount implements AggregateFunction { @Override - public void open(UDFContext udfContext){ - if(udfContext.getOutput_fields()==null ){ + public void open(UDFContext udfContext) { + if (udfContext.getOutput_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } outputField = udfContext.getOutput_fields().get(0); } + @Override public Accumulator initAccumulator(Accumulator acc) { return acc; } + @Override public Accumulator add(Event event, Accumulator acc) { - acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long)v + 1L); + acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 
1L : (long) v + 1L); return acc; } @@ -40,5 +42,18 @@ public class LongCount implements AggregateFunction { return acc; } - + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + + long firstValue = (long) firstAcc.getMetricsFields().get(outputField); + long secondValue = (long) secondAcc.getMetricsFields().get(outputField); + firstValue = firstValue + secondValue; + firstAcc.getMetricsFields().put(outputField, firstValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java index 2a615ef..9c4e070 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java @@ -16,20 +16,20 @@ public class Mean implements AggregateFunction { private String outputField; private Integer precision; private DecimalFormat df; + @Override - public void open(UDFContext udfContext){ + public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } - if(udfContext.getParameters()!= null && !udfContext.getParameters().isEmpty()) { + if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { precision = Integer.parseInt(udfContext.getParameters().getOrDefault("precision", "-1").toString()); if (precision > 0) { StringBuilder pattern = new StringBuilder("#."); @@ -38,14 +38,15 @@ public class Mean implements AggregateFunction { } df = new DecimalFormat(pattern.toString()); } - }else { + } else { precision = -1; } } + @Override public Accumulator initAccumulator(Accumulator acc) { - acc.getMetricsFields().put(outputField,new OnlineStatistics()); + acc.getMetricsFields().put(outputField, new OnlineStatistics()); return acc; } @@ -67,16 +68,27 @@ public class Mean implements AggregateFunction { @Override public Accumulator getResult(Accumulator acc) { OnlineStatistics aggregate = (OnlineStatistics) acc.getMetricsFields().get(outputField); - if(precision<0){ + if (precision < 0) { acc.getMetricsFields().put(outputField, aggregate.mean()); - } - else if(precision>0){ + } else if (precision > 0) { acc.getMetricsFields().put(outputField, df.format(aggregate.mean())); - } - else { - acc.getMetricsFields().put(outputField,(long)aggregate.mean()); + } else { + acc.getMetricsFields().put(outputField, (long) aggregate.mean()); } return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + OnlineStatistics acc1 = (OnlineStatistics) firstAcc.getMetricsFields().get(outputField); + 
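        // acc1.merge(...) below applies OnlineStatistics.merge, i.e. Chan et al.'s
        // pairwise combination of running moments:
        //   n = n_a + n_b;  delta = mean_b - mean_a
        //   mean = mean_a + delta * n_b / n
        //   M2   = M2_a + M2_b + delta^2 * n_a * n_b / n
        // Worked check: partials A = {1, 2} (n=2, mean=1.5, M2=0.5) and B = {3}
        // (n=1, mean=3, M2=0) give delta = 1.5, mean = 1.5 + 1.5 * 1/3 = 2.0 and
        // M2 = 0.5 + 0 + 2.25 * 2/3 = 2.0, matching a single pass over {1, 2, 3}.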
acc1.merge((OnlineStatistics) secondAcc.getMetricsFields().get(outputField)); + long inEvents = firstAcc.getInEvents() + (secondAcc.getInEvents()); + long outEvent = firstAcc.getOutEvents() + (secondAcc.getOutEvents()); + long error = firstAcc.getErrorCount() + (secondAcc.getErrorCount()); + firstAcc.setInEvents(inEvents); + firstAcc.setErrorCount(error); + firstAcc.setOutEvents(outEvent); + return firstAcc; + } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java index 01e9a5b..e972133 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java @@ -6,7 +6,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.pojo.OnlineStatistics; public class NumberSum implements AggregateFunction { @@ -15,15 +14,14 @@ public class NumberSum implements AggregateFunction { @Override - public void open(UDFContext udfContext){ - if(udfContext.getLookup_fields()==null ){ + public void open(UDFContext udfContext) { + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } @@ -32,22 +30,23 @@ public class NumberSum implements AggregateFunction { public Accumulator initAccumulator(Accumulator acc) { return acc; } + @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ - Number val = (Number) event.getExtractedFields().get(lookupField); - Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L); - if (aggregate instanceof Long && ( val instanceof Integer|| val instanceof Long)) { - aggregate = aggregate.longValue() + val.longValue(); - } else if (aggregate instanceof Float || val instanceof Float) { - aggregate = aggregate.floatValue() + val.floatValue(); - } else if (aggregate instanceof Double || val instanceof Double) { - aggregate = aggregate.doubleValue() + val.doubleValue(); - } - acc.getMetricsFields().put(outputField, aggregate); + if (event.getExtractedFields().containsKey(lookupField)) { + Number val = (Number) event.getExtractedFields().get(lookupField); + Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L); + if (aggregate instanceof Long && (val instanceof Integer || val instanceof Long)) { + aggregate = aggregate.longValue() + val.longValue(); + } else if (aggregate instanceof Float || val instanceof Float) { + aggregate = aggregate.floatValue() + val.floatValue(); + } else if (aggregate instanceof Double || val instanceof Double) { + aggregate = aggregate.doubleValue() + val.doubleValue(); } - return acc; + acc.getMetricsFields().put(outputField, aggregate); + } + return acc; } @Override @@ -65,4 +64,24 @@ public class NumberSum implements AggregateFunction { } + 
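    // Merge contract used throughout this patch (and relied on by mini_batch): either
    // accumulator may lack the output key when a partial window saw no matching events,
    // so implementations check key presence on both sides first. NumberSum's merge below
    // repeats the numeric widening of add() (long -> float -> double). Caveat: the
    // HdrHistogram and HLLD aggregates in this patch still return null from merge(),
    // i.e. they do not yet support the pre-aggregation path.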
@Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + + Number firstValue = (Number) firstAcc.getMetricsFields().get(outputField); + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + if (firstValue instanceof Long && (secondValue instanceof Integer || secondValue instanceof Long)) { + firstValue = firstValue.longValue() + secondValue.longValue(); + } else if (firstValue instanceof Float || secondValue instanceof Float) { + firstValue = firstValue.floatValue() + secondValue.floatValue(); + } else if (firstValue instanceof Double || secondValue instanceof Double) { + firstValue = firstValue.doubleValue() + secondValue.doubleValue(); + } + firstAcc.getMetricsFields().put(outputField, firstValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java index ec003f8..041bad9 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java @@ -16,7 +16,10 @@ public class HlldApproxCountDistinct extends HlldBaseAggregate { return acc; } - + @Override + public Accumulator merge(Accumulator a, Accumulator b) { + return null; + } @Override public String functionName() { return "APPROX_COUNT_DISTINCT_HLLD"; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java index 71d61dc..d6c3a44 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java @@ -92,7 +92,10 @@ public abstract class HlldBaseAggregate implements AggregateFunction { Hll hll = HllUtils.deserializeHll(value); hllUnion.update(hll); } - + @Override + public Accumulator merge(Accumulator a, Accumulator b) { + return null; + } @Override public void close() {} } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java index b0d846b..8c0fe3f 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java @@ -36,11 +36,51 @@ public class CollectListTest { public void test() throws ParseException { List arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"); - excute(arr); + List arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } - private static void excute(List arr) throws ParseException { + private void testMerge(List arr,List arr2) { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + 
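        // Test pattern shared by the merge() tests in this patch: build two partial
        // accumulators with add() + getMiddleResult() (one per simulated pre-aggregation
        // subtask), merge() them, then getResult(), and assert the outcome matches what
        // a single accumulator over the concatenated input would produce.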
udfContext.setOutput_fields(Collections.singletonList("field_list")); + CollectList collectList = new CollectList(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectList.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = collectList.getResult(collectList.merge(result1,result2)); + List vals = (List) result.getMetricsFields().get("field_list"); + assertEquals(vals.size(),8); + assertEquals("192.168.1.6",vals.get(5).toString()); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List arr) { + + + CollectList collectList = new CollectList(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectList.open(udfContext); + Accumulator agg = collectList.initAccumulator(accumulator); + + for (String o : arr) { + Event event = new Event(); + Map extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = collectList.add(event, agg); + + } + return collectList.getMiddleResult(agg); + } + private void testGetResult(List arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java index ea4fe8d..47e74bd 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java @@ -31,14 +31,53 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class CollectSetTest { @Test - public void testNumberSumTest() throws ParseException { + public void test() throws ParseException { List arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - excute(arr); + List arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } - private static void excute(List arr) throws ParseException { + private void testMerge(List arr,List arr2) { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_list")); + CollectSet collectSet = new CollectSet(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectSet.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = collectSet.getResult(collectSet.merge(result1,result2)); + Set vals = (Set) result.getMetricsFields().get("field_list"); + assertEquals(vals.size(),6); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List arr) { + + + CollectSet collectSet = new CollectSet(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectSet.open(udfContext); + Accumulator agg = collectSet.initAccumulator(accumulator); + + for (String o : arr) { + Event event = new Event(); + Map extractedFields = new HashMap<>(); + extractedFields.put("field", o); + 
event.setExtractedFields(extractedFields); + agg = collectSet.add(event, agg); + + } + return collectSet.getMiddleResult(agg); + } + private static void testGetResult(List arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java index 506f6de..3d87b14 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java @@ -34,11 +34,48 @@ public class FirstValueTest { public void test() throws ParseException { List arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - excute(arr); + List arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } + private void testMerge(List arr,List arr2) { - private static void excute(List arr) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_first")); + FirstValue firstValue = new FirstValue(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + firstValue.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = firstValue.getResult(firstValue.merge(result1,result2)); + String val = (String) result.getMetricsFields().get("field_first"); + assertEquals(val,"192.168.1.1"); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List arr) { + + + FirstValue firstValue = new FirstValue(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + firstValue.open(udfContext); + Accumulator agg = firstValue.initAccumulator(accumulator); + for (String o : arr) { + Event event = new Event(); + Map extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = firstValue.add(event, agg); + + } + return firstValue.getMiddleResult(agg); + } + private static void testGetResult(List arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java index f8306cd..3d61019 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java @@ -37,11 +37,48 @@ public class LastValueTest { public void test() throws ParseException { List arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - excute(arr); + List arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.3"); + testMerge(arr,arr2); + testGetResult(arr); } + private void testMerge(List arr,List arr2) { - private static void excute(List arr) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); 
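+        // Editorial note, not part of the original patch: each testMerge in these UDAF tests follows the same sketch - build two partial accumulators via getMiddleResult, combine them with merge(), then finalize with getResult() - mirroring how Flink consolidates per-partition partial aggregates.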
+ udfContext.setOutput_fields(Collections.singletonList("field_last")); + LastValue lastValue = new LastValue(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + lastValue.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = lastValue.getResult(lastValue.merge(result1,result2)); + String val = (String) result.getMetricsFields().get("field_last"); + assertEquals(val,"192.168.1.3"); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List arr) { + + + LastValue lastValue = new LastValue(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + lastValue.open(udfContext); + Accumulator agg = lastValue.initAccumulator(accumulator); + for (String o : arr) { + Event event = new Event(); + Map extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = lastValue.add(event, agg); + + } + return lastValue.getMiddleResult(agg); + } + private static void testGetResult(List arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java index 3c02499..d13df72 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java @@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.LastValue; import com.geedgenetworks.core.udf.udaf.LongCount; import com.geedgenetworks.core.udf.udaf.NumberSum; import com.ibm.icu.text.NumberFormat; @@ -38,10 +39,46 @@ public class LongCountTest { public void test() throws ParseException { Long[] longArr = new Long[]{1L, 2L, 3L, 4L}; - excute(longArr); + Long[] longArr2 = new Long[]{1L, 2L, 3L, 4L}; + testMerge(longArr,longArr2); + testGetResult(longArr); } + private void testMerge(Number[] arr,Number[] arr2) { - private static void excute(Number[] arr) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("count")); + LongCount longCount = new LongCount(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + longCount.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = longCount.getResult(longCount.merge(result1,result2)); + assertEquals(Integer.parseInt((result.getMetricsFields().get("count").toString())),8); + + } + private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) { + + + LongCount longCount = new LongCount(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + longCount.open(udfContext); + Accumulator agg = longCount.initAccumulator(accumulator); + for (Number o : arr) { + Event 
event = new Event(); + Map extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = longCount.add(event, agg); + + } + return longCount.getMiddleResult(agg); + } + private static void testGetResult(Number[] arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setOutput_fields(Collections.singletonList("count")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java index 6deed0f..2927f43 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java @@ -39,12 +39,52 @@ public class MeanTest { Integer[] intArr1 = new Integer[]{1, 2, 3, 4}; Integer[] intArr2 = new Integer[]{1, 6, 3}; - excute(intArr1, 0); - excute2(intArr2, 2); - excute3(intArr1); + testInt(intArr1, 0); + testDouble(intArr2, 2); + testNoPrecision(intArr1); + testMerge(intArr1,intArr2,2); + } + private void testMerge(Number[] arr1,Number[] arr2,int precision) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("precision", precision); + Mean mean = new Mean(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + mean.open(udfContext); + Accumulator result1 = getMiddleResult(arr1,precision); + Accumulator result2 = getMiddleResult(arr2,precision); + Accumulator result = mean.getResult(mean.merge(result1,result2)); + assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2.86")); + } + private Accumulator getMiddleResult(Number[] arr,int precision) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("precision", precision); + Mean mean = new Mean(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + mean.open(udfContext); + Accumulator agg = mean.initAccumulator(accumulator); + for (Number o : arr) { + Event event = new Event(); + Map extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = mean.add(event, agg); + + } + return mean.getMiddleResult(agg); } - private static void excute(Number[] arr,int precision) throws ParseException { + + private void testInt(Number[] arr,int precision) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); @@ -65,11 +105,12 @@ public class MeanTest { agg = mean.add(event, agg); } + Accumulator result = mean.getResult(agg); assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2")); } - private static void excute2(Number[] arr,int precision) throws ParseException { + private void testDouble(Number[] arr,int precision) throws ParseException { UDFContext udfContext = new UDFContext(); 
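        // Editorial note, not part of the original patch: 'precision' is assumed to set the decimal scale Mean applies in getResult, so precision 2 yields the "3.33" expected below.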
udfContext.setLookup_fields(List.of("field")); @@ -94,7 +135,7 @@ public class MeanTest { assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("3.33")); } - private static void excute3(Number[] arr) throws ParseException { + private void testNoPrecision(Number[] arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java index d0d3d2c..3fd9506 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java @@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.LongCount; import com.geedgenetworks.core.udf.udaf.NumberSum; import com.ibm.icu.text.NumberFormat; import org.junit.jupiter.api.Test; @@ -41,8 +42,44 @@ public class NumberSumTest { excute(doubleArr, Double.class); excute(intArr, Long.class); excute(longArr, Long.class); + testMerge(intArr,floatArr); + + } + private void testMerge(Number[] arr,Number[] arr2) { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_sum")); + NumberSum numberSum = new NumberSum(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + numberSum.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = numberSum.getResult(numberSum.merge(result1,result2)); + assertEquals(Float.parseFloat((result.getMetricsFields().get("field_sum").toString())),20.0f); + } + private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) { + + + NumberSum numberSum = new NumberSum(); + Map metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + numberSum.open(udfContext); + Accumulator agg = numberSum.initAccumulator(accumulator); + for (Number o : arr) { + Event event = new Event(); + Map extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = numberSum.add(event, agg); + } + return numberSum.getMiddleResult(agg); + } private static void excute(Number[] arr, Class clazz) throws ParseException { UDFContext udfContext = new UDFContext(); diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml index ab1ba72..4592f79 100644 --- a/groot-tests/test-e2e-kafka/pom.xml +++ b/groot-tests/test-e2e-kafka/pom.xml @@ -47,6 +47,18 @@ + + + + org.apache.maven.plugins + maven-compiler-plugin + + 9 + 9 + + + + \ No newline at end of file -- cgit v1.2.3 From 13323b1fe6315cedc5e312fe084fb883442fe066 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Thu, 29 Aug 2024 18:47:29 +0800 Subject: [improve][bootstrap] Add previously uncommitted files and remove the getMiddleResult (intermediate result) method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/udf/AggregateFunction.java | 1 - .../aggregate/AbstractFirstAggregation.java 
| 191 +++++++++++++++++++++ .../aggregate/FirstAggregationEventTime.java | 60 +++++++ .../aggregate/FirstAggregationProcessingTime.java | 59 +++++++ .../core/processor/aggregate/PreKeySelector.java | 37 ++++ .../SecondAggregateProcessorFunction.java | 131 ++++++++++++++ .../core/udf/test/aggregate/CollectListTest.java | 2 +- .../core/udf/test/aggregate/CollectSetTest.java | 2 +- .../core/udf/test/aggregate/FirstValueTest.java | 2 +- .../core/udf/test/aggregate/LastValueTest.java | 2 +- .../core/udf/test/aggregate/LongCountTest.java | 2 +- .../core/udf/test/aggregate/MeanTest.java | 2 +- .../core/udf/test/aggregate/NumberSumTest.java | 2 +- 13 files changed, 485 insertions(+), 8 deletions(-) create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java index 6e5ab80..6f6e048 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java @@ -11,7 +11,6 @@ public interface AggregateFunction extends Serializable { Accumulator initAccumulator(Accumulator acc); Accumulator add(Event val, Accumulator acc); String functionName(); - default Accumulator getMiddleResult(Accumulator acc){return acc;} Accumulator getResult(Accumulator acc); Accumulator merge(Accumulator a, Accumulator b); default void close(){}; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java new file mode 100644 index 0000000..3632ba7 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java @@ -0,0 +1,191 @@ +package com.geedgenetworks.core.processor.aggregate; + + +import cn.hutool.crypto.SecureUtil; +import com.alibaba.fastjson.JSON; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.metrics.InternalMetrics; +import com.geedgenetworks.core.pojo.AggregateConfig; +import com.geedgenetworks.core.processor.projection.UdfEntity; +import com.google.common.collect.Lists; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import 
org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; +import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; + +@Slf4j +public class AbstractFirstAggregation extends ProcessFunction { + + private final long windowSize; + private Long staggerOffset = null; + + protected final Map> windows = new HashMap<>(); + protected List groupByFields; + private LinkedList functions; + + protected InternalMetrics internalMetrics; + private final AggregateConfig aggregateConfig; + + public AbstractFirstAggregation(AggregateConfig aggregateConfig, long windowSize) { + this.windowSize = windowSize; + this.aggregateConfig = aggregateConfig; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + functions = Lists.newLinkedList(); + try { + this.internalMetrics = new InternalMetrics(getRuntimeContext()); + List udfContexts = aggregateConfig.getFunctions(); + if (udfContexts == null || udfContexts.isEmpty()) { + return; + } + Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); + List udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); + + udfContexts = aggregateConfig.getFunctions(); + if (udfContexts == null || udfContexts.isEmpty()) { + throw new RuntimeException("Aggregate functions are not configured"); + } + groupByFields = aggregateConfig.getGroup_by_fields(); + functions = Lists.newLinkedList(); + Map udfClassReflect = getClassReflect(udfClassNameLists); + + for (UDFContext udfContext : udfContexts) { + Expression filterExpression = null; + UdfEntity udfEntity = new UdfEntity(); + // Instantiate the function only when it is registered on the platform and configured in the job + if (udfClassReflect.containsKey(udfContext.getFunction())) { + Class cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); + AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance(); + // If the function declares a filter, compile the expression + if (udfContext.getFilter() != null) { + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + filterExpression = instance.compile(udfContext.getFilter(), true); + } + udfEntity.setAggregateFunction(aggregateFunction); + udfEntity.setFilterExpression(filterExpression); + udfEntity.setName(udfContext.getFunction()); + udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); + udfEntity.setUdfContext(udfContext); + functions.add(udfEntity); + } else { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, + "Unsupported UDAF: " + udfContext.getFunction()); + } + + } + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().open(udfEntity.getUdfContext()); + } + + } catch (Exception e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "UDAF initialization failed!", e); + + } + } + + @Override + public void processElement(Event value, ProcessFunction.Context ctx, Collector out) throws Exception { + + } + + public void onTimer(long timestamp, Collector out) throws Exception { + Map accumulator = windows.remove(timestamp); + for (Accumulator value : accumulator.values()) { + 
value = getResult(value); + out.collect(value); + internalMetrics.incrementOutEvents(); + } + accumulator.clear(); + } + + private long assignWindowStart(long timestamp, long offset) { + return timestamp - (timestamp - offset + windowSize) % windowSize; + } + + protected long assignWindowEnd(long timestamp) { + if (staggerOffset == null) { + staggerOffset = getStaggerOffset(); + } + return assignWindowStart(timestamp, (staggerOffset % windowSize)) + windowSize; + } + + private long getStaggerOffset() { + return (long) (ThreadLocalRandom.current().nextDouble() * (double) windowSize); + } + + public Accumulator createAccumulator() { + + Map map = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + + accumulator.setMetricsFields(map); + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().initAccumulator(accumulator); + } + return accumulator; + + } + + public String getKey(Event value, List keys) { + StringBuilder stringBuilder = new StringBuilder(); + for (String key : keys) { + + if (value.getExtractedFields().containsKey(key)) { + stringBuilder.append(value.getExtractedFields().get(key).toString()); + } else { + stringBuilder.append(","); + } + } + return SecureUtil.md5(stringBuilder.toString()); + + } + + public Accumulator add(Event event, Accumulator accumulator) { + accumulator.setInEvents(accumulator.getInEvents() + 1); + for (UdfEntity udafEntity : functions) { + try { + boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", event.getExtractedFields())) : true; + if (result) { + udafEntity.getAggregateFunction().add(event, accumulator); + } + } catch (ExpressionRuntimeException ignore) { + log.error("Function " + udafEntity.getName() + " Invalid filter ! 
"); + accumulator.setErrorCount(accumulator.getErrorCount() + 1); + } catch (Exception e) { + log.error("Function " + udafEntity.getName() + " execute exception !", e); + accumulator.setErrorCount(accumulator.getErrorCount() + 1); + } + } + return accumulator; + } + + public Accumulator getResult(Accumulator accumulator) { + return accumulator; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java new file mode 100644 index 0000000..9390de4 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java @@ -0,0 +1,60 @@ +package com.geedgenetworks.core.processor.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.AggregateConfig; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +@Slf4j +public class FirstAggregationEventTime extends AbstractFirstAggregation { + + private final PriorityQueue eventTimeTimersQueue = new PriorityQueue<>(); + + public FirstAggregationEventTime(AggregateConfig aggregateConfig, long windowSize) { + super(aggregateConfig, windowSize); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + } + + @Override + public void processElement(Event value, ProcessFunction.Context ctx, Collector out) throws Exception { + Long timestamp; + internalMetrics.incrementInEvents(); + try { + String key = getKey(value, groupByFields); + while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) { + eventTimeTimersQueue.poll(); + onTimer(timestamp, out); + } + long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime()); + if (!windows.containsKey(windowEnd)) { + Map map = new HashMap<>(); + map.put(key, createAccumulator()); + windows.put(windowEnd, map); + eventTimeTimersQueue.add(windowEnd); + } else { + if (!windows.get(windowEnd).containsKey(key)) { + windows.get(windowEnd).put(key, createAccumulator()); + } + } + add(value, windows.get(windowEnd).get(key)); + } catch (Exception e) { + log.error("Error in pre-aggregate processElement", e); + internalMetrics.incrementErrorEvents(); + } + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java new file mode 100644 index 0000000..e98daa5 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationProcessingTime.java @@ -0,0 +1,59 @@ +package com.geedgenetworks.core.processor.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.AggregateConfig; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +@Slf4j +public class 
FirstAggregationProcessingTime extends AbstractFirstAggregation { + + private final PriorityQueue processingTimeTimersQueue = new PriorityQueue<>(); + + public FirstAggregationProcessingTime(AggregateConfig aggregateConfig, long windowSize) { + super(aggregateConfig, windowSize); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + } + + @Override + public void processElement(Event value, ProcessFunction.Context ctx, Collector out) throws Exception { + Long timestamp; + internalMetrics.incrementInEvents(); + try { + String key = getKey(value, groupByFields); + while ((timestamp = processingTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) { + processingTimeTimersQueue.poll(); + onTimer(timestamp, out); + } + long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime()); + if (!windows.containsKey(windowEnd)) { + Map map = new HashMap<>(); + map.put(key, createAccumulator()); + windows.put(windowEnd, map); + processingTimeTimersQueue.add(windowEnd); + } else { + if (!windows.get(windowEnd).containsKey(key)) { + windows.get(windowEnd).put(key, createAccumulator()); + } + } + add(value, windows.get(windowEnd).get(key)); + } catch (Exception e) { + log.error("Error in pre-aggregate processElement", e); + internalMetrics.incrementErrorEvents(); + } + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java new file mode 100644 index 0000000..6e43184 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/PreKeySelector.java @@ -0,0 +1,37 @@ +package com.geedgenetworks.core.processor.aggregate; + +import cn.hutool.crypto.SecureUtil; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.KeybyEntity; + +import java.util.HashMap; +import java.util.List; + +public class PreKeySelector implements org.apache.flink.api.java.functions.KeySelector { + + + private final List keys; + + public PreKeySelector(List keys) { + this.keys = keys; + } + + @Override + public KeybyEntity getKey(Accumulator value) throws Exception { + + KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>()); + StringBuilder stringBuilder = new StringBuilder(); + for (String key : keys) { + + if (value.getMetricsFields().containsKey(key)) { + keybyEntity.getKeys().put(key, value.getMetricsFields().get(key)); + stringBuilder.append(value.getMetricsFields().get(key).toString()); + } else { + stringBuilder.append(","); + } + } + String hashedKey = SecureUtil.md5(stringBuilder.toString()); + keybyEntity.setKeysToString(hashedKey); + return keybyEntity; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java new file mode 100644 index 0000000..7c0a434 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java @@ -0,0 +1,131 @@ +package com.geedgenetworks.core.processor.aggregate; + +import com.alibaba.fastjson.JSON; +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import 
com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.pojo.AggregateConfig; +import com.geedgenetworks.core.processor.projection.UdfEntity; +import com.google.common.collect.Lists; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import com.googlecode.aviator.exception.ExpressionRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.ExecutionConfig; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static com.geedgenetworks.core.utils.UDFUtils.filterExecute; +import static com.geedgenetworks.core.utils.UDFUtils.getClassReflect; + +@Slf4j +public class SecondAggregateProcessorFunction implements org.apache.flink.api.common.functions.AggregateFunction { + private final List udfContexts; + private final List udfClassNameLists; + private final LinkedList functions; + + public SecondAggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) { + udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); + udfContexts = aggregateConfig.getFunctions(); + if (udfContexts == null || udfContexts.isEmpty()) { + throw new RuntimeException("Aggregate functions are not configured"); + } + functions = Lists.newLinkedList(); + Map udfClassReflect = getClassReflect(udfClassNameLists); + try { + for (UDFContext udfContext : udfContexts) { + Expression filterExpression = null; + UdfEntity udfEntity = new UdfEntity(); + // Instantiate the function only when it is registered on the platform and configured in the job + if (udfClassReflect.containsKey(udfContext.getFunction())) { + Class cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); + AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance(); + // If the function declares a filter, compile the expression + if (udfContext.getFilter() != null) { + AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); + instance.setCachedExpressionByDefault(true); + instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); + instance.setFunctionMissing(null); + filterExpression = instance.compile(udfContext.getFilter(), true); + } + udfEntity.setAggregateFunction(aggregateFunction); + udfEntity.setFilterExpression(filterExpression); + udfEntity.setName(udfContext.getFunction()); + udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); + udfEntity.setUdfContext(udfContext); + functions.add(udfEntity); + } else { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, + "Unsupported UDAF: " + udfContext.getFunction()); + } + + } + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().open(udfEntity.getUdfContext()); + } + } catch (Exception e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "UDAF initialization failed!", e); + + } + } + + @Override + public Accumulator createAccumulator() { + Map map = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(map); + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().initAccumulator(accumulator); + } + return accumulator; + } + + @Override + public Accumulator add(Accumulator event, Accumulator accumulator) { + return merge(event, accumulator); + } + + @Override + public Accumulator getResult(Accumulator accumulator) { + for (UdfEntity 
udafEntity : functions) { + try { + udafEntity.getAggregateFunction().getResult(accumulator); + } catch (Exception e) { + log.error("Function " + udafEntity.getName() + " getResult exception !", e); + } + } + return accumulator; + } + + @Override + public Accumulator merge(Accumulator acc1, Accumulator acc2) { + acc1.setInEvents(acc1.getInEvents() + acc2.getInEvents()); + acc1.setOutEvents(acc1.getOutEvents() + acc2.getOutEvents()); + acc1.setErrorCount(acc1.getErrorCount() + acc2.getErrorCount()); + for (UdfEntity udafEntity : functions) { + try { + boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", acc1.getMetricsFields())) : true; + if (result) { + udafEntity.getAggregateFunction().merge(acc1, acc2); + } + } catch (ExpressionRuntimeException ignore) { + log.error("Function " + udafEntity.getName() + " Invalid filter ! "); + acc1.setErrorCount(acc1.getErrorCount() + 1); + } catch (Exception e) { + log.error("Function " + udafEntity.getName() + " execute exception !", e); + acc1.setErrorCount(acc1.getErrorCount() + 1); + } + } + return acc1; + } + + +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java index 8c0fe3f..2bf13a5 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java @@ -78,7 +78,7 @@ public class CollectListTest { agg = collectList.add(event, agg); } - return collectList.getMiddleResult(agg); + return agg; } private void testGetResult(List arr) throws ParseException { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java index 47e74bd..8e992f6 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java @@ -75,7 +75,7 @@ public class CollectSetTest { agg = collectSet.add(event, agg); } - return collectSet.getMiddleResult(agg); + return agg; } private static void testGetResult(List arr) throws ParseException { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java index 3d87b14..43a9732 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java @@ -73,7 +73,7 @@ public class FirstValueTest { agg = firstValue.add(event, agg); } - return firstValue.getMiddleResult(agg); + return agg; } private static void testGetResult(List arr) throws ParseException { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java index 3d61019..e952908 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java @@ -76,7 +76,7 @@ public class LastValueTest { agg = lastValue.add(event, agg); } - return 
lastValue.getMiddleResult(agg); + return agg; } private static void testGetResult(List arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java index d13df72..c1dfb9e 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java @@ -76,7 +76,7 @@ public class LongCountTest { agg = longCount.add(event, agg); } - return longCount.getMiddleResult(agg); + return agg; } private static void testGetResult(Number[] arr) throws ParseException { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java index 2927f43..cc4eaf0 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java @@ -80,7 +80,7 @@ public class MeanTest { agg = mean.add(event, agg); } - return mean.getMiddleResult(agg); + return agg; } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java index 3fd9506..a4072ca 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java @@ -78,7 +78,7 @@ public class NumberSumTest { agg = numberSum.add(event, agg); } - return numberSum.getMiddleResult(agg); + return agg; } private static void excute(Number[] arr, Class clazz) throws ParseException { -- cgit v1.2.3 From d4e7f873064a9d95578a64977eadf15b11ed4e11 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Fri, 30 Aug 2024 11:10:40 +0800 Subject: [improve][bootstrap][core] Improve monitoring output and unit tests, and fix several issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bootstrap/main/simple/JobSplitWithAggTest.java | 4 ++-- .../src/test/resources/grootstream_job_split_agg_test.yaml | 2 +- .../core/processor/aggregate/FirstAggregationEventTime.java | 4 ++-- .../processor/aggregate/SecondAggregateProcessorFunction.java | 9 ++------- 4 files changed, 7 insertions(+), 12 deletions(-) diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java index 6ed3888..9fa81c0 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java @@ -67,8 +67,8 @@ public class JobSplitWithAggTest { } Assert.assertEquals(2, CollectSink.values.size()); - Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); - Assert.assertEquals("1.5", CollectSink.values.get(0).getExtractedFields().get("pkts").toString()); + Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); + Assert.assertEquals("3.0", CollectSink.values.get(1).getExtractedFields().get("pkts").toString()); } } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml 
b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml index 6a7011a..5163642 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml @@ -3,7 +3,7 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map) will be output. properties: - data: '[{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":0,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925799000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"}]' + data: '[{"pkts":1,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # optional repeat.count: 1 # optional format: json diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java index 9390de4..156c0ed 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/FirstAggregationEventTime.java @@ -34,11 +34,11 @@ public class FirstAggregationEventTime extends AbstractFirstAggregation { internalMetrics.incrementInEvents(); try { String key = getKey(value, groupByFields); - while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentProcessingTime()) { + while ((timestamp = eventTimeTimersQueue.peek()) != null && timestamp <= ctx.timerService().currentWatermark()) { eventTimeTimersQueue.poll(); onTimer(timestamp, out); } - long windowEnd = assignWindowEnd(ctx.timerService().currentProcessingTime()); + long windowEnd = assignWindowEnd(ctx.timerService().currentWatermark()); if (!windows.containsKey(windowEnd)) { Map map = new HashMap<>(); map.put(key, createAccumulator()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java index 7c0a434..68fa53e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java @@ -107,15 +107,10 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co @Override public 
Accumulator merge(Accumulator acc1, Accumulator acc2) { - acc1.setInEvents(acc1.getInEvents() + acc2.getInEvents()); - acc1.setOutEvents(acc1.getOutEvents() + acc2.getOutEvents()); - acc1.setErrorCount(acc1.getErrorCount() + acc2.getErrorCount()); + acc1.setInEvents(acc1.getInEvents() + 1); for (UdfEntity udafEntity : functions) { try { - boolean result = udafEntity.getFilterExpression() != null ? filterExecute(udafEntity.getFilterExpression(), udafEntity.getFilterExpression().newEnv("event", acc1.getMetricsFields())) : true; - if (result) { - udafEntity.getAggregateFunction().merge(acc1, acc2); - } + udafEntity.getAggregateFunction().merge(acc1, acc2); } catch (ExpressionRuntimeException ignore) { log.error("Function " + udafEntity.getName() + " Invalid filter ! "); acc1.setErrorCount(acc1.getErrorCount() + 1); -- cgit v1.2.3 From a55399cb95c6408233e84540db482ae5e6131746 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Sun, 1 Sep 2024 23:49:48 +0800 Subject: [Improve][bootstrap] Improve job-level user-defined variables, move the path from application/properties to application/env/properties, and add support for defining variables via the runtime CLI. (GAL-651) --- config/grootstream_job_example.yaml | 10 ++++---- .../bootstrap/command/CommandArgs.java | 4 +-- .../bootstrap/command/ExecuteCommand.java | 26 +++++++++++++++---- .../bootstrap/command/ExecuteCommandArgs.java | 11 ++++---- .../bootstrap/execution/JobExecution.java | 28 ++++++++++---------- .../bootstrap/execution/JobRuntimeEnvironment.java | 30 ++++++++++------------ .../bootstrap/execution/RuntimeEnvironment.java | 11 ++------ .../geedgenetworks/example/GrootStreamExample.java | 5 +++- .../main/resources/examples/inline_to_print.yaml | 6 +++++ .../src/test/resources/grootstream.yaml | 4 +++ .../test/e2e/base/InlineToPrintIT.java | 14 +++++++++- .../src/test/resources/inline_to_print.yaml | 20 +++++++++++++-- 12 files changed, 108 insertions(+), 61 deletions(-) diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index f5c6610..37ef114 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -71,6 +71,11 @@ application: execution: restart: strategy: none + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket topology: - name: inline_source downstream: [decoded_as_split] @@ -82,8 +87,3 @@ application: downstream: [ print_sink ] - name: print_sink downstream: [] - properties: - hos.bucket.name.rtp_file: traffic_rtp_file_bucket - hos.bucket.name.http_file: traffic_http_file_bucket - hos.bucket.name.eml_file: traffic_eml_file_bucket - hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket \ No newline at end of file diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java index f83183c..6ee5151 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java @@ -31,10 +31,10 @@ public abstract class CommandArgs { description = "Whether check config") protected boolean checkConfig = false; - + // user-defined parameters @Parameter(names = {"-i", "--variable"}, splitter = ParameterSplitter.class, - description = 
"user-defined parameters , such as -i data_center=bj " + description = "Job level user-defined parameters, such as -i traffic_file_bucket=traffic_file_bucket, or -i scheduler.knowledge_base.update.interval.minutes=1. " + "We use ',' as separator, when inside \"\", ',' are treated as normal characters instead of delimiters.") protected List variables = Collections.emptyList(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java index 79a27e1..c3538b0 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommand.java @@ -26,27 +26,43 @@ public class ExecuteCommand implements Command { @Override public void execute() throws CommandExecuteException, ConfigCheckException { + // Groot Stream Global Config for all processing jobs GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); // check config file exist checkConfigExist(configFile); - Config config = ConfigBuilder.of(configFile); + Config jobConfig = ConfigBuilder.of(configFile); + // if user specified job name using command line arguments, override config option if (!executeCommandArgs.getJobName().equals(Constants.DEFAULT_JOB_NAME)) { - config = config.withValue( + jobConfig = jobConfig.withValue( ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV , Constants.JOB_NAME), ConfigValueFactory.fromAnyRef(executeCommandArgs.getJobName())); } + // if user specified target type using command line arguments, override config option if(executeCommandArgs.getTargetType() != null) { - config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); } - JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + // if user specified variables using command line arguments, override config option + if(!executeCommandArgs.getVariables().isEmpty()) { + for (String variable : executeCommandArgs.getVariables()) { + String[] keyValue = variable.split("="); + if (keyValue.length != 2) { + throw new CommandExecuteException("Invalid variable format: " + variable); + } + jobConfig = jobConfig.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, Constants.PROPERTIES, keyValue[0]), + ConfigValueFactory.fromAnyRef(keyValue[1])); + } + } + + + JobExecution jobExecution = new JobExecution(jobConfig, grootStreamConfig); try { jobExecution.execute(); } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); + throw new JobExecuteException("Job executed failed", e); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java index 0c00a61..61ced82 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java @@ -20,14 +20,14 @@ public class ExecuteCommandArgs 
extends CommandArgs { @Parameter(names={"-e", "--deploy-mode"}, converter = DeployModeConverter.class, - description = "deploy mode, only support [run] ") + description = "Job deploy mode, only support [run] ") private DeployMode deployMode = DeployMode.RUN; @Parameter( names = {"--target"}, converter = TargetTypeConverter.class, description = - "job submitted target type, support [local, remote, yarn-session, yarn-per-job]") + "Job submitted target type, support [local, remote, yarn-session, yarn-per-job]") private TargetType targetType; @Override @@ -55,7 +55,7 @@ public class ExecuteCommandArgs extends CommandArgs { @Override public String toString() { - return "FlinkCommandArgs{" + return "CommandArgs{" + "deployMode=" + deployMode + ", targetType=" @@ -74,7 +74,6 @@ public class ExecuteCommandArgs extends CommandArgs { } - private void userParamsToSysEnv() { if (!this.variables.isEmpty()) { variables.stream() @@ -111,7 +110,7 @@ public class ExecuteCommandArgs extends CommandArgs { return targetType; } else { throw new IllegalArgumentException( - "Groot-Stream job on flink engine submitted target type only " + "GrootStream job submitted target type only " + "support these options: [local, remote, yarn-session, yarn-per-job]"); } } @@ -132,7 +131,7 @@ public class ExecuteCommandArgs extends CommandArgs { return deployMode; } else { throw new IllegalArgumentException( - "Groot-Stream job on flink engine deploy mode only " + "GrootStream job deploy mode only " + "support these options: [run, run-application]"); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index 22b23a7..6a23a0b 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -39,26 +39,26 @@ public class JobExecution { private final List nodes; private final List jarPaths; - public JobExecution(Config config, GrootStreamConfig grootStreamConfig) { + public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) { try { jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir() .resolve(GrootStreamRunner.APP_JAR_NAME).toString()) .toURI().toURL())); } catch (MalformedURLException e) { - throw new JobExecuteException("load groot stream bootstrap jar error.", e); + throw new JobExecuteException("Load GrootStream Bootstrap jar failed.", e); } - registerPlugin(config.getConfig(Constants.APPLICATION)); + registerPlugin(jobConfig.getConfig(Constants.APPLICATION)); - this.sourceExecutor = new SourceExecutor(jarPaths, config); - this.sinkExecutor = new SinkExecutor(jarPaths, config); - this.filterExecutor = new FilterExecutor(jarPaths, config); - this.splitExecutor = new SplitExecutor(jarPaths, config); - this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config); - this.processingExecutor = new ProcessingExecutor(jarPaths, config); - this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config); + this.sourceExecutor = new SourceExecutor(jarPaths, jobConfig); + this.sinkExecutor = new SinkExecutor(jarPaths, jobConfig); + this.filterExecutor = new FilterExecutor(jarPaths, jobConfig); + this.splitExecutor = new SplitExecutor(jarPaths, jobConfig); + this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, jobConfig); + this.processingExecutor = new ProcessingExecutor(jarPaths, 
jobConfig); + this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, jobConfig); this.jobRuntimeEnvironment = - JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig); + JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig); this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); @@ -67,7 +67,7 @@ public class JobExecution { this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.nodes = buildJobNode(config); + this.nodes = buildJobNode(jobConfig); } @@ -126,7 +126,7 @@ public class JobExecution { return new URL(uri); } catch (MalformedURLException e) { throw new RuntimeException( - "the uri of jar illegal:" + uri, e); + "The jar uri is illegal: " + uri, e); } }) .collect(Collectors.toSet()); @@ -251,7 +251,7 @@ public class JobExecution { private void buildJobGraph(DataStream dataStream, String downstreamNodeName) { Node node = getNode(downstreamNodeName).orElseGet(() -> { - throw new JobExecuteException("can't find downstream node " + downstreamNodeName); + throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 07fde4d..4c6c2d2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -35,16 +35,16 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private String jobName = Constants.DEFAULT_JOB_NAME; private Set splitSet = new HashSet<>(); - private JobRuntimeEnvironment(Config config, GrootStreamConfig grootStreamConfig) { + private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; - this.initialize(config); + this.initialize(jobConfig); } - public static JobRuntimeEnvironment getInstance(Config config, GrootStreamConfig grootStreamConfig) { + public static JobRuntimeEnvironment getInstance(Config jobConfig, GrootStreamConfig grootStreamConfig) { if (INSTANCE == null) { synchronized (JobRuntimeEnvironment.class) { if (INSTANCE == null) { - INSTANCE = new JobRuntimeEnvironment(config, grootStreamConfig); + INSTANCE = new JobRuntimeEnvironment(jobConfig, grootStreamConfig); } } } @@ -62,14 +62,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ return envConfig; } - @Override - public RuntimeEnvironment loadJobProperties(Config jobProperties) { - this.grootStreamConfig.getCommonConfig().getPropertiesConfig().putAll(jobProperties.root().unwrapped().entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())))); - return this; - } - @Override public CheckResult checkConfig() { return EnvironmentUtil.checkRestartStrategy(envConfig); } @@ -78,10 +70,19 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ @Override public RuntimeEnvironment prepare() { -
createStreamEnvironment(); + if (envConfig.hasPath(Constants.JOB_NAME)) { jobName = envConfig.getString(Constants.JOB_NAME); } + // Job-level user-defined variables override the grootStreamConfig + if (envConfig.hasPath(Constants.PROPERTIES)) { + envConfig.getConfig(Constants.PROPERTIES).root().unwrapped().entrySet().forEach(entry -> { + this.grootStreamConfig.getCommonConfig().getPropertiesConfig().put(entry.getKey(), String.valueOf(entry.getValue())); + }); + } + + createStreamEnvironment(); + return this; } @@ -89,8 +90,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ return jobName; } - - public boolean isLocalMode() { return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE) && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget()); @@ -131,7 +130,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } private void createStreamEnvironment() { - Configuration configuration = new Configuration(); EnvironmentUtil.initConfiguration(envConfig, configuration); if (isLocalMode()) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java index 710e7f6..b177e40 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/RuntimeEnvironment.java @@ -1,7 +1,4 @@ package com.geedgenetworks.bootstrap.execution; - - -import com.geedgenetworks.bootstrap.enums.TargetType; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckResult; import com.typesafe.config.Config; @@ -12,16 +9,12 @@ import java.util.List; public interface RuntimeEnvironment { RuntimeEnvironment setEnvConfig(Config envConfig); + //Prepare runtime environment for job execution + RuntimeEnvironment prepare(); Config getEnvConfig(); - RuntimeEnvironment loadJobProperties(Config jobProperties); CheckResult checkConfig(); - RuntimeEnvironment prepare(); - void registerPlugin(List pluginPaths); default void initialize(Config config) { - if (config.getConfig(Constants.APPLICATION).hasPath(Constants.PROPERTIES)) { - this.loadJobProperties(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.PROPERTIES))); - } this.setEnvConfig(config.getConfig(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV))).prepare(); } } diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java index f435f59..9b58289 100644 --- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java +++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java @@ -9,11 +9,12 @@ import java.io.FileNotFoundException; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Paths; +import java.util.List; public class GrootStreamExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { - String configPath = args.length > 0 ? args[0] : "/examples/inline_to_kafka.yaml"; + String configPath = args.length > 0 ? 
args[0] : "/examples/inline_to_print.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); @@ -21,6 +22,8 @@ public class GrootStreamExample { executeCommandArgs.setEncrypt(false); executeCommandArgs.setDecrypt(false); executeCommandArgs.setVersion(false); + executeCommandArgs.setVariables(List.of("hos.bucket.name.traffic_file=user_define_traffic_file_bucket", + "scheduler.knowledge_base.update.interval.minutes=1")); executeCommandArgs.setDeployMode(DeployMode.RUN); executeCommandArgs.setTargetType(TargetType.LOCAL); GrootStreamServer.run(executeCommandArgs.buildCommand()); diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml index 00f2a7d..408fbad 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print.yaml @@ -33,6 +33,12 @@ application: parallelism: 3 pipeline: object-reuse: true + properties: + hos.bucket.name.traffic_file: local_traffic_file_bucket + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket topology: - name: inline_source downstream: [filter_operator] diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml index 5520945..2eb105b 100644 --- a/groot-tests/test-common/src/test/resources/grootstream.yaml +++ b/groot-tests/test-common/src/test/resources/grootstream.yaml @@ -15,3 +15,7 @@ grootstream: hos.bucket.name.traffic_file: traffic_file_bucket hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket scheduler.knowledge_base.update.interval.minutes: 5 + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java index d4cdcbd..dde7b28 100644 --- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java @@ -8,6 +8,7 @@ import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer; import com.geedgenetworks.test.common.container.TestContainerId; import com.geedgenetworks.test.common.junit.DisabledOnContainer; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; import java.io.IOException; @@ -31,7 +32,7 @@ public class InlineToPrintIT extends TestSuiteBase { CompletableFuture.supplyAsync( () -> { try { - return container.executeJob("/kafka_to_print.yaml"); + return container.executeJob("/inline_to_print.yaml"); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -85,6 +86,17 @@ public class InlineToPrintIT extends TestSuiteBase { 
Assertions.assertNotNull(jobNumRestartsReference.get()); }); + + await().atMost(300000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + String logs = container.getServerLogs(); + Assertions.assertTrue(StringUtils.countMatches(logs, "job_level_traffic_rtp_file_bucket/test_pcap_file") > 10); + Assertions.assertTrue(StringUtils.countMatches(logs, "job_level_traffic_http_file_bucket/test_http_req_file") > 10); + }); + + + } } diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml index daf6e32..b4773a1 100644 --- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml +++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml @@ -2,7 +2,7 @@ sources: inline_source: type: inline properties: - data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]' + data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","rtp_pcap_path":"test_pcap_file","http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; 
charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]' format: json json.ignore.parse.errors: false @@ -14,11 +14,21 @@ filters: processing_pipelines: projection_processor: - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + type: projection remove_fields: [http_request_line, http_response_line, http_response_content_type] functions: - function: DROP filter: event.server_ip == '4.4.4.4' + - function: PATH_COMBINE + lookup_fields: [ rtp_pcap_path ] + output_fields: [ rtp_pcap_path ] + parameters: + path: [ props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path ] + - function: PATH_COMBINE + lookup_fields: [ http_request_body ] + output_fields: [ http_request_body ] + parameters: + path: [ props.hos.path, props.hos.bucket.name.http_file, http_request_body ] sinks: print_sink: @@ -33,6 +43,12 @@ application: parallelism: 3 pipeline: object-reuse: true + properties: + hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket + hos.bucket.name.http_file: job_level_traffic_http_file_bucket + hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: job_level_traffic_policy_capture_file_bucket + topology: - name: inline_source downstream: [filter_operator] -- cgit v1.2.3 From 0d5ce165d30383a4b9b9945b61150d8e8015893d Mon Sep 17 00:00:00 2001 From: doufenghu Date: Mon, 2 Sep 2024 23:36:07 +0800 Subject: [Fix][e2e-common] Support user-defined variables via CLI when submitting a job. 
--- docs/env-config.md | 23 +++++++++++++++++++--- docs/grootstream-config.md | 5 ++++- docs/user-guide.md | 4 ++-- .../bootstrap/main/GrootStreamRunner.java | 2 +- .../common/container/AbstractTestContainer.java | 22 +++++++++++++++++++-- .../container/AbstractTestFlinkContainer.java | 8 +++++++- .../test/common/container/TestContainer.java | 3 +++ .../test/e2e/base/InlineToPrintIT.java | 11 +++++++---- 8 files changed, 64 insertions(+), 14 deletions(-) diff --git a/docs/env-config.md b/docs/env-config.md index 7a31494..8e22a53 100644 --- a/docs/env-config.md +++ b/docs/env-config.md @@ -57,10 +57,10 @@ Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are s You can directly use the flink parameter by prefixing `flink.`, such as `flink.execution.buffer-timeout`, `flink.object-reuse`, etc. More details can be found in the official [flink documentation](https://flink.apache.org/). Of course, you can use groot stream parameter, here are some parameter names corresponding to the names in Flink. -| Groot Stream | Flink | +| Groot Stream | Flink | |----------------------------------------|---------------------------------------------------------------| -| execution.buffer-timeout | flink.execution.buffer-timeout | -| pipeline.object-reuse | flink.object-reuse | +| execution.buffer-timeout | flink.execution.buffer-timeout.interval | +| pipeline.object-reuse | flink.pipeline.object-reuse | | pipeline.max-parallelism | flink.pipeline.max-parallelism | | execution.restart.strategy | flink.restart-strategy | | execution.restart.attempts | flink.restart-strategy.fixed-delay.attempts | @@ -70,3 +70,20 @@ Of course, you can use groot stream parameter, here are some parameter names cor | execution.restart.delayInterval | flink.restart-strategy.failure-rate.delay | | ... | ... | +## Properties +Job-level user-defined variables can be set in the `properties` section using key-value pairs, where the key represents a configuration property and the value specifies the desired setting. +The properties can be used in the configuration file by using `props.${property_name}`. It will override the corresponding settings in the `grootstream.yaml` file for the duration of the job. +```yaml +application: + env: + name: example-inline-to-print + parallelism: 3 + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket + hos.bucket.name.http_file: job_level_traffic_http_file_bucket + hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: job_level_traffic_policy_capture_file_bucket +``` + diff --git a/docs/grootstream-config.md b/docs/grootstream-config.md index fb902ae..9dd442f 100644 --- a/docs/grootstream-config.md +++ b/docs/grootstream-config.md @@ -20,7 +20,7 @@ grootstream: ``` -### Knowledge Base +## Knowledge Base The knowledge base is a collection of libraries that can be used in the groot-stream job's UDFs. File system type can be specified `local`, `http` or `hdfs`. If the value is `http`, must be ` QGW Knowledge Base Repository` URL. The library will be dynamically updated according to the `scheduler.knowledge_base.update.interval.minutes` configuration. @@ -77,3 +77,6 @@ grootstream: - asn_builtin.mmdb - asn_user_defined.mmdb ``` +## Properties +Global user-defined variables can be set in the `properties` section using key-value pairs, where the key represents a configuration property and the value specifies the desired setting. 
+The properties can be used in the configuration file by using `props.${property_name}`. \ No newline at end of file diff --git a/docs/user-guide.md b/docs/user-guide.md index e35616f..d52cfed 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -153,7 +153,7 @@ Used to define job environment configuration information. For more details, you # Command ## Run a job by CLI - +Note: When submitting a job via CLI, you can use `-D` parameter to specify flink configuration. For example, `-Dexecution.buffer-timeout.interval=1000` to set the buffer timeout to 1000ms. More details can be found in the official [flink documentation](https://flink.apache.org/). ```bash Usage: start.sh [options] Options: @@ -164,7 +164,7 @@ Options: -e, --deploy-mode Deploy mode, only support [run] (default: run) --target Submitted target type, support [local, remote, yarn-session, yarn-per-job] -n, --name Job name (default: groot-stream-job) - -i, --variable User-defined parameters, eg. -i key=value (default: []) + -i, --variable User-defined variables, eg. -i key=value (default: []) -h, --help Show help message -v, --version Show version message diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java index 8ab8bdc..6a106d2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java @@ -76,7 +76,7 @@ public class GrootStreamRunner { bootstrapCommandArgs.getVariables().stream() .filter(Objects::nonNull) .map(String::trim) - .forEach(variable -> command.add("-D" + variable)); + .forEach(variable -> command.add("-i " + variable)); return command; } diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java index 0f1e3f7..14eb5fb 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java @@ -62,8 +62,9 @@ public abstract class AbstractTestContainer implements TestContainer { container, this.startModuleFullPath, GROOTSTREAM_HOME); } - protected Container.ExecResult executeJob(GenericContainer container, String confFile) + protected Container.ExecResult executeJob(GenericContainer container, String confFile, List variables) throws IOException, InterruptedException { + final String confInContainerPath = ContainerUtil.copyConfigFileToContainer(container, confFile); // copy connectors ContainerUtil.copyConnectorJarToContainer( @@ -81,10 +82,27 @@ public abstract class AbstractTestContainer implements TestContainer { command.add(ContainerUtil.adaptPathForWin(confInContainerPath)); command.add("--target"); command.add("remote"); - command.addAll(getExtraStartShellCommands()); + List extraStartShellCommands = new ArrayList<>(getExtraStartShellCommands()); + if (variables != null && !variables.isEmpty()) { + variables.forEach( + v -> { + extraStartShellCommands.add("-i"); + extraStartShellCommands.add(v); + }); + } + command.addAll(extraStartShellCommands); return executeCommand(container, command); } + + + protected Container.ExecResult executeJob(GenericContainer container, String confFile) + throws IOException, 
InterruptedException { + return executeJob(container, confFile, null); + } + + + protected Container.ExecResult savepointJob(GenericContainer container, String jobId) throws IOException, InterruptedException { final List command = new ArrayList<>(); diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java index 30e6eb3..b833115 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java @@ -127,8 +127,14 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer { @Override public Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException { + return executeJob(confFile, null); + } + + @Override + public Container.ExecResult executeJob(String confFile, List variables) + throws IOException, InterruptedException { log.info("test in container: {}", identifier()); - return executeJob(jobManager, confFile); + return executeJob(jobManager, confFile, variables); } @Override diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java index 6e4cd1f..b3bf77a 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java @@ -5,6 +5,7 @@ import org.testcontainers.containers.Container; import org.testcontainers.containers.Network; import java.io.IOException; +import java.util.List; public interface TestContainer extends TestResource { Network NETWORK = Network.newNetwork(); @@ -15,6 +16,8 @@ public interface TestContainer extends TestResource { Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException; + Container.ExecResult executeJob(String confFile, List variables) + throws IOException, InterruptedException; default Container.ExecResult savepointJob(String jobId) throws IOException, InterruptedException { diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java index dde7b28..1c1e777 100644 --- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java @@ -24,7 +24,7 @@ import static org.awaitility.Awaitility.await; @DisabledOnContainer( value = {TestContainerId.FLINK_1_17}, type = {}, - disabledReason = "only flink adjusts the parameter configuration rules") + disabledReason = "Only flink adjusts the parameter configuration rules") public class InlineToPrintIT extends TestSuiteBase { @TestTemplate @@ -32,7 +32,10 @@ public class InlineToPrintIT extends TestSuiteBase { CompletableFuture.supplyAsync( () -> { try { - return container.executeJob("/inline_to_print.yaml"); + List variables = List.of( + "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket", + "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket"); + return container.executeJob("/inline_to_print.yaml", 
variables); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); } @@ -91,8 +94,8 @@ public class InlineToPrintIT extends TestSuiteBase { .untilAsserted( () -> { String logs = container.getServerLogs(); - Assertions.assertTrue(StringUtils.countMatches(logs, "job_level_traffic_rtp_file_bucket/test_pcap_file") > 10); - Assertions.assertTrue(StringUtils.countMatches(logs, "job_level_traffic_http_file_bucket/test_http_req_file") > 10); + Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_rtp_file_bucket/test_pcap_file") > 10); + Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_http_file_bucket/test_http_req_file") > 10); }); -- cgit v1.2.3 From e3efdcac80dc1ca8fb0bdd08f69318f745f9bf7c Mon Sep 17 00:00:00 2001 From: lifengchao Date: Tue, 3 Sep 2024 14:35:51 +0800 Subject: [feature][core] Implement merge() for sketch aggregate functions to support two-phase aggregation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../HdrHistogram/HdrHistogramBaseAggregate.java | 25 +++++++++++++--- .../udf/udaf/hlld/HlldApproxCountDistinct.java | 5 +--- .../core/udf/udaf/hlld/HlldBaseAggregate.java | 33 +++++++++++++++++++--- 3 files changed, 51 insertions(+), 12 deletions(-) diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java index 6af0be3..a099fde 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java @@ -59,6 +59,26 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator acc, Accumulator other) { + Object agg = acc.getMetricsFields().get(outputField); + Object aggOther = other.getMetricsFields().get(outputField); + Object rst; + + if(agg == null){ + rst = aggOther; + } else if (aggOther == null) { + rst = agg; + }else{ + rst = ((Histogramer)agg).merge(((Histogramer) aggOther)); + } + + if(rst != null){ + acc.getMetricsFields().put(outputField, rst); + } + return acc; + } + protected void updateHdr(Accumulator acc, Object value) { Map aggs = acc.getMetricsFields(); ArrayHistogram his = (ArrayHistogram) aggs.get(outputField); @@ -95,10 +115,7 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction { his.merge(h); } - @Override - public Accumulator merge(Accumulator a, Accumulator b) { - return null; - } + @Override public void close() {} } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java index 041bad9..ec003f8 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java @@ -16,10 +16,7 @@ public class HlldApproxCountDistinct extends HlldBaseAggregate { return acc; } - @Override - public Accumulator merge(Accumulator a, Accumulator b) { - return null; - } + @Override public String functionName() { return "APPROX_COUNT_DISTINCT_HLLD"; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java index d6c3a44..0802c22 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java @@ -51,6 +51,34 @@ public abstract class HlldBaseAggregate implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator acc, Accumulator other) { + Object agg = acc.getMetricsFields().get(outputField); + Object aggOther = other.getMetricsFields().get(outputField); + Object rst; + + if(agg == null){ + rst = aggOther; + } else if (aggOther == null) { + rst = agg; + }else{ + if(inputSketch){ + ((HllUnion)agg).update(((HllUnion) aggOther).getResult()); + rst = agg; + }else{ + final HllUnion union = new HllUnion(precision); + union.update((Hll) agg); + union.update((Hll) aggOther); + rst = union.getResult(); + } + } + + if(rst != null){ + acc.getMetricsFields().put(outputField, rst); + } + return acc; + } + protected Hll getResultHll(Accumulator acc){ Object agg = acc.getMetricsFields().get(outputField); if (agg == null) { @@ -92,10 +120,7 @@ public abstract class HlldBaseAggregate implements AggregateFunction { Hll hll = HllUtils.deserializeHll(value); hllUnion.update(hll); } - @Override - public Accumulator merge(Accumulator a, Accumulator b) { - return null; - } + @Override public void close() {} } -- cgit v1.2.3 From 8217c23fa564ad2051427a12f4eabf3fea75aafc Mon Sep 17 00:00:00 2001 From: wangkuan Date: Wed, 4 Sep 2024 09:50:32 +0800 Subject: [improve][bootstrap] Clean up code, move splitSet, and adapt unit tests to the properties config change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bootstrap/execution/JobExecution.java | 24 +++++++++++++++++----- .../bootstrap/execution/JobRuntimeEnvironment.java | 9 -------- .../bootstrap/execution/SplitExecutor.java | 9 +++----- .../bootstrap/main/simple/JobExecutionTest.java | 20 ++++++++++++------ 4 files changed, 36 insertions(+), 26 deletions(-) diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index 6a23a0b..f6e19eb 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -6,7 +6,14 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CheckConfigUtil; +import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.GrootStreamConfig; +import com.geedgenetworks.common.config.SplitConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.ConfigValidationException; +import com.geedgenetworks.common.udf.RuleContext; +import com.geedgenetworks.core.pojo.SplitConfig; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -38,6 +45,7 @@ public class JobExecution { private final Executor, JobRuntimeEnvironment> postprocessingExecutor; private final List nodes; private final List jarPaths; + private final Set splitSet = new HashSet<>(); public JobExecution(Config jobConfig, GrootStreamConfig
grootStreamConfig) { try { @@ -201,6 +209,12 @@ public class JobExecution { } else if (filters.containsKey(node.getName())) { node.setType(ProcessorType.FILTER); } else if (splits.containsKey(node.getName())) { + splits.forEach((key, value) -> { + SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map) value).toJavaObject(SplitConfig.class); + for(RuleContext ruleContext:splitConfig.getRules()) { + splitSet.add(ruleContext.getName()); + } + }); node.setType(ProcessorType.SPLIT); } else if (preprocessingPipelines.containsKey(node.getName())) { node.setType(ProcessorType.PREPROCESSING); @@ -254,7 +268,7 @@ public class JobExecution { throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = filterExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { @@ -264,28 +278,28 @@ public class JobExecution { dataStream = splitExecutor.execute(dataStream, node); } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = preprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = processingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = processingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = postprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = sinkExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 4c6c2d2..a4289ff 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -33,7 +33,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ private GrootStreamConfig grootStreamConfig; private StreamExecutionEnvironment environment; private String jobName = Constants.DEFAULT_JOB_NAME; - private Set splitSet = new HashSet<>(); private JobRuntimeEnvironment(Config jobConfig, GrootStreamConfig grootStreamConfig) { this.grootStreamConfig = grootStreamConfig; @@ 
-200,14 +199,6 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } } - public Set getSplitSet() { - return splitSet; - } - - public void setSplitSet(Set splitSet) { - this.splitSet = splitSet; - } - private void setCheckpoint() { long interval = 0; if (envConfig.hasPath(ExecutionConfigKeyName.CHECKPOINTING_INTERVAL)) { diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java index 3513a67..e549087 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java @@ -37,9 +37,9 @@ public class SplitExecutor extends AbstractExecutor { protected Map initialize(List jarPaths, Config operatorConfig) { Map splitConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.SPLITS)) { - Config routes = operatorConfig.getConfig(Constants.SPLITS); - routes.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(routes.getConfig(key), + Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS); + splitsConfig.root().unwrapped().forEach((key, value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(splitsConfig.getConfig(key), SplitConfigOptions.TYPE.key()); if (!result.isSuccess()) { throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( @@ -75,9 +75,6 @@ public class SplitExecutor extends AbstractExecutor { if (node.getParallelism() > 0) { splitConfig.setParallelism(node.getParallelism()); } - for(RuleContext ruleContext:splitConfig.getRules()) { - jobRuntimeEnvironment.getSplitSet().add(ruleContext.getName()); - } try { dataStream = split.splitFunction( diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java index 7e995cf..7b9544a 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java @@ -7,8 +7,10 @@ import com.geedgenetworks.bootstrap.execution.*; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.GrootStreamConfig; +import com.geedgenetworks.common.udf.RuleContext; import com.geedgenetworks.common.utils.ReflectionUtils; import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.pojo.SplitConfig; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -42,7 +44,7 @@ public class JobExecutionTest { private final Executor, JobRuntimeEnvironment> processingExecutor; private final Executor, JobRuntimeEnvironment> postprocessingExecutor; private final Executor, JobRuntimeEnvironment> sinkExecutor; - + private final Set splitSet = new HashSet<>(); private final List nodes; private BiConsumer ADD_URL_TO_CLASSLOADER = @@ -219,6 +221,12 @@ public class JobExecutionTest { } else if (sinks.containsKey(node.getName())) { node.setType(ProcessorType.SINK); } else if (splits.containsKey(node.getName())) { + splits.forEach((key, value) -> { + SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map) value).toJavaObject(SplitConfig.class); + for(RuleContext 
ruleContext:splitConfig.getRules()) { + splitSet.add(ruleContext.getName()); + } + }); node.setType(ProcessorType.SPLIT); } else if (filters.containsKey(node.getName())) { node.setType(ProcessorType.FILTER); @@ -262,7 +270,7 @@ public class JobExecutionTest { throw new JobExecuteException("can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = filterExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { @@ -272,28 +280,28 @@ public class JobExecutionTest { dataStream = splitExecutor.execute(dataStream, node); } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = preprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = processingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = processingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { dataStream = postprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - if (jobRuntimeEnvironment.getSplitSet().contains(node.getName())) { + if (splitSet.contains(node.getName())) { dataStream = sinkExecutor.execute(((SingleOutputStreamOperator) dataStream).getSideOutput(new OutputTag(node.getName()) { }), node); } else { -- cgit v1.2.3 From 063af747e7ccedd2c6c0688782766616db3f36b0 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Wed, 4 Sep 2024 09:51:57 +0800 Subject: [improve][bootstrap] Adapt unit test config files to the properties change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index 76dcf7f..9724e21 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -191,6 +191,8 @@ sinks: application: # [object] Application Configuration env: # [object] Environment Variables name: groot-stream-job # [string] Job Name + properties: + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket pipeline: object-reuse: true # [boolean] Object Reuse, default is false topology: # [array of object] Node List. It will be used build data flow for job dag graph.
@@ -205,6 +207,5 @@ application: # [object] Application Configuration downstream: [collect_sink] - name: collect_sink parallelism: 1 - properties: - hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + -- cgit v1.2.3 From eb1b2aed15b929a680386c986a8179bd02ee5781 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Thu, 19 Sep 2024 17:54:57 +0800 Subject: [Fix][e2e-clickhouse] Fix the issue where the source table data is cleared during testClickHouse. --- .../test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java b/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java index 20caace..8b44ed7 100644 --- a/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java +++ b/groot-tests/test-e2e-clickhouse/src/test/java/com/geedgenetworks/test/e2e/clickhouse/ClickHouseIT.java @@ -122,7 +122,6 @@ public class ClickHouseIT extends TestSuiteBase implements TestResource { @TestTemplate public void testClickHouse(TestContainer container) throws Exception { assertHasData(SOURCE_TABLE); - clearTable(SOURCE_TABLE); } @TestTemplate @@ -144,7 +143,6 @@ public class ClickHouseIT extends TestSuiteBase implements TestResource { () -> { assertHasData(SINK_TABLE); compareResult(); - clearTable(SINK_TABLE); }); } -- cgit v1.2.3 From 56500315923d67696caa6a261eb39c27a7db71a4 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Thu, 19 Sep 2024 18:04:00 +0800 Subject: Rename version 1.6.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b88bd3f..f1fb003 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ - 1.6.0-SNAPSHOT + 1.6.0 11 UTF-8 ${java.version} -- cgit v1.2.3
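The `merge()` methods added to `HdrHistogramBaseAggregate` and `HlldBaseAggregate` in this series follow the same null-safe pattern: keep whichever partial aggregate exists, and combine only when both sides are present. A generalized sketch of that pattern, assuming map-backed accumulators (`mergeField` and `combiner` are hypothetical names introduced here for illustration):

```java
import java.util.Map;
import java.util.function.BinaryOperator;

public final class TwoPhaseMergeSketch {
    private TwoPhaseMergeSketch() {}

    // Merges the partial aggregate stored under `field` from `other` into `acc`:
    // if either side is missing the other wins; otherwise the combiner unions them.
    static void mergeField(Map<String, Object> acc, Map<String, Object> other,
                           String field, BinaryOperator<Object> combiner) {
        Object a = acc.get(field);
        Object b = other.get(field);
        Object rst = (a == null) ? b : (b == null) ? a : combiner.apply(a, b);
        if (rst != null) {
            acc.put(field, rst);
        }
    }
}
```

For the HLL aggregates the combiner unions two sketches (via `HllUnion`), and for the HDR histograms it delegates to `Histogramer.merge`; implementing `merge()` this way, rather than returning `null`, is what allows the aggregation to run in two phases (local pre-aggregation followed by a global merge).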