summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-07-24 14:42:35 +0800
committerwangkuan <[email protected]>2024-07-24 14:42:35 +0800
commitcd6f0015542af281c140b05f35cde8e79cc64a1e (patch)
tree788a3516600b45229f4d96ed4e8cc759864fac99
parent6e558c28ce9f07f58e9adcbbd802b586c6a179da (diff)
[improve][core]新增聚合函数LongCount,Mean以及相关单元测试
-rw-r--r--config/grootstream_job_example.yaml16
-rw-r--r--config/udf.plugins7
-rw-r--r--groot-common/src/main/resources/udf.plugins4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java37
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java47
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java84
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java5
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java69
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java67
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java69
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java97
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java73
15 files changed, 584 insertions, 3 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 62a320b..74239c9 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -30,6 +30,22 @@ processing_pipelines:
- function: NUMBER_SUM
lookup_fields: [ sent_pkts ]
output_fields: [ sent_pkts_sum ]
+ - function: NUMBER_SUM
+ lookup_fields: [ sent_bytes ]
+ output_fields: [ sent_bytes_sum ]
+ - function: COLLECT_LIST
+ lookup_fields: [ client_port ]
+ output_fields: [ client_port_list ]
+ - function: COLLECT_SET
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_ip_set ]
+ - function: LONG_COUNT
+ output_fields: [ sessions ]
+ - function: MEAN
+ lookup_fields: [ received_pkts ]
+ output_fields: [ received_pkts_mean ]
+ parameters:
+ precision: 1
sinks:
print_sink:
type: print
diff --git a/config/udf.plugins b/config/udf.plugins
index a8a62b4..b3f67b4 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -14,4 +14,9 @@ com.geedgenetworks.core.udf.PathCombine
com.geedgenetworks.core.udf.Rename
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.StringJoiner
-com.geedgenetworks.core.udf.UnixTimestampConverter \ No newline at end of file
+com.geedgenetworks.core.udf.UnixTimestampConverter
+com.geedgenetworks.core.udf.udaf.NumberSum
+com.geedgenetworks.core.udf.udaf.CollectList
+com.geedgenetworks.core.udf.udaf.CollectSet
+com.geedgenetworks.core.udf.udaf.LongCount
+com.geedgenetworks.core.udf.udaf.Mean \ No newline at end of file
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 73692a2..0eb24cb 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -16,4 +16,6 @@ com.geedgenetworks.core.udf.UnixTimestampConverter
com.geedgenetworks.core.udf.Flatten
com.geedgenetworks.core.udf.udaf.NumberSum
com.geedgenetworks.core.udf.udaf.CollectList
-com.geedgenetworks.core.udf.udaf.CollectSet \ No newline at end of file
+com.geedgenetworks.core.udf.udaf.CollectSet
+com.geedgenetworks.core.udf.udaf.LongCount
+com.geedgenetworks.core.udf.udaf.Mean \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
new file mode 100644
index 0000000..416a7ea
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java
@@ -0,0 +1,37 @@
+package com.geedgenetworks.core.pojo;
+
+
+public class OnlineStatistics {
+ private long n;
+ private double mean;
+ private double aggregate;
+
+ public OnlineStatistics add(Number val) {
+ ++n;
+ double delta = val.doubleValue() - mean;
+ mean += delta / n;
+ aggregate += (delta * (val.doubleValue() - mean));
+ return this;
+ }
+ //计算总体标准差
+ public double stddevp() {
+ return Math.sqrt(variancep());
+ }
+ //计算总体方差
+ public double variancep() {
+ return aggregate / n;
+ }
+ //计算样本标准差
+ public double stddev() {
+ return Math.sqrt(variance());
+ }
+ //计算样本方差
+ public double variance() {
+ return aggregate / (n - 1);
+ }
+
+ public double mean() {
+ return mean;
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
index 4b02166..165ed1b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java
@@ -28,8 +28,8 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec
}else {
stringBuilder.append(",");
}
- keybyEntity.setKeysToString(stringBuilder.toString());
}
+ keybyEntity.setKeysToString(stringBuilder.toString());
return keybyEntity;
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
index 1a82208..0345b7b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -18,6 +18,8 @@ package com.geedgenetworks.core.udf.udaf;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
import java.util.*;
@@ -33,6 +35,9 @@ public class CollectList implements AggregateFunction {
@Override
public Accumulator open(UDFContext udfContext,Accumulator acc) {
+ if(udfContext.getLookup_fields()==null ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
this.lookupField = udfContext.getLookup_fields().get(0);
if(!udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
index 42bd7e8..7891f93 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
@@ -3,6 +3,8 @@ package com.geedgenetworks.core.udf.udaf;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
@@ -20,6 +22,9 @@ public class CollectSet implements AggregateFunction {
@Override
public Accumulator open(UDFContext udfContext,Accumulator acc) {
+ if(udfContext.getLookup_fields()==null ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
this.lookupField = udfContext.getLookup_fields().get(0);
if(!udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
new file mode 100644
index 0000000..5662935
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
@@ -0,0 +1,47 @@
+package com.geedgenetworks.core.udf.udaf;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+
+public class LongCount implements AggregateFunction {
+ private String outputField;
+
+
+ @Override
+ public Accumulator open(UDFContext udfContext,Accumulator acc){
+ if(udfContext.getOutput_fields()==null ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ outputField = udfContext.getOutput_fields().get(0);
+ return acc;
+ }
+
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+
+ acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long)v + 1L);
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "LONG_COUNT";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
new file mode 100644
index 0000000..a3f098b
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
@@ -0,0 +1,84 @@
+package com.geedgenetworks.core.udf.udaf;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.pojo.OnlineStatistics;
+
+import java.text.DecimalFormat;
+
+
+public class Mean implements AggregateFunction {
+ private String lookupField;
+ private String outputField;
+ private Integer precision;
+ private DecimalFormat df;
+ @Override
+ public Accumulator open(UDFContext udfContext,Accumulator acc){
+
+ if(udfContext.getLookup_fields()==null ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ lookupField = udfContext.getLookup_fields().get(0);
+ if(!udfContext.getOutput_fields().isEmpty()) {
+ outputField = udfContext.getOutput_fields().get(0);
+ }
+ else {
+ outputField = lookupField;
+ }
+ if(!udfContext.getParameters().isEmpty()) {
+ precision = Integer.parseInt(udfContext.getParameters().getOrDefault("precision", "-1").toString());
+ if (precision > 0) {
+ StringBuilder pattern = new StringBuilder("#.");
+ for (int i = 0; i < precision; i++) {
+ pattern.append("#");
+ }
+ df = new DecimalFormat(pattern.toString());
+ }
+ }else {
+ precision = -1;
+ }
+ acc.getMetricsFields().put(outputField,new OnlineStatistics());
+ return acc;
+ }
+
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+
+ Number val = (Number) event.getExtractedFields().getOrDefault(lookupField, 0.0);
+ OnlineStatistics aggregate = (OnlineStatistics) acc.getMetricsFields().get(outputField);
+ aggregate.add(val);
+ return acc;
+
+ }
+
+ @Override
+ public String functionName() {
+ return "MEAN";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ OnlineStatistics aggregate = (OnlineStatistics) acc.getMetricsFields().get(outputField);
+ if(precision<0){
+ acc.getMetricsFields().put(outputField, aggregate.mean());
+ }
+ else if(precision>0){
+ acc.getMetricsFields().put(outputField, df.format(aggregate.mean()));
+ }
+ else {
+ acc.getMetricsFields().put(outputField,(long)aggregate.mean());
+ }
+ return acc;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
index a491844..d92df5c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
@@ -2,6 +2,8 @@ package com.geedgenetworks.core.udf.udaf;
import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
@@ -13,6 +15,9 @@ public class NumberSum implements AggregateFunction {
@Override
public Accumulator open(UDFContext udfContext,Accumulator acc){
+ if(udfContext.getLookup_fields()==null ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
lookupField = udfContext.getLookup_fields().get(0);
if(!udfContext.getOutput_fields().isEmpty()) {
outputField = udfContext.getOutput_fields().get(0);
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
new file mode 100644
index 0000000..a01edb3
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.CollectList;
+import org.junit.jupiter.api.Test;
+
+import java.text.ParseException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CollectListTest {
+
+ @Test
+ public void testNumberSumTest() throws ParseException {
+
+ List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4");
+ excute(arr);
+
+ }
+
+ private static void excute(List<String> arr) throws ParseException {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_list"));
+ CollectList collectList = new CollectList();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ Accumulator agg = collectList.open(udfContext,accumulator);
+
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = collectList.add(event, agg);
+
+ }
+ Accumulator result = collectList.getResult(agg);
+ List<String> vals = (List<String>) result.getMetricsFields().get("field_list");
+ assertEquals(vals.size(),4);
+ }
+
+
+
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
new file mode 100644
index 0000000..ae69d7c
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.CollectList;
+import com.geedgenetworks.core.udf.udaf.CollectSet;
+import org.junit.jupiter.api.Test;
+
+import java.text.ParseException;
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CollectSetTest {
+
+ @Test
+ public void testNumberSumTest() throws ParseException {
+
+ List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
+ excute(arr);
+
+ }
+
+ private static void excute(List<String> arr) throws ParseException {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_list"));
+ CollectSet collectSet = new CollectSet();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ Accumulator agg = collectSet.open(udfContext,accumulator);
+
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = collectSet.add(event, agg);
+
+ }
+ Accumulator result = collectSet.getResult(agg);
+ Set<String> vals = (Set<String>) result.getMetricsFields().get("field_list");
+ assertEquals(vals.size(),4);
+ }
+
+
+
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
new file mode 100644
index 0000000..3bde558
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.LongCount;
+import com.geedgenetworks.core.udf.udaf.NumberSum;
+import com.ibm.icu.text.NumberFormat;
+import org.junit.jupiter.api.Test;
+
+import java.text.ParseException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class LongCountTest {
+
+ @Test
+ public void testNumberSumTest() throws ParseException {
+
+ Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
+ excute(longArr);
+ }
+
+ private static void excute(Number[] arr) throws ParseException {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setOutput_fields(Collections.singletonList("count"));
+ LongCount longCount = new LongCount();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ Accumulator agg = longCount.open(udfContext,accumulator);
+
+ for (Number o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = longCount.add(event, agg);
+
+ }
+ Accumulator result = longCount.getResult(agg);
+ assertEquals(Integer.parseInt((result.getMetricsFields().get("count").toString())),4);
+ }
+
+
+
+
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
new file mode 100644
index 0000000..48c4e0f
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.Mean;
+import com.geedgenetworks.core.udf.udaf.NumberSum;
+import com.ibm.icu.text.NumberFormat;
+import org.junit.jupiter.api.Test;
+
+import java.text.ParseException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MeanTest {
+
+ @Test
+ public void testNumberSumTest() throws ParseException {
+
+ Integer[] intArr1 = new Integer[]{1, 2, 3, 4};
+ Integer[] intArr2 = new Integer[]{1, 6, 3};
+ excute(intArr1, 0);
+ excute2(intArr2, 2);
+ }
+
+ private static void excute(Number[] arr,int precision) throws ParseException {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setParameters(new HashMap<>());
+ udfContext.getParameters().put("precision", precision);
+ Mean mean = new Mean();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ Accumulator agg = mean.open(udfContext,accumulator);
+
+ for (Number o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = mean.add(event, agg);
+
+ }
+ Accumulator result = mean.getResult(agg);
+ assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2"));
+ }
+
+ private static void excute2(Number[] arr,int precision) throws ParseException {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setParameters(new HashMap<>());
+ udfContext.getParameters().put("precision", precision);
+ Mean mean = new Mean();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ Accumulator agg = mean.open(udfContext,accumulator);
+
+ for (Number o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = mean.add(event, agg);
+
+ }
+ Accumulator result = mean.getResult(agg);
+ assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("3.33"));
+ }
+
+
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
new file mode 100644
index 0000000..a1cd54e
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.NumberSum;
+import com.ibm.icu.text.NumberFormat;
+import org.junit.jupiter.api.Test;
+
+import java.text.ParseException;
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class NumberSumTest {
+
+ @Test
+ public void testNumberSumTest() throws ParseException {
+
+ Integer[] intArr = new Integer[]{1, 2, 3, 4};
+ Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
+ Float[] floatArr = new Float[]{1.0f, 2.0f, 3.0f, 4.0f};
+ Double[] doubleArr = new Double[]{1.0, 2.0, 3.0, 4.0};
+ excute(floatArr, Float.class);
+ excute(doubleArr, Double.class);
+ excute(intArr, Long.class);
+ excute(longArr, Long.class);
+ }
+
+ private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_sum"));
+ NumberSum numberSum = new NumberSum();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ Accumulator agg = numberSum.open(udfContext,accumulator);
+
+ for (Number o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = numberSum.add(event, agg);
+
+ }
+ Accumulator result = numberSum.getResult(agg);
+ assertEquals(clazz, result.getMetricsFields().get("field_sum").getClass());
+ assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_sum").toString())),NumberFormat.getInstance().parse("10"));
+ }
+
+
+
+
+} \ No newline at end of file