diff options
| author | 王宽 <[email protected]> | 2024-07-24 06:52:41 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-07-24 06:52:41 +0000 |
| commit | b593e070f20c3097a21765229f87c444f3a60bad (patch) | |
| tree | b68d0211b548534a97a0d585d13068a4094f1156 | |
| parent | 3cff9a87fa0beab38caff2b34d7344b4186e24e1 (diff) | |
| parent | cd6f0015542af281c140b05f35cde8e79cc64a1e (diff) | |
Merge branch 'feature/aggregate' into 'develop'
[improve][core]新增聚合函数LongCount,Mean以及相关单元测试
See merge request galaxy/platform/groot-stream!79
15 files changed, 584 insertions, 3 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 62a320b..74239c9 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -30,6 +30,22 @@ processing_pipelines: - function: NUMBER_SUM lookup_fields: [ sent_pkts ] output_fields: [ sent_pkts_sum ] + - function: NUMBER_SUM + lookup_fields: [ sent_bytes ] + output_fields: [ sent_bytes_sum ] + - function: COLLECT_LIST + lookup_fields: [ client_port ] + output_fields: [ client_port_list ] + - function: COLLECT_SET + lookup_fields: [ client_ip ] + output_fields: [ client_ip_set ] + - function: LONG_COUNT + output_fields: [ sessions ] + - function: MEAN + lookup_fields: [ received_pkts ] + output_fields: [ received_pkts_mean ] + parameters: + precision: 1 sinks: print_sink: type: print diff --git a/config/udf.plugins b/config/udf.plugins index a8a62b4..b3f67b4 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -14,4 +14,9 @@ com.geedgenetworks.core.udf.PathCombine com.geedgenetworks.core.udf.Rename com.geedgenetworks.core.udf.SnowflakeId com.geedgenetworks.core.udf.StringJoiner -com.geedgenetworks.core.udf.UnixTimestampConverter
\ No newline at end of file +com.geedgenetworks.core.udf.UnixTimestampConverter +com.geedgenetworks.core.udf.udaf.NumberSum +com.geedgenetworks.core.udf.udaf.CollectList +com.geedgenetworks.core.udf.udaf.CollectSet +com.geedgenetworks.core.udf.udaf.LongCount +com.geedgenetworks.core.udf.udaf.Mean
\ No newline at end of file diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index 73692a2..0eb24cb 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -16,4 +16,6 @@ com.geedgenetworks.core.udf.UnixTimestampConverter com.geedgenetworks.core.udf.Flatten com.geedgenetworks.core.udf.udaf.NumberSum com.geedgenetworks.core.udf.udaf.CollectList -com.geedgenetworks.core.udf.udaf.CollectSet
\ No newline at end of file +com.geedgenetworks.core.udf.udaf.CollectSet +com.geedgenetworks.core.udf.udaf.LongCount +com.geedgenetworks.core.udf.udaf.Mean
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java new file mode 100644 index 0000000..416a7ea --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/OnlineStatistics.java @@ -0,0 +1,37 @@ +package com.geedgenetworks.core.pojo; + + +public class OnlineStatistics { + private long n; + private double mean; + private double aggregate; + + public OnlineStatistics add(Number val) { + ++n; + double delta = val.doubleValue() - mean; + mean += delta / n; + aggregate += (delta * (val.doubleValue() - mean)); + return this; + } + //计算总体标准差 + public double stddevp() { + return Math.sqrt(variancep()); + } + //计算总体方差 + public double variancep() { + return aggregate / n; + } + //计算样本标准差 + public double stddev() { + return Math.sqrt(variance()); + } + //计算样本方差 + public double variance() { + return aggregate / (n - 1); + } + + public double mean() { + return mean; + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java index 4b02166..165ed1b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/KeySelector.java @@ -28,8 +28,8 @@ public class KeySelector implements org.apache.flink.api.java.functions.KeySelec }else { stringBuilder.append(","); } - keybyEntity.setKeysToString(stringBuilder.toString()); } + keybyEntity.setKeysToString(stringBuilder.toString()); return keybyEntity; } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java index 1a82208..0345b7b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java @@ -18,6 +18,8 @@ package com.geedgenetworks.core.udf.udaf; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; import java.util.*; @@ -33,6 +35,9 @@ public class CollectList implements AggregateFunction { @Override public Accumulator open(UDFContext udfContext,Accumulator acc) { + if(udfContext.getLookup_fields()==null ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } this.lookupField = udfContext.getLookup_fields().get(0); if(!udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java index 42bd7e8..7891f93 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java @@ -3,6 +3,8 @@ package com.geedgenetworks.core.udf.udaf; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; @@ -20,6 +22,9 @@ public class CollectSet implements AggregateFunction { @Override public Accumulator open(UDFContext udfContext,Accumulator acc) { + if(udfContext.getLookup_fields()==null ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } this.lookupField = udfContext.getLookup_fields().get(0); if(!udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java new file mode 100644 index 0000000..5662935 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java @@ -0,0 +1,47 @@ +package com.geedgenetworks.core.udf.udaf; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; + + +public class LongCount implements AggregateFunction { + private String outputField; + + + @Override + public Accumulator open(UDFContext udfContext,Accumulator acc){ + if(udfContext.getOutput_fields()==null ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + outputField = udfContext.getOutput_fields().get(0); + return acc; + } + + + @Override + public Accumulator add(Event event, Accumulator acc) { + + acc.getMetricsFields().compute(outputField, (k, v) -> (v == null) ? 1L : (long)v + 1L); + return acc; + } + + @Override + public String functionName() { + return "LONG_COUNT"; + } + + @Override + public Accumulator getResult(Accumulator acc) { + return acc; + } + + @Override + public void close() { + + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java new file mode 100644 index 0000000..a3f098b --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java @@ -0,0 +1,84 @@ +package com.geedgenetworks.core.udf.udaf; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.pojo.OnlineStatistics; + +import java.text.DecimalFormat; + + +public class Mean implements AggregateFunction { + private String lookupField; + private String outputField; + private Integer precision; + private DecimalFormat df; + @Override + public Accumulator open(UDFContext udfContext,Accumulator acc){ + + if(udfContext.getLookup_fields()==null ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + lookupField = udfContext.getLookup_fields().get(0); + if(!udfContext.getOutput_fields().isEmpty()) { + outputField = udfContext.getOutput_fields().get(0); + } + else { + outputField = lookupField; + } + if(!udfContext.getParameters().isEmpty()) { + precision = Integer.parseInt(udfContext.getParameters().getOrDefault("precision", "-1").toString()); + if (precision > 0) { + StringBuilder pattern = new StringBuilder("#."); + for (int i = 0; i < precision; i++) { + pattern.append("#"); + } + df = new DecimalFormat(pattern.toString()); + } + }else { + precision = -1; + } + acc.getMetricsFields().put(outputField,new OnlineStatistics()); + return acc; + } + + + @Override + public Accumulator add(Event event, Accumulator acc) { + + Number val = (Number) event.getExtractedFields().getOrDefault(lookupField, 0.0); + OnlineStatistics aggregate = (OnlineStatistics) acc.getMetricsFields().get(outputField); + aggregate.add(val); + return acc; + + } + + @Override + public String functionName() { + return "MEAN"; + } + + @Override + public Accumulator getResult(Accumulator acc) { + OnlineStatistics aggregate = (OnlineStatistics) acc.getMetricsFields().get(outputField); + if(precision<0){ + acc.getMetricsFields().put(outputField, aggregate.mean()); + } + else if(precision>0){ + acc.getMetricsFields().put(outputField, df.format(aggregate.mean())); + } + else { + acc.getMetricsFields().put(outputField,(long)aggregate.mean()); + } + return acc; + } + + @Override + public void close() { + + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java index a491844..d92df5c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java @@ -2,6 +2,8 @@ package com.geedgenetworks.core.udf.udaf; import com.geedgenetworks.common.Accumulator; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; @@ -13,6 +15,9 @@ public class NumberSum implements AggregateFunction { @Override public Accumulator open(UDFContext udfContext,Accumulator acc){ + if(udfContext.getLookup_fields()==null ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } lookupField = udfContext.getLookup_fields().get(0); if(!udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java new file mode 100644 index 0000000..a01edb3 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java @@ -0,0 +1,69 @@ +/** + * Copyright 2017 Hortonworks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package com.geedgenetworks.core.udf.test.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.CollectList; +import org.junit.jupiter.api.Test; + +import java.text.ParseException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CollectListTest { + + @Test + public void testNumberSumTest() throws ParseException { + + List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"); + excute(arr); + + } + + private static void excute(List<String> arr) throws ParseException { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_list")); + CollectList collectList = new CollectList(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + Accumulator agg = collectList.open(udfContext,accumulator); + + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = collectList.add(event, agg); + + } + Accumulator result = collectList.getResult(agg); + List<String> vals = (List<String>) result.getMetricsFields().get("field_list"); + assertEquals(vals.size(),4); + } + + + +}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java new file mode 100644 index 0000000..ae69d7c --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java @@ -0,0 +1,67 @@ +/** + * Copyright 2017 Hortonworks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package com.geedgenetworks.core.udf.test.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.CollectList; +import com.geedgenetworks.core.udf.udaf.CollectSet; +import org.junit.jupiter.api.Test; + +import java.text.ParseException; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CollectSetTest { + + @Test + public void testNumberSumTest() throws ParseException { + + List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); + excute(arr); + + } + + private static void excute(List<String> arr) throws ParseException { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_list")); + CollectSet collectSet = new CollectSet(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + Accumulator agg = collectSet.open(udfContext,accumulator); + + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = collectSet.add(event, agg); + + } + Accumulator result = collectSet.getResult(agg); + Set<String> vals = (Set<String>) result.getMetricsFields().get("field_list"); + assertEquals(vals.size(),4); + } + + + +}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java new file mode 100644 index 0000000..3bde558 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java @@ -0,0 +1,69 @@ +/** + * Copyright 2017 Hortonworks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package com.geedgenetworks.core.udf.test.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.LongCount; +import com.geedgenetworks.core.udf.udaf.NumberSum; +import com.ibm.icu.text.NumberFormat; +import org.junit.jupiter.api.Test; + +import java.text.ParseException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class LongCountTest { + + @Test + public void testNumberSumTest() throws ParseException { + + Long[] longArr = new Long[]{1L, 2L, 3L, 4L}; + excute(longArr); + } + + private static void excute(Number[] arr) throws ParseException { + + UDFContext udfContext = new UDFContext(); + udfContext.setOutput_fields(Collections.singletonList("count")); + LongCount longCount = new LongCount(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + Accumulator agg = longCount.open(udfContext,accumulator); + + for (Number o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = longCount.add(event, agg); + + } + Accumulator result = longCount.getResult(agg); + assertEquals(Integer.parseInt((result.getMetricsFields().get("count").toString())),4); + } + + + + +}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java new file mode 100644 index 0000000..48c4e0f --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java @@ -0,0 +1,97 @@ +/** + * Copyright 2017 Hortonworks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package com.geedgenetworks.core.udf.test.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.Mean; +import com.geedgenetworks.core.udf.udaf.NumberSum; +import com.ibm.icu.text.NumberFormat; +import org.junit.jupiter.api.Test; + +import java.text.ParseException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MeanTest { + + @Test + public void testNumberSumTest() throws ParseException { + + Integer[] intArr1 = new Integer[]{1, 2, 3, 4}; + Integer[] intArr2 = new Integer[]{1, 6, 3}; + excute(intArr1, 0); + excute2(intArr2, 2); + } + + private static void excute(Number[] arr,int precision) throws ParseException { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("precision", precision); + Mean mean = new Mean(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + Accumulator agg = mean.open(udfContext,accumulator); + + for (Number o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = mean.add(event, agg); + + } + Accumulator result = mean.getResult(agg); + assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("2")); + } + + private static void excute2(Number[] arr,int precision) throws ParseException { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("precision", precision); + Mean mean = new Mean(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + Accumulator agg = mean.open(udfContext,accumulator); + + for (Number o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = mean.add(event, agg); + + } + Accumulator result = mean.getResult(agg); + assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_mean").toString())),NumberFormat.getInstance().parse("3.33")); + } + + +}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java new file mode 100644 index 0000000..a1cd54e --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java @@ -0,0 +1,73 @@ +/** + * Copyright 2017 Hortonworks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package com.geedgenetworks.core.udf.test.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.NumberSum; +import com.ibm.icu.text.NumberFormat; +import org.junit.jupiter.api.Test; + +import java.text.ParseException; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class NumberSumTest { + + @Test + public void testNumberSumTest() throws ParseException { + + Integer[] intArr = new Integer[]{1, 2, 3, 4}; + Long[] longArr = new Long[]{1L, 2L, 3L, 4L}; + Float[] floatArr = new Float[]{1.0f, 2.0f, 3.0f, 4.0f}; + Double[] doubleArr = new Double[]{1.0, 2.0, 3.0, 4.0}; + excute(floatArr, Float.class); + excute(doubleArr, Double.class); + excute(intArr, Long.class); + excute(longArr, Long.class); + } + + private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_sum")); + NumberSum numberSum = new NumberSum(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + Accumulator agg = numberSum.open(udfContext,accumulator); + + for (Number o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = numberSum.add(event, agg); + + } + Accumulator result = numberSum.getResult(agg); + assertEquals(clazz, result.getMetricsFields().get("field_sum").getClass()); + assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_sum").toString())),NumberFormat.getInstance().parse("10")); + } + + + + +}
\ No newline at end of file |
