27 files changed, 614 insertions, 204 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index b77958d..f5c6610 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -34,6 +34,7 @@ processing_pipelines:
       window_timestamp_field: recv_time
       window_size: 6
       window_slide: 10 # sliding window step
+      mini_batch: true
       functions:
         - function: NUMBER_SUM
           lookup_fields: [ sent_pkts ]
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
index 2edc5e7..6ed3888 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -65,8 +65,10 @@ public class JobSplitWithAggTest {
         } catch (Exception e) {
             throw new JobExecuteException("Job executed error", e);
         }
-        Assert.assertEquals(1, CollectSink.values.size());
+
+        Assert.assertEquals(2, CollectSink.values.size());
         Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
+        Assert.assertEquals("1.5", CollectSink.values.get(0).getExtractedFields().get("pkts").toString());
     }
 }
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
index 872800f..6a7011a 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -3,66 +3,34 @@ sources:
     type : inline
     fields:  # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
     properties:
-      data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+      data: '[{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":0,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925799000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"}]'
       interval.per.row: 1s # optional
       repeat.count: 1 # optional
       format: json
       json.ignore.parse.errors: false
+      watermark_timestamp: recv_time
+      watermark_timestamp_unit: ms
+      watermark_lag: 10
 sinks:
   collect_sink:
     type: collect
     properties:
       format: json
-splits:
-  test_split:
-    type: split
-    rules:
-      - name: aggregate_processor
-        expression: event.decoded_as == 'DNS'
 postprocessing_pipelines:
-  pre_etl_processor: # [object] Processing Pipeline
-    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
-    remove_fields: [fields,tags]
-    output_fields:
-    functions: # [array of object] Function List
-      -
-        function: FLATTEN
-        lookup_fields: [ fields,tags ]
-        output_fields: [ ]
-        parameters:
-          #prefix: ""
-          depth: 3
-          # delimiter: "."
-      -
-        function: RENAME
-        lookup_fields: [ '' ]
-        output_fields: [ '' ]
-        filter:
-        parameters:
-          # parent_fields: [tags]
-          # rename_fields:
-          #   tags: tags
-          rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
-      -
-        function: UNIX_TIMESTAMP_CONVERTER
-        lookup_fields: [ timestamp_ms ]
-        output_fields: [ recv_time ]
-        parameters:
-          precision: seconds
-          interval: 300
-  #
   aggregate_processor:
     type: aggregate
     group_by_fields: [decoded_as]
-    window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+    window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time
     window_size: 5
     window_timestamp_field: test_time
+    mini_batch: true
     functions:
       - function: NUMBER_SUM
         lookup_fields: [ sessions ]
+      - function: MEAN
+        lookup_fields: [ pkts ]

   table_processor:
     type: table
@@ -79,9 +47,6 @@ application: # [object] Application Configuration
 topology: # [array of object] Node List. It will be used build data flow for job dag graph.
   - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
     parallelism: 1 # [number] Operator-Level Parallelism.
-    downstream: [test_split]
-  - name: test_split
-    parallelism: 1
     downstream: [ aggregate_processor ]
   - name: aggregate_processor
     parallelism: 1
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
index 0b0379d..af94abf 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
@@ -5,8 +5,6 @@ import com.geedgenetworks.common.udf.UDFContext;

 import java.util.List;

-import static com.geedgenetworks.common.Event.WINDOW_START_TIMESTAMP;
-
 public interface AggregateConfigOptions {
     Option<String> TYPE = Options.key("type")
             .stringType()
@@ -42,7 +40,10 @@ public interface AggregateConfigOptions {
             .intType()
             .noDefaultValue()
             .withDescription("The size of window.");
-
+    Option<Boolean> MINI_BATCH = Options.key("mini_batch")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("The flag to enable pre-aggregation (mini batch).");
     Option<Integer> WINDOW_SLIDE = Options.key("window_slide")
             .intType()
             .noDefaultValue()
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
index 455073f..6e5ab80 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
@@ -8,14 +8,11 @@ import java.io.Serializable;
 public interface AggregateFunction extends Serializable {

     void open(UDFContext udfContext);
-
     Accumulator initAccumulator(Accumulator acc);
-
     Accumulator add(Event val, Accumulator acc);
-
     String functionName();
-
+    default Accumulator getMiddleResult(Accumulator acc){return acc;}
     Accumulator getResult(Accumulator acc);
-
+    Accumulator merge(Accumulator a, Accumulator b);
     default void close(){};
 }
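The interface change above is the contract behind the two-phase path: initAccumulator/add build a per-subtask partial result, getMiddleResult hands that partial result to the keyed window, merge combines two partials, and getResult produces the final value. Below is a minimal, self-contained sketch of that flow using a plain Map as a stand-in for the project's Accumulator; the class and field names are illustrative only, not part of the repository.

// Minimal sketch of the two-phase (pre-aggregate, then merge) contract, assuming a
// plain Map<String, Object> in place of the project's Accumulator type.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseSumSketch {

    // Phase 1: each parallel subtask folds its own events into a partial (middle) result.
    static Map<String, Object> partialSum(List<Long> sessionsPerEvent) {
        Map<String, Object> acc = new HashMap<>();
        long sum = 0L;
        for (Long v : sessionsPerEvent) {
            sum += v;
        }
        acc.put("sessions", sum);
        return acc;
    }

    // Phase 2: the keyed window combines partial results, mirroring AggregateFunction#merge.
    static Map<String, Object> merge(Map<String, Object> first, Map<String, Object> second) {
        if (second.containsKey("sessions")) {
            long merged = (long) first.getOrDefault("sessions", 0L) + (long) second.get("sessions");
            first.put("sessions", merged);
        }
        return first;
    }

    public static void main(String[] args) {
        Map<String, Object> a = partialSum(List.of(1L, 1L)); // pre-aggregated batch 1
        Map<String, Object> b = partialSum(List.of(1L));     // pre-aggregated batch 2
        System.out.println(merge(a, b).get("sessions"));     // prints 3
    }
}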
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
index d3cbaac..ebdb0bd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
@@ -1,15 +1,15 @@
 package com.geedgenetworks.core.pojo;

+import com.alibaba.fastjson2.annotation.JSONField;
 import com.geedgenetworks.common.udf.UDFContext;
 import lombok.Data;
 import lombok.EqualsAndHashCode;

-import java.io.Serializable;
 import java.util.List;
-import java.util.Map;
+

 @EqualsAndHashCode(callSuper = true)
 @Data
-public class AggregateConfig extends ProcessorConfig {
+public class AggregateConfig extends ProcessorConfig {

     private List<String> group_by_fields;
@@ -18,5 +18,7 @@ public class AggregateConfig extends ProcessorConfig {
     private Integer window_size;
     private Integer window_slide;
     private List<UDFContext> functions;
+    @JSONField(defaultValue = "false" )
+    private Boolean mini_batch;

 }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
index 416a7ea..2508730 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
@@ -13,18 +13,50 @@ public class OnlineStatistics {
         aggregate += (delta * (val.doubleValue() - mean));
         return this;
     }
+
+    public OnlineStatistics merge(OnlineStatistics other) {
+        if (other.n == 0) {
+            return this; // Nothing to merge
+        }
+        if (this.n == 0) {
+            this.n = other.n;
+            this.mean = other.mean;
+            this.aggregate = other.aggregate;
+            return this;
+        }
+
+        // Combine counts
+        long newN = this.n + other.n;
+
+        // Calculate the new mean
+        double delta = other.mean - this.mean;
+        this.mean += delta * (other.n / (double) newN);
+
+        // Update the aggregate
+        this.aggregate += other.aggregate
+                + (this.n * delta * delta) / newN;
+
+        // Update the count
+        this.n = newN;
+
+        return this;
+    }
+
+    // population standard deviation
     public double stddevp() {
         return Math.sqrt(variancep());
     }

+    // population variance
     public double variancep() {
         return aggregate / n;
     }

+    // sample standard deviation
     public double stddev() {
         return Math.sqrt(variance());
     }

+    // sample variance
     public double variance() {
         return aggregate / (n - 1);
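OnlineStatistics.merge combines two running (count, mean, M2) summaries, which is what lets MEAN (and the variance/stddev helpers built on the same state) take part in pre-aggregation. The usual pairwise update (Chan et al.) is mean = mean_a + delta * n_b / n and M2 = M2_a + M2_b + delta^2 * n_a * n_b / n, with delta = mean_b - mean_a and n = n_a + n_b. The following self-contained sketch shows that combine checked against a single pass over the same values; the class and field names are illustrative and not the project's API.

// Sketch of the pairwise mean/variance combine, assuming the standard Chan et al. update.
public class PairwiseStatsSketch {
    long n;
    double mean;
    double m2; // sum of squared deviations from the running mean

    void add(double x) {
        n++;
        double delta = x - mean;
        mean += delta / n;
        m2 += delta * (x - mean);
    }

    void combine(PairwiseStatsSketch other) {
        if (other.n == 0) return;
        if (this.n == 0) { this.n = other.n; this.mean = other.mean; this.m2 = other.m2; return; }
        long newN = this.n + other.n;
        double delta = other.mean - this.mean;
        this.mean += delta * other.n / newN;
        // M2 = M2_a + M2_b + delta^2 * n_a * n_b / (n_a + n_b)
        this.m2 += other.m2 + delta * delta * this.n * other.n / newN;
        this.n = newN;
    }

    public static void main(String[] args) {
        PairwiseStatsSketch left = new PairwiseStatsSketch();
        PairwiseStatsSketch right = new PairwiseStatsSketch();
        for (double x : new double[]{1, 2, 3, 4}) left.add(x);
        for (double x : new double[]{1, 6, 3}) right.add(x);
        left.combine(right);
        System.out.println(left.mean);        // 20 / 7, the mean of all seven values
        System.out.println(left.m2 / left.n); // population variance of {1,2,3,4,1,6,3}
    }
}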
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index cf78310..c261fb6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -12,42 +12,83 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.OutputTag;

 import static com.geedgenetworks.common.Constants.*;

-public class AggregateProcessorImpl implements AggregateProcessor {
+public class AggregateProcessorImpl implements AggregateProcessor {

     @Override
-    public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+    public DataStream<Event> processorFunction(DataStream<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {

-        if (aggregateConfig.getParallelism() != 0) {
+        SingleOutputStreamOperator<Event> singleOutputStreamOperator;
+        if (aggregateConfig.getMini_batch()) {
             switch (aggregateConfig.getWindow_type()) {
                 case TUMBLING_PROCESSING_TIME:
-                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+                    singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+                            .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_size()))
+                            .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+                            .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+                            .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+                    break;
                 case TUMBLING_EVENT_TIME:
-                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+                    singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+                            .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_size()))
+                            .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+                            .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+                            .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+                    break;
                 case SLIDING_PROCESSING_TIME:
-                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+                    singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+                            .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_slide()))
+                            .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+                            .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+                            .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+                    break;
                 case SLIDING_EVENT_TIME:
-                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+                    singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+                            .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_slide()))
+                            .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+                            .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+                            .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+                    break;
                 default:
                     throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
+
             }
-        }else {
+        } else {
             switch (aggregateConfig.getWindow_type()) {
                 case TUMBLING_PROCESSING_TIME:
-                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+                    singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+                            .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+                            .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+                            .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+                    break;
                 case TUMBLING_EVENT_TIME:
-                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+                    singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+                            .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+                            .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+                            .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+                    break;
                 case SLIDING_PROCESSING_TIME:
-                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+                    singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+                            .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+                            .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+                            .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+                    break;
                 case SLIDING_EVENT_TIME:
-                    return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+                    singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+                            .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+                            .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+                            .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+                    break;
                 default:
                     throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
             }
         }
+        if (aggregateConfig.getParallelism() != 0) {
+            singleOutputStreamOperator.setParallelism(aggregateConfig.getParallelism());
+        }
+        return singleOutputStreamOperator.name(aggregateConfig.getName());
     }
@@ -55,4 +96,5 @@ public class AggregateProcessorImpl implements AggregateProcessor {
     public String type() {
         return "aggregate";
     }
+
 }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
index 165ed1b..da09690 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
@@ -1,5 +1,6 @@
 package com.geedgenetworks.core.processor.aggregate;

+import cn.hutool.crypto.SecureUtil;
 import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.KeybyEntity;

@@ -20,16 +21,17 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec
         KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
         StringBuilder stringBuilder = new StringBuilder();
-        for(String key: keys){
+        for (String key : keys) {

-            if(value.getExtractedFields().containsKey(key)){
-                keybyEntity.getKeys().put(key,value.getExtractedFields().get(key));
+            if (value.getExtractedFields().containsKey(key)) {
+                keybyEntity.getKeys().put(key, value.getExtractedFields().get(key));
                 stringBuilder.append(value.getExtractedFields().get(key).toString());
-            }else {
+            } else {
                 stringBuilder.append(",");
             }
         }
-        keybyEntity.setKeysToString(stringBuilder.toString());
-        return keybyEntity;
+        String hashedKey = SecureUtil.md5(stringBuilder.toString());
+        keybyEntity.setKeysToString(hashedKey);
+        return keybyEntity;
     }
 }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
index 423eff9..3921ee2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -1,17 +1,17 @@
 /**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. **/ package com.geedgenetworks.core.udf.udaf; @@ -22,9 +22,9 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.processor.projection.UdfEntity; -import java.util.*; +import java.util.ArrayList; +import java.util.List; /** * Collects elements within a group and returns the list of aggregated objects @@ -36,18 +36,18 @@ public class CollectList implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField, new ArrayList<>()); @@ -56,7 +56,7 @@ public class CollectList implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { Object object = event.getExtractedFields().get(lookupField); List<Object> aggregate = (List<Object>) acc.getMetricsFields().get(outputField); aggregate.add(object); @@ -75,4 +75,17 @@ public class CollectList implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + List<Object> firstValue = (List<Object>) firstAcc.getMetricsFields().get(outputField); + List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField); + firstValue.addAll(secondValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java index b4dfb14..9ec9b09 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java @@ -8,7 +8,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; import java.util.HashSet; import java.util.Set; @@ -23,17 +22,17 @@ public class CollectSet implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if 
(udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField, new HashSet<>()); @@ -42,7 +41,7 @@ public class CollectSet implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { Object object = event.getExtractedFields().get(lookupField); Set<Object> aggregate = (Set<Object>) acc.getMetricsFields().get(outputField); aggregate.add(object); @@ -61,5 +60,16 @@ public class CollectSet implements AggregateFunction { return acc; } - + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Set<Object> firstValue = (Set<Object>) firstAcc.getMetricsFields().get(outputField); + Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField); + firstValue.addAll(secondValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java index 6301a01..a1a35be 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
**/ package com.geedgenetworks.core.udf.udaf; @@ -23,8 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; - /** * Collects elements within a group and returns the list of aggregated objects */ @@ -36,14 +34,13 @@ public class FirstValue implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } @@ -55,7 +52,7 @@ public class FirstValue implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)){ + if (!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)) { acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); } return acc; @@ -71,4 +68,11 @@ public class FirstValue implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java index 1648fa5..6af0be3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java @@ -95,7 +95,10 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction { his.merge(h);
}
-
+ @Override
+ public Accumulator merge(Accumulator a, Accumulator b) {
+ return null;
+ }
@Override
public void close() {}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java index f27a2e6..44b374e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java @@ -1,17 +1,17 @@ /** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright 2017 Hortonworks. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. **/ package com.geedgenetworks.core.udf.udaf; @@ -23,9 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import java.util.ArrayList; -import java.util.List; - /** * Collects elements within a group and returns the list of aggregated objects */ @@ -37,17 +34,17 @@ public class LastValue implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } + @Override public Accumulator initAccumulator(Accumulator acc) { return acc; @@ -55,7 +52,7 @@ public class LastValue implements AggregateFunction { @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ + if (event.getExtractedFields().containsKey(lookupField)) { acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); } return acc; @@ -71,4 +68,11 @@ public class LastValue implements AggregateFunction { return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (secondAcc.getMetricsFields().containsKey(outputField)) { + firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); + } + return firstAcc; + } } diff --git 
a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java index ea33271..05de38c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java @@ -13,20 +13,22 @@ public class LongCount implements AggregateFunction { @Override - public void open(UDFContext udfContext){ - if(udfContext.getOutput_fields()==null ){ + public void open(UDFContext udfContext) { + if (udfContext.getOutput_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } outputField = udfContext.getOutput_fields().get(0); } + @Override public Accumulator initAccumulator(Accumulator acc) { return acc; } + @Override public Accumulator add(Event event, Accumulator acc) { - acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long)v + 1L); + acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long) v + 1L); return acc; } @@ -40,5 +42,18 @@ public class LongCount implements AggregateFunction { return acc; } - + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + + long firstValue = (long) firstAcc.getMetricsFields().get(outputField); + long secondValue = (long) secondAcc.getMetricsFields().get(outputField); + firstValue = firstValue + secondValue; + firstAcc.getMetricsFields().put(outputField, firstValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java index 2a615ef..9c4e070 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java @@ -16,20 +16,20 @@ public class Mean implements AggregateFunction { private String outputField; private Integer precision; private DecimalFormat df; + @Override - public void open(UDFContext udfContext){ + public void open(UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } - if(udfContext.getParameters()!= null && !udfContext.getParameters().isEmpty()) { + if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { precision = Integer.parseInt(udfContext.getParameters().getOrDefault("precision", "-1").toString()); if (precision > 0) { StringBuilder pattern = new StringBuilder("#."); @@ -38,14 +38,15 @@ public class Mean implements AggregateFunction { } df = new 
DecimalFormat(pattern.toString()); } - }else { + } else { precision = -1; } } + @Override public Accumulator initAccumulator(Accumulator acc) { - acc.getMetricsFields().put(outputField,new OnlineStatistics()); + acc.getMetricsFields().put(outputField, new OnlineStatistics()); return acc; } @@ -67,16 +68,27 @@ public class Mean implements AggregateFunction { @Override public Accumulator getResult(Accumulator acc) { OnlineStatistics aggregate = (OnlineStatistics) acc.getMetricsFields().get(outputField); - if(precision<0){ + if (precision < 0) { acc.getMetricsFields().put(outputField, aggregate.mean()); - } - else if(precision>0){ + } else if (precision > 0) { acc.getMetricsFields().put(outputField, df.format(aggregate.mean())); - } - else { - acc.getMetricsFields().put(outputField,(long)aggregate.mean()); + } else { + acc.getMetricsFields().put(outputField, (long) aggregate.mean()); } return acc; } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + OnlineStatistics acc1 = (OnlineStatistics) firstAcc.getMetricsFields().get(outputField); + acc1.merge((OnlineStatistics) secondAcc.getMetricsFields().get(outputField)); + long inEvents = firstAcc.getInEvents() + (secondAcc.getInEvents()); + long outEvent = firstAcc.getOutEvents() + (secondAcc.getOutEvents()); + long error = firstAcc.getErrorCount() + (secondAcc.getErrorCount()); + firstAcc.setInEvents(inEvents); + firstAcc.setErrorCount(error); + firstAcc.setOutEvents(outEvent); + return firstAcc; + } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java index 01e9a5b..e972133 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java @@ -6,7 +6,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.pojo.OnlineStatistics; public class NumberSum implements AggregateFunction { @@ -15,15 +14,14 @@ public class NumberSum implements AggregateFunction { @Override - public void open(UDFContext udfContext){ - if(udfContext.getLookup_fields()==null ){ + public void open(UDFContext udfContext) { + if (udfContext.getLookup_fields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - lookupField = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); - } - else { + } else { outputField = lookupField; } } @@ -32,22 +30,23 @@ public class NumberSum implements AggregateFunction { public Accumulator initAccumulator(Accumulator acc) { return acc; } + @Override public Accumulator add(Event event, Accumulator acc) { - if(event.getExtractedFields().containsKey(lookupField)){ - Number val = (Number) event.getExtractedFields().get(lookupField); - Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L); - if (aggregate instanceof Long && ( val instanceof Integer|| val instanceof Long)) { - aggregate = aggregate.longValue() + val.longValue(); - } else 
if (aggregate instanceof Float || val instanceof Float) { - aggregate = aggregate.floatValue() + val.floatValue(); - } else if (aggregate instanceof Double || val instanceof Double) { - aggregate = aggregate.doubleValue() + val.doubleValue(); - } - acc.getMetricsFields().put(outputField, aggregate); + if (event.getExtractedFields().containsKey(lookupField)) { + Number val = (Number) event.getExtractedFields().get(lookupField); + Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L); + if (aggregate instanceof Long && (val instanceof Integer || val instanceof Long)) { + aggregate = aggregate.longValue() + val.longValue(); + } else if (aggregate instanceof Float || val instanceof Float) { + aggregate = aggregate.floatValue() + val.floatValue(); + } else if (aggregate instanceof Double || val instanceof Double) { + aggregate = aggregate.doubleValue() + val.doubleValue(); } - return acc; + acc.getMetricsFields().put(outputField, aggregate); + } + return acc; } @Override @@ -65,4 +64,24 @@ public class NumberSum implements AggregateFunction { } + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + + Number firstValue = (Number) firstAcc.getMetricsFields().get(outputField); + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + if (firstValue instanceof Long && (secondValue instanceof Integer || secondValue instanceof Long)) { + firstValue = firstValue.longValue() + secondValue.longValue(); + } else if (firstValue instanceof Float || secondValue instanceof Float) { + firstValue = firstValue.floatValue() + secondValue.floatValue(); + } else if (firstValue instanceof Double || secondValue instanceof Double) { + firstValue = firstValue.doubleValue() + secondValue.doubleValue(); + } + firstAcc.getMetricsFields().put(outputField, firstValue); + } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { + Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField); + firstAcc.getMetricsFields().put(outputField, secondValue); + } + return firstAcc; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java index ec003f8..041bad9 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java @@ -16,7 +16,10 @@ public class HlldApproxCountDistinct extends HlldBaseAggregate { return acc;
}
-
+ @Override
+ public Accumulator merge(Accumulator a, Accumulator b) {
+ return null;
+ }
@Override
public String functionName() {
return "APPROX_COUNT_DISTINCT_HLLD";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java index 71d61dc..d6c3a44 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java @@ -92,7 +92,10 @@ public abstract class HlldBaseAggregate implements AggregateFunction { Hll hll = HllUtils.deserializeHll(value);
hllUnion.update(hll);
}
-
+ @Override
+ public Accumulator merge(Accumulator a, Accumulator b) {
+ return null;
+ }
@Override
public void close() {}
}
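The test classes that follow all exercise the new merge path the same way: build two partial accumulators with getMiddleResult, merge them, and compare the result with what a single pass would produce. A condensed sketch of that pattern for NUMBER_SUM is shown below, reusing only calls that appear in this diff (UDFContext, Accumulator, Event, NumberSum, JUnit assertEquals); the helper methods and field names are just for brevity and are not part of the test classes.

// Illustrative only: mirrors the merge-test pattern of the classes below, assuming the
// same imports as those tests (JUnit 5 assertEquals, java.util collections).
private void numberSumMergeSketch() {
    UDFContext udfContext = new UDFContext();
    udfContext.setLookup_fields(List.of("pkts"));
    udfContext.setOutput_fields(Collections.singletonList("pkts_sum"));

    NumberSum numberSum = new NumberSum();
    numberSum.open(udfContext);

    // One partial accumulator per pre-aggregated batch.
    Accumulator first = numberSum.initAccumulator(newAccumulator());
    Accumulator second = numberSum.initAccumulator(newAccumulator());

    first = numberSum.add(eventWithPkts(2L), first);
    first = numberSum.add(eventWithPkts(1L), first);
    second = numberSum.add(eventWithPkts(1L), second);

    // Merging the middle results must equal a single pass over all events: 2 + 1 + 1 = 4.
    Accumulator merged = numberSum.merge(numberSum.getMiddleResult(first), numberSum.getMiddleResult(second));
    assertEquals(4L, (long) numberSum.getResult(merged).getMetricsFields().get("pkts_sum"));
}

private Accumulator newAccumulator() {
    Accumulator accumulator = new Accumulator();
    accumulator.setMetricsFields(new HashMap<>());
    return accumulator;
}

private Event eventWithPkts(long pkts) {
    Event event = new Event();
    Map<String, Object> extractedFields = new HashMap<>();
    extractedFields.put("pkts", pkts);
    event.setExtractedFields(extractedFields);
    return event;
}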
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java index b0d846b..8c0fe3f 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java @@ -36,11 +36,51 @@ public class CollectListTest { public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"); - excute(arr); + List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } - private static void excute(List<String> arr) throws ParseException { + private void testMerge(List<String> arr,List<String> arr2) { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_list")); + CollectList collectList = new CollectList(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectList.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = collectList.getResult(collectList.merge(result1,result2)); + List<String> vals = (List<String>) result.getMetricsFields().get("field_list"); + assertEquals(vals.size(),8); + assertEquals("192.168.1.6",vals.get(5).toString()); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + CollectList collectList = new CollectList(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectList.open(udfContext); + Accumulator agg = collectList.initAccumulator(accumulator); + + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = collectList.add(event, agg); + + } + return collectList.getMiddleResult(agg); + } + private void testGetResult(List<String> arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java index ea4fe8d..47e74bd 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java @@ -31,14 +31,53 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class CollectSetTest { @Test - public void testNumberSumTest() throws ParseException { + public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - excute(arr); + List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } - private static void excute(List<String> arr) throws ParseException { + private void testMerge(List<String> arr,List<String> arr2) { + + UDFContext udfContext = new UDFContext(); + 
udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_list")); + CollectSet collectSet = new CollectSet(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectSet.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = collectSet.getResult(collectSet.merge(result1,result2)); + Set<String> vals = (Set<String>) result.getMetricsFields().get("field_list"); + assertEquals(vals.size(),6); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + CollectSet collectSet = new CollectSet(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + collectSet.open(udfContext); + Accumulator agg = collectSet.initAccumulator(accumulator); + + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = collectSet.add(event, agg); + + } + return collectSet.getMiddleResult(agg); + } + private static void testGetResult(List<String> arr) throws ParseException { UDFContext udfContext = new UDFContext(); udfContext.setLookup_fields(List.of("field")); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java index 506f6de..3d87b14 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java @@ -34,11 +34,48 @@ public class FirstValueTest { public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); - excute(arr); + List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); + testMerge(arr,arr2); + testGetResult(arr); } + private void testMerge(List<String> arr,List<String> arr2) { - private static void excute(List<String> arr) throws ParseException { + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_first")); + FirstValue firstValue = new FirstValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + firstValue.open(udfContext); + Accumulator result1 = getMiddleResult(udfContext,arr); + Accumulator result2 = getMiddleResult(udfContext,arr2); + Accumulator result = firstValue.getResult(firstValue.merge(result1,result2)); + String val = (String) result.getMetricsFields().get("field_first"); + assertEquals(val,"192.168.1.1"); + + } + private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) { + + + FirstValue firstValue = new FirstValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + firstValue.open(udfContext); + Accumulator agg = firstValue.initAccumulator(accumulator); + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + 
extractedFields.put("field", o);
+            event.setExtractedFields(extractedFields);
+            agg = firstValue.add(event, agg);
+
+        }
+        return firstValue.getMiddleResult(agg);
+    }
+    private static void testGetResult(List<String> arr) throws ParseException {
 
         UDFContext udfContext = new UDFContext();
         udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
index f8306cd..3d61019 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
@@ -37,11 +37,48 @@ public class LastValueTest {
     public void test() throws ParseException {
         List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
-        excute(arr);
+        List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.3");
+        testMerge(arr,arr2);
+        testGetResult(arr);
     }
+    private void testMerge(List<String> arr,List<String> arr2) {
 
-    private static void excute(List<String> arr) throws ParseException {
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of("field"));
+        udfContext.setOutput_fields(Collections.singletonList("field_last"));
+        LastValue lastValue = new LastValue();
+        Map<String, Object> metricsFields = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(metricsFields);
+        lastValue.open(udfContext);
+        Accumulator result1 = getMiddleResult(udfContext,arr);
+        Accumulator result2 = getMiddleResult(udfContext,arr2);
+        Accumulator result = lastValue.getResult(lastValue.merge(result1,result2));
+        String val = (String) result.getMetricsFields().get("field_last");
+        assertEquals(val,"192.168.1.3");
+
+    }
+    private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) {
+
+
+        LastValue lastValue = new LastValue();
+        Map<String, Object> metricsFields = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(metricsFields);
+        lastValue.open(udfContext);
+        Accumulator agg = lastValue.initAccumulator(accumulator);
+        for (String o : arr) {
+            Event event = new Event();
+            Map<String, Object> extractedFields = new HashMap<>();
+            extractedFields.put("field", o);
+            event.setExtractedFields(extractedFields);
+            agg = lastValue.add(event, agg);
+
+        }
+        return lastValue.getMiddleResult(agg);
+    }
+    private static void testGetResult(List<String> arr) throws ParseException {
 
         UDFContext udfContext = new UDFContext();
         udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
index 3c02499..d13df72 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
@@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate;
 import com.geedgenetworks.common.Accumulator;
 import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.LastValue;
 import com.geedgenetworks.core.udf.udaf.LongCount;
 import com.geedgenetworks.core.udf.udaf.NumberSum;
 import com.ibm.icu.text.NumberFormat;
@@ -38,10 +39,46 @@ public class LongCountTest {
     public void test() throws ParseException {
         Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
-        excute(longArr);
+        Long[] longArr2 = new Long[]{1L, 2L, 3L, 4L};
+        testMerge(longArr,longArr2);
+        testGetResult(longArr);
     }
+    private void testMerge(Number[] arr,Number[] arr2) {
 
-    private static void excute(Number[] arr) throws ParseException {
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of("field"));
+        udfContext.setOutput_fields(Collections.singletonList("count"));
+        LongCount longCount = new LongCount();
+        Map<String, Object> metricsFields = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(metricsFields);
+        longCount.open(udfContext);
+        Accumulator result1 = getMiddleResult(udfContext,arr);
+        Accumulator result2 = getMiddleResult(udfContext,arr2);
+        Accumulator result = longCount.getResult(longCount.merge(result1,result2));
+        assertEquals(Integer.parseInt((result.getMetricsFields().get("count").toString())),8);
+
+    }
+    private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) {
+
+
+        LongCount longCount = new LongCount();
+        Map<String, Object> metricsFields = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(metricsFields);
+        longCount.open(udfContext);
+        Accumulator agg = longCount.initAccumulator(accumulator);
+        for (Number o : arr) {
+            Event event = new Event();
+            Map<String, Object> extractedFields = new HashMap<>();
+            extractedFields.put("field", o);
+            event.setExtractedFields(extractedFields);
+            agg = longCount.add(event, agg);
+
+        }
+        return longCount.getMiddleResult(agg);
+    }
+    private static void testGetResult(Number[] arr) throws ParseException {
 
         UDFContext udfContext = new UDFContext();
         udfContext.setOutput_fields(Collections.singletonList("count"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
index 6deed0f..2927f43 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
@@ -39,12 +39,52 @@ public class MeanTest {
         Integer[] intArr1 = new Integer[]{1, 2, 3, 4};
         Integer[] intArr2 = new Integer[]{1, 6, 3};
-        excute(intArr1, 0);
-        excute2(intArr2, 2);
-        excute3(intArr1);
+        testInt(intArr1, 0);
+        testDouble(intArr2, 2);
+        testNoPrecision(intArr1);
+        testMerge(intArr1,intArr2,2);
+    }
+    private void testMerge(Number[] arr1,Number[] arr2,int precision) throws ParseException {
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of("field"));
+        udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+        udfContext.setParameters(new HashMap<>());
+        udfContext.getParameters().put("precision", precision);
+        Mean mean = new Mean();
+        Map<String, Object> metricsFields = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(metricsFields);
+        mean.open(udfContext);
+        Accumulator result1 = getMiddleResult(arr1,precision);
+        Accumulator result2 = getMiddleResult(arr2,precision);
+        Accumulator result = mean.getResult(mean.merge(result1,result2));
+        assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2.86"));
+    }
+    private Accumulator getMiddleResult(Number[] arr,int precision) throws ParseException {
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of("field"));
+        udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+        udfContext.setParameters(new HashMap<>());
+        udfContext.getParameters().put("precision", precision);
+        Mean mean = new Mean();
+        Map<String, Object> metricsFields = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(metricsFields);
+        mean.open(udfContext);
+        Accumulator agg = mean.initAccumulator(accumulator);
+        for (Number o : arr) {
+            Event event = new Event();
+            Map<String, Object> extractedFields = new HashMap<>();
+            extractedFields.put("field", o);
+            event.setExtractedFields(extractedFields);
+            agg = mean.add(event, agg);
+
+        }
+        return mean.getMiddleResult(agg);
     }
 
-    private static void excute(Number[] arr,int precision) throws ParseException {
+
+    private void testInt(Number[] arr,int precision) throws ParseException {
 
         UDFContext udfContext = new UDFContext();
         udfContext.setLookup_fields(List.of("field"));
@@ -65,11 +105,12 @@ public class MeanTest {
             agg = mean.add(event, agg);
 
         }
+
         Accumulator result = mean.getResult(agg);
         assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2"));
     }
 
-    private static void excute2(Number[] arr,int precision) throws ParseException {
+    private void testDouble(Number[] arr,int precision) throws ParseException {
 
         UDFContext udfContext = new UDFContext();
         udfContext.setLookup_fields(List.of("field"));
@@ -94,7 +135,7 @@ public class MeanTest {
         assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("3.33"));
     }
 
-    private static void excute3(Number[] arr) throws ParseException {
+    private void testNoPrecision(Number[] arr) throws ParseException {
 
         UDFContext udfContext = new UDFContext();
         udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
index d0d3d2c..3fd9506 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
@@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate;
 import com.geedgenetworks.common.Accumulator;
 import com.geedgenetworks.common.Event;
 import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.LongCount;
 import com.geedgenetworks.core.udf.udaf.NumberSum;
 import com.ibm.icu.text.NumberFormat;
 import org.junit.jupiter.api.Test;
@@ -41,8 +42,44 @@ public class NumberSumTest {
         excute(doubleArr, Double.class);
         excute(intArr, Long.class);
         excute(longArr, Long.class);
+        testMerge(intArr,floatArr);
+
+
     }
+    private void testMerge(Number[] arr,Number[] arr2) {
+
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of("field"));
+        udfContext.setOutput_fields(Collections.singletonList("field_sum"));
+        NumberSum numberSum = new NumberSum();
+        Map<String, Object> metricsFields = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(metricsFields);
+        numberSum.open(udfContext);
+        Accumulator result1 = getMiddleResult(udfContext,arr);
+        Accumulator result2 = getMiddleResult(udfContext,arr2);
+        Accumulator result = numberSum.getResult(numberSum.merge(result1,result2));
+        assertEquals(Float.parseFloat((result.getMetricsFields().get("field_sum").toString())),20.0f);
+    }
+    private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) {
+
+
+        NumberSum numberSum = new NumberSum();
+        Map<String, Object> metricsFields = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(metricsFields);
+        numberSum.open(udfContext);
+        Accumulator agg = numberSum.initAccumulator(accumulator);
+        for (Number o : arr) {
+            Event event = new Event();
+            Map<String, Object> extractedFields = new HashMap<>();
+            extractedFields.put("field", o);
+            event.setExtractedFields(extractedFields);
+            agg = numberSum.add(event, agg);
+        }
+        return numberSum.getMiddleResult(agg);
+    }
 
     private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
 
         UDFContext udfContext = new UDFContext();
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml
index ab1ba72..4592f79 100644
--- a/groot-tests/test-e2e-kafka/pom.xml
+++ b/groot-tests/test-e2e-kafka/pom.xml
@@ -47,6 +47,18 @@
 
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>9</source>
+                    <target>9</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
