summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-08-29 18:24:42 +0800
committerwangkuan <[email protected]>2024-08-29 18:24:42 +0800
commit0ea9b9d9db5f92e7afd7b86ddad1f8d69d5c0945 (patch)
treeca735cab001f5f3a597d87122cda0c998f3b9426 /groot-core
parent8d90c04d22a5df3ac5a6d4d12fc1b9fee03f38e8 (diff)
[feature][bootstrap][core]增加预聚合功能,相关函数支持merge
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java32
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java68
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java55
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java26
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java46
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java48
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java42
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java57
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java5
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java44
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java45
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java41
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java41
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java41
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java53
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java37
21 files changed, 584 insertions, 152 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
index d3cbaac..ebdb0bd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/AggregateConfig.java
@@ -1,15 +1,15 @@
package com.geedgenetworks.core.pojo;
+import com.alibaba.fastjson2.annotation.JSONField;
import com.geedgenetworks.common.udf.UDFContext;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import java.io.Serializable;
import java.util.List;
-import java.util.Map;
+
@EqualsAndHashCode(callSuper = true)
@Data
-public class AggregateConfig extends ProcessorConfig {
+public class AggregateConfig extends ProcessorConfig {
private List<String> group_by_fields;
@@ -18,5 +18,7 @@ public class AggregateConfig extends ProcessorConfig {
private Integer window_size;
private Integer window_slide;
private List<UDFContext> functions;
+ @JSONField(defaultValue = "false" )
+ private Boolean mini_batch;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
index 416a7ea..2508730 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
@@ -13,18 +13,50 @@ public class OnlineStatistics {
aggregate += (delta * (val.doubleValue() - mean));
return this;
}
+
+ public OnlineStatistics merge(OnlineStatistics other) {
+ if (other.n == 0) {
+ return this; // Nothing to merge
+ }
+ if (this.n == 0) {
+ this.n = other.n;
+ this.mean = other.mean;
+ this.aggregate = other.aggregate;
+ return this;
+ }
+
+ // Combine counts
+ long newN = this.n + other.n;
+
+ // Calculate the new mean
+ double delta = other.mean - this.mean;
+ this.mean += delta * (other.n / (double) newN);
+
+ // Update the aggregate
+ this.aggregate += other.aggregate +
+ (this.n * delta * delta) / newN;
+
+ // Update the count
+ this.n = newN;
+
+ return this;
+ }
+
//计算总体标准差
public double stddevp() {
return Math.sqrt(variancep());
}
+
//计算总体方差
public double variancep() {
return aggregate / n;
}
+
//计算样本标准差
public double stddev() {
return Math.sqrt(variance());
}
+
//计算样本方差
public double variance() {
return aggregate / (n - 1);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index cf78310..c261fb6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -12,42 +12,83 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.OutputTag;
import static com.geedgenetworks.common.Constants.*;
-public class AggregateProcessorImpl implements AggregateProcessor {
+public class AggregateProcessorImpl implements AggregateProcessor {
@Override
- public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
+ public DataStream<Event> processorFunction(DataStream<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) throws Exception {
- if (aggregateConfig.getParallelism() != 0) {
+ SingleOutputStreamOperator<Event> singleOutputStreamOperator;
+ if (aggregateConfig.getMini_batch()) {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_size()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case TUMBLING_EVENT_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_size()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_PROCESSING_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_slide()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_EVENT_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).setParallelism(aggregateConfig.getParallelism()).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_slide()))
+ .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
+
}
- }else {
+ } else {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case TUMBLING_EVENT_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_PROCESSING_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
case SLIDING_EVENT_TIME:
- return grootEventDataStream.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields())).window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide()))).aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig)).name(aggregateConfig.getName());
+ singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
+ .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
+ .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ break;
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
}
}
+ if (aggregateConfig.getParallelism() != 0) {
+ singleOutputStreamOperator.setParallelism(aggregateConfig.getParallelism());
+ }
+ return singleOutputStreamOperator.name(aggregateConfig.getName());
}
@@ -55,4 +96,5 @@ public class AggregateProcessorImpl implements AggregateProcessor {
public String type() {
return "aggregate";
}
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
index 165ed1b..da09690 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.core.processor.aggregate;
+import cn.hutool.crypto.SecureUtil;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.KeybyEntity;
@@ -20,16 +21,17 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec
KeybyEntity keybyEntity = new KeybyEntity(new HashMap<>());
StringBuilder stringBuilder = new StringBuilder();
- for(String key: keys){
+ for (String key : keys) {
- if(value.getExtractedFields().containsKey(key)){
- keybyEntity.getKeys().put(key,value.getExtractedFields().get(key));
+ if (value.getExtractedFields().containsKey(key)) {
+ keybyEntity.getKeys().put(key, value.getExtractedFields().get(key));
stringBuilder.append(value.getExtractedFields().get(key).toString());
- }else {
+ } else {
stringBuilder.append(",");
}
}
- keybyEntity.setKeysToString(stringBuilder.toString());
- return keybyEntity;
+ String hashedKey = SecureUtil.md5(stringBuilder.toString());
+ keybyEntity.setKeysToString(hashedKey);
+ return keybyEntity;
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
index 423eff9..3921ee2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -1,17 +1,17 @@
/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
package com.geedgenetworks.core.udf.udaf;
@@ -22,9 +22,9 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.processor.projection.UdfEntity;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
/**
* Collects elements within a group and returns the list of aggregated objects
@@ -36,18 +36,18 @@ public class CollectList implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
acc.getMetricsFields().put(outputField, new ArrayList<>());
@@ -56,7 +56,7 @@ public class CollectList implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
+ if (event.getExtractedFields().containsKey(lookupField)) {
Object object = event.getExtractedFields().get(lookupField);
List<Object> aggregate = (List<Object>) acc.getMetricsFields().get(outputField);
aggregate.add(object);
@@ -75,4 +75,17 @@ public class CollectList implements AggregateFunction {
return acc;
}
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ List<Object> firstValue = (List<Object>) firstAcc.getMetricsFields().get(outputField);
+ List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField);
+ firstValue.addAll(secondValue);
+ } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ List<Object> secondValue = (List<Object>) secondAcc.getMetricsFields().get(outputField);
+ firstAcc.getMetricsFields().put(outputField, secondValue);
+ }
+ return firstAcc;
+ }
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
index b4dfb14..9ec9b09 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
@@ -8,7 +8,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@@ -23,17 +22,17 @@ public class CollectSet implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
acc.getMetricsFields().put(outputField, new HashSet<>());
@@ -42,7 +41,7 @@ public class CollectSet implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
+ if (event.getExtractedFields().containsKey(lookupField)) {
Object object = event.getExtractedFields().get(lookupField);
Set<Object> aggregate = (Set<Object>) acc.getMetricsFields().get(outputField);
aggregate.add(object);
@@ -61,5 +60,16 @@ public class CollectSet implements AggregateFunction {
return acc;
}
-
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ Set<Object> firstValue = (Set<Object>) firstAcc.getMetricsFields().get(outputField);
+ Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField);
+ firstValue.addAll(secondValue);
+ } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ Set<Object> secondValue = (Set<Object>) secondAcc.getMetricsFields().get(outputField);
+ firstAcc.getMetricsFields().put(outputField, secondValue);
+ }
+ return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
index 6301a01..a1a35be 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
@@ -1,17 +1,17 @@
/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
package com.geedgenetworks.core.udf.udaf;
@@ -23,8 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import java.util.ArrayList;
-
/**
* Collects elements within a group and returns the list of aggregated objects
*/
@@ -36,14 +34,13 @@ public class FirstValue implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
@@ -55,7 +52,7 @@ public class FirstValue implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)){
+ if (!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)) {
acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
}
return acc;
@@ -71,4 +68,11 @@ public class FirstValue implements AggregateFunction {
return acc;
}
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
+ }
+ return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
index 1648fa5..6af0be3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
@@ -95,7 +95,10 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction {
his.merge(h);
}
-
+ @Override
+ public Accumulator merge(Accumulator a, Accumulator b) {
+ return null;
+ }
@Override
public void close() {}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
index f27a2e6..44b374e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
@@ -1,17 +1,17 @@
/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
**/
package com.geedgenetworks.core.udf.udaf;
@@ -23,9 +23,6 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Collects elements within a group and returns the list of aggregated objects
*/
@@ -37,17 +34,17 @@ public class LastValue implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
return acc;
@@ -55,7 +52,7 @@ public class LastValue implements AggregateFunction {
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
+ if (event.getExtractedFields().containsKey(lookupField)) {
acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
}
return acc;
@@ -71,4 +68,11 @@ public class LastValue implements AggregateFunction {
return acc;
}
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (secondAcc.getMetricsFields().containsKey(outputField)) {
+ firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
+ }
+ return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
index ea33271..05de38c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
@@ -13,20 +13,22 @@ public class LongCount implements AggregateFunction {
@Override
- public void open(UDFContext udfContext){
- if(udfContext.getOutput_fields()==null ){
+ public void open(UDFContext udfContext) {
+ if (udfContext.getOutput_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
outputField = udfContext.getOutput_fields().get(0);
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
return acc;
}
+
@Override
public Accumulator add(Event event, Accumulator acc) {
- acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long)v + 1L);
+ acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long) v + 1L);
return acc;
}
@@ -40,5 +42,18 @@ public class LongCount implements AggregateFunction {
return acc;
}
-
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+
+ long firstValue = (long) firstAcc.getMetricsFields().get(outputField);
+ long secondValue = (long) secondAcc.getMetricsFields().get(outputField);
+ firstValue = firstValue + secondValue;
+ firstAcc.getMetricsFields().put(outputField, firstValue);
+ } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField);
+ firstAcc.getMetricsFields().put(outputField, secondValue);
+ }
+ return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
index 2a615ef..9c4e070 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
@@ -16,20 +16,20 @@ public class Mean implements AggregateFunction {
private String outputField;
private Integer precision;
private DecimalFormat df;
+
@Override
- public void open(UDFContext udfContext){
+ public void open(UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
- if(udfContext.getParameters()!= null && !udfContext.getParameters().isEmpty()) {
+ if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) {
precision = Integer.parseInt(udfContext.getParameters().getOrDefault("precision", "-1").toString());
if (precision > 0) {
StringBuilder pattern = new StringBuilder("#.");
@@ -38,14 +38,15 @@ public class Mean implements AggregateFunction {
}
df = new DecimalFormat(pattern.toString());
}
- }else {
+ } else {
precision = -1;
}
}
+
@Override
public Accumulator initAccumulator(Accumulator acc) {
- acc.getMetricsFields().put(outputField,new OnlineStatistics());
+ acc.getMetricsFields().put(outputField, new OnlineStatistics());
return acc;
}
@@ -67,16 +68,27 @@ public class Mean implements AggregateFunction {
@Override
public Accumulator getResult(Accumulator acc) {
OnlineStatistics aggregate = (OnlineStatistics) acc.getMetricsFields().get(outputField);
- if(precision<0){
+ if (precision < 0) {
acc.getMetricsFields().put(outputField, aggregate.mean());
- }
- else if(precision>0){
+ } else if (precision > 0) {
acc.getMetricsFields().put(outputField, df.format(aggregate.mean()));
- }
- else {
- acc.getMetricsFields().put(outputField,(long)aggregate.mean());
+ } else {
+ acc.getMetricsFields().put(outputField, (long) aggregate.mean());
}
return acc;
}
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ OnlineStatistics acc1 = (OnlineStatistics) firstAcc.getMetricsFields().get(outputField);
+ acc1.merge((OnlineStatistics) secondAcc.getMetricsFields().get(outputField));
+ long inEvents = firstAcc.getInEvents() + (secondAcc.getInEvents());
+ long outEvent = firstAcc.getOutEvents() + (secondAcc.getOutEvents());
+ long error = firstAcc.getErrorCount() + (secondAcc.getErrorCount());
+ firstAcc.setInEvents(inEvents);
+ firstAcc.setErrorCount(error);
+ firstAcc.setOutEvents(outEvent);
+ return firstAcc;
+ }
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
index 01e9a5b..e972133 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
@@ -6,7 +6,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.pojo.OnlineStatistics;
public class NumberSum implements AggregateFunction {
@@ -15,15 +14,14 @@ public class NumberSum implements AggregateFunction {
@Override
- public void open(UDFContext udfContext){
- if(udfContext.getLookup_fields()==null ){
+ public void open(UDFContext udfContext) {
+ if (udfContext.getLookup_fields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- lookupField = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
outputField = udfContext.getOutput_fields().get(0);
- }
- else {
+ } else {
outputField = lookupField;
}
}
@@ -32,22 +30,23 @@ public class NumberSum implements AggregateFunction {
public Accumulator initAccumulator(Accumulator acc) {
return acc;
}
+
@Override
public Accumulator add(Event event, Accumulator acc) {
- if(event.getExtractedFields().containsKey(lookupField)){
- Number val = (Number) event.getExtractedFields().get(lookupField);
- Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L);
- if (aggregate instanceof Long && ( val instanceof Integer|| val instanceof Long)) {
- aggregate = aggregate.longValue() + val.longValue();
- } else if (aggregate instanceof Float || val instanceof Float) {
- aggregate = aggregate.floatValue() + val.floatValue();
- } else if (aggregate instanceof Double || val instanceof Double) {
- aggregate = aggregate.doubleValue() + val.doubleValue();
- }
- acc.getMetricsFields().put(outputField, aggregate);
+ if (event.getExtractedFields().containsKey(lookupField)) {
+ Number val = (Number) event.getExtractedFields().get(lookupField);
+ Number aggregate = (Number) acc.getMetricsFields().getOrDefault(outputField, 0L);
+ if (aggregate instanceof Long && (val instanceof Integer || val instanceof Long)) {
+ aggregate = aggregate.longValue() + val.longValue();
+ } else if (aggregate instanceof Float || val instanceof Float) {
+ aggregate = aggregate.floatValue() + val.floatValue();
+ } else if (aggregate instanceof Double || val instanceof Double) {
+ aggregate = aggregate.doubleValue() + val.doubleValue();
}
- return acc;
+ acc.getMetricsFields().put(outputField, aggregate);
+ }
+ return acc;
}
@Override
@@ -65,4 +64,24 @@ public class NumberSum implements AggregateFunction {
}
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+
+ Number firstValue = (Number) firstAcc.getMetricsFields().get(outputField);
+ Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField);
+ if (firstValue instanceof Long && (secondValue instanceof Integer || secondValue instanceof Long)) {
+ firstValue = firstValue.longValue() + secondValue.longValue();
+ } else if (firstValue instanceof Float || secondValue instanceof Float) {
+ firstValue = firstValue.floatValue() + secondValue.floatValue();
+ } else if (firstValue instanceof Double || secondValue instanceof Double) {
+ firstValue = firstValue.doubleValue() + secondValue.doubleValue();
+ }
+ firstAcc.getMetricsFields().put(outputField, firstValue);
+ } else if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ Number secondValue = (Number) secondAcc.getMetricsFields().get(outputField);
+ firstAcc.getMetricsFields().put(outputField, secondValue);
+ }
+ return firstAcc;
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java
index ec003f8..041bad9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java
@@ -16,7 +16,10 @@ public class HlldApproxCountDistinct extends HlldBaseAggregate {
return acc;
}
-
+ @Override
+ public Accumulator merge(Accumulator a, Accumulator b) {
+ return null;
+ }
@Override
public String functionName() {
return "APPROX_COUNT_DISTINCT_HLLD";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
index 71d61dc..d6c3a44 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
@@ -92,7 +92,10 @@ public abstract class HlldBaseAggregate implements AggregateFunction {
Hll hll = HllUtils.deserializeHll(value);
hllUnion.update(hll);
}
-
+ @Override
+ public Accumulator merge(Accumulator a, Accumulator b) {
+ return null;
+ }
@Override
public void close() {}
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
index b0d846b..8c0fe3f 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
@@ -36,11 +36,51 @@ public class CollectListTest {
public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4");
+ testMerge(arr,arr2);
+ testGetResult(arr);
}
- private static void excute(List<String> arr) throws ParseException {
+ private void testMerge(List<String> arr,List<String> arr2) {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_list"));
+ CollectList collectList = new CollectList();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ collectList.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext,arr);
+ Accumulator result2 = getMiddleResult(udfContext,arr2);
+ Accumulator result = collectList.getResult(collectList.merge(result1,result2));
+ List<String> vals = (List<String>) result.getMetricsFields().get("field_list");
+ assertEquals(vals.size(),8);
+ assertEquals("192.168.1.6",vals.get(5).toString());
+
+ }
+ private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) {
+
+
+ CollectList collectList = new CollectList();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ collectList.open(udfContext);
+ Accumulator agg = collectList.initAccumulator(accumulator);
+
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = collectList.add(event, agg);
+
+ }
+ return collectList.getMiddleResult(agg);
+ }
+ private void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
index ea4fe8d..47e74bd 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
@@ -31,14 +31,53 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class CollectSetTest {
@Test
- public void testNumberSumTest() throws ParseException {
+ public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.5", "192.168.1.6", "192.168.1.3", "192.168.1.4");
+ testMerge(arr,arr2);
+ testGetResult(arr);
}
- private static void excute(List<String> arr) throws ParseException {
+ private void testMerge(List<String> arr,List<String> arr2) {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_list"));
+ CollectSet collectSet = new CollectSet();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ collectSet.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext,arr);
+ Accumulator result2 = getMiddleResult(udfContext,arr2);
+ Accumulator result = collectSet.getResult(collectSet.merge(result1,result2));
+ Set<String> vals = (Set<String>) result.getMetricsFields().get("field_list");
+ assertEquals(vals.size(),6);
+
+ }
+ private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) {
+
+
+ CollectSet collectSet = new CollectSet();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ collectSet.open(udfContext);
+ Accumulator agg = collectSet.initAccumulator(accumulator);
+
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = collectSet.add(event, agg);
+
+ }
+ return collectSet.getMiddleResult(agg);
+ }
+ private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
index 506f6de..3d87b14 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
@@ -34,11 +34,48 @@ public class FirstValueTest {
public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
+ testMerge(arr,arr2);
+ testGetResult(arr);
}
+ private void testMerge(List<String> arr,List<String> arr2) {
- private static void excute(List<String> arr) throws ParseException {
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_first"));
+ FirstValue firstValue = new FirstValue();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ firstValue.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext,arr);
+ Accumulator result2 = getMiddleResult(udfContext,arr2);
+ Accumulator result = firstValue.getResult(firstValue.merge(result1,result2));
+ String val = (String) result.getMetricsFields().get("field_first");
+ assertEquals(val,"192.168.1.1");
+
+ }
+ private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) {
+
+
+ FirstValue firstValue = new FirstValue();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ firstValue.open(udfContext);
+ Accumulator agg = firstValue.initAccumulator(accumulator);
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = firstValue.add(event, agg);
+
+ }
+ return firstValue.getMiddleResult(agg);
+ }
+ private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
index f8306cd..3d61019 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
@@ -37,11 +37,48 @@ public class LastValueTest {
public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
- excute(arr);
+ List<String> arr2 = List.of("192.168.1.2", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.3");
+ testMerge(arr,arr2);
+ testGetResult(arr);
}
+ private void testMerge(List<String> arr,List<String> arr2) {
- private static void excute(List<String> arr) throws ParseException {
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_last"));
+ LastValue lastValue = new LastValue();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ lastValue.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext,arr);
+ Accumulator result2 = getMiddleResult(udfContext,arr2);
+ Accumulator result = lastValue.getResult(lastValue.merge(result1,result2));
+ String val = (String) result.getMetricsFields().get("field_last");
+ assertEquals(val,"192.168.1.3");
+
+ }
+ private Accumulator getMiddleResult(UDFContext udfContext,List<String> arr) {
+
+
+ LastValue lastValue = new LastValue();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ lastValue.open(udfContext);
+ Accumulator agg = lastValue.initAccumulator(accumulator);
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = lastValue.add(event, agg);
+
+ }
+ return lastValue.getMiddleResult(agg);
+ }
+ private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
index 3c02499..d13df72 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
@@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.LastValue;
import com.geedgenetworks.core.udf.udaf.LongCount;
import com.geedgenetworks.core.udf.udaf.NumberSum;
import com.ibm.icu.text.NumberFormat;
@@ -38,10 +39,46 @@ public class LongCountTest {
public void test() throws ParseException {
Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
- excute(longArr);
+ Long[] longArr2 = new Long[]{1L, 2L, 3L, 4L};
+ testMerge(longArr,longArr2);
+ testGetResult(longArr);
}
+ private void testMerge(Number[] arr,Number[] arr2) {
- private static void excute(Number[] arr) throws ParseException {
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("count"));
+ LongCount longCount = new LongCount();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ longCount.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext,arr);
+ Accumulator result2 = getMiddleResult(udfContext,arr2);
+ Accumulator result = longCount.getResult(longCount.merge(result1,result2));
+ assertEquals(Integer.parseInt((result.getMetricsFields().get("count").toString())),8);
+
+ }
+ private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) {
+
+
+ LongCount longCount = new LongCount();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ longCount.open(udfContext);
+ Accumulator agg = longCount.initAccumulator(accumulator);
+ for (Number o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = longCount.add(event, agg);
+
+ }
+ return longCount.getMiddleResult(agg);
+ }
+ private static void testGetResult(Number[] arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setOutput_fields(Collections.singletonList("count"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
index 6deed0f..2927f43 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
@@ -39,12 +39,52 @@ public class MeanTest {
Integer[] intArr1 = new Integer[]{1, 2, 3, 4};
Integer[] intArr2 = new Integer[]{1, 6, 3};
- excute(intArr1, 0);
- excute2(intArr2, 2);
- excute3(intArr1);
+ testInt(intArr1, 0);
+ testDouble(intArr2, 2);
+ testNoPrecision(intArr1);
+ testMerge(intArr1,intArr2,2);
+ }
+ private void testMerge(Number[] arr1,Number[] arr2,int precision) throws ParseException {
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setParameters(new HashMap<>());
+ udfContext.getParameters().put("precision", precision);
+ Mean mean = new Mean();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ mean.open(udfContext);
+ Accumulator result1 = getMiddleResult(arr1,precision);
+ Accumulator result2 = getMiddleResult(arr2,precision);
+ Accumulator result = mean.getResult(mean.merge(result1,result2));
+ assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2.86"));
+ }
+ private Accumulator getMiddleResult(Number[] arr,int precision) throws ParseException {
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setParameters(new HashMap<>());
+ udfContext.getParameters().put("precision", precision);
+ Mean mean = new Mean();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ mean.open(udfContext);
+ Accumulator agg = mean.initAccumulator(accumulator);
+ for (Number o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = mean.add(event, agg);
+
+ }
+ return mean.getMiddleResult(agg);
}
- private static void excute(Number[] arr,int precision) throws ParseException {
+
+ private void testInt(Number[] arr,int precision) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
@@ -65,11 +105,12 @@ public class MeanTest {
agg = mean.add(event, agg);
}
+
Accumulator result = mean.getResult(agg);
assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2"));
}
- private static void excute2(Number[] arr,int precision) throws ParseException {
+ private void testDouble(Number[] arr,int precision) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
@@ -94,7 +135,7 @@ public class MeanTest {
assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("3.33"));
}
- private static void excute3(Number[] arr) throws ParseException {
+ private void testNoPrecision(Number[] arr) throws ParseException {
UDFContext udfContext = new UDFContext();
udfContext.setLookup_fields(List.of("field"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
index d0d3d2c..3fd9506 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
@@ -19,6 +19,7 @@ package com.geedgenetworks.core.udf.test.aggregate;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.LongCount;
import com.geedgenetworks.core.udf.udaf.NumberSum;
import com.ibm.icu.text.NumberFormat;
import org.junit.jupiter.api.Test;
@@ -41,8 +42,44 @@ public class NumberSumTest {
excute(doubleArr, Double.class);
excute(intArr, Long.class);
excute(longArr, Long.class);
+ testMerge(intArr,floatArr);
+
+ }
+ private void testMerge(Number[] arr,Number[] arr2) {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_sum"));
+ NumberSum numberSum = new NumberSum();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ numberSum.open(udfContext);
+ Accumulator result1 = getMiddleResult(udfContext,arr);
+ Accumulator result2 = getMiddleResult(udfContext,arr2);
+ Accumulator result = numberSum.getResult(numberSum.merge(result1,result2));
+ assertEquals(Float.parseFloat((result.getMetricsFields().get("field_sum").toString())),20.0f);
+
}
+ private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) {
+
+
+ NumberSum numberSum = new NumberSum();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ numberSum.open(udfContext);
+ Accumulator agg = numberSum.initAccumulator(accumulator);
+ for (Number o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = numberSum.add(event, agg);
+ }
+ return numberSum.getMiddleResult(agg);
+ }
private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
UDFContext udfContext = new UDFContext();