diff options
| author | 王宽 <[email protected]> | 2024-10-22 09:05:44 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-10-22 09:05:44 +0000 |
| commit | cb03e3cf57a112b6ae07d8e36b7239b9331c63c1 (patch) | |
| tree | d0b9215238a5abc6a9e03fff8ab5890a4f18ec1b | |
| parent | 5b953aa3a8673098f2777bff573c714230dc885f (diff) | |
| parent | 2bfffd4d59179c13cad6b9516b0d23acbbb2f93a (diff) | |
Merge branch 'feature/max-min' into 'develop'
[feature][core]删除原NumberMax,NumberMin。支持更通用的Max,Min函数。支持对日期/字符串/数字进行处理,考虑性能等因素,暂不支持识别字符串类型的日期。
See merge request galaxy/platform/groot-stream!118
11 files changed, 515 insertions, 421 deletions
diff --git a/config/udf.plugins b/config/udf.plugins index 7cd3b0e..83da898 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -36,5 +36,5 @@ com.geedgenetworks.core.udf.udtf.PathUnroll com.geedgenetworks.core.udf.uuid.Uuid com.geedgenetworks.core.udf.uuid.UuidV5 com.geedgenetworks.core.udf.uuid.UuidV7 -com.geedgenetworks.core.udf.udaf.NumberMax -com.geedgenetworks.core.udf.udaf.NumberMin
\ No newline at end of file +com.geedgenetworks.core.udf.udaf.Max +com.geedgenetworks.core.udf.udaf.Min
\ No newline at end of file diff --git a/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml index 7473939..92eaef2 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml @@ -41,12 +41,12 @@ postprocessing_pipelines: lookup_fields: [ client_country ] - function: FIRST_VALUE lookup_fields: [ server_country ] - - function: NUMBER_MIN + - function: MIN lookup_fields: [ timestamp_ms ] output_fields: [ start_timestamp_ms ] - - function: NUMBER_MIN + - function: MIN lookup_fields: [ recv_time ] - - function: NUMBER_MAX + - function: MAX lookup_fields: [ timestamp_ms ] output_fields: [ end_timestamp_ms ] - function: FIRST_VALUE diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index 9950a64..fe7a083 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -33,5 +33,5 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles com.geedgenetworks.core.udf.udtf.JsonUnroll com.geedgenetworks.core.udf.udtf.Unroll com.geedgenetworks.core.udf.udtf.PathUnroll -com.geedgenetworks.core.udf.udaf.NumberMax -com.geedgenetworks.core.udf.udaf.NumberMin
\ No newline at end of file +com.geedgenetworks.core.udf.udaf.Max +com.geedgenetworks.core.udf.udaf.Min
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java new file mode 100644 index 0000000..e4632bb --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java @@ -0,0 +1,121 @@ +package com.geedgenetworks.core.udf.udaf; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; + + +public class Max implements AggregateFunction { + + private String lookupField; + private String outputField; + + @Override + public void open(UDFContext udfContext) { + if (udfContext.getLookup_fields() == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { + this.outputField = udfContext.getOutput_fields().get(0); + } else { + outputField = lookupField; + } + } + + @Override + public Accumulator initAccumulator(Accumulator acc) { + return acc; + } + + @Override + public Accumulator add(Event event, Accumulator acc) { + Map<String, Object> eventFields = event.getExtractedFields(); + Map<String, Object> metricsFields = acc.getMetricsFields(); + + if (metricsFields.get(outputField) == null && eventFields.get(lookupField) != null) { + metricsFields.put(outputField, eventFields.get(lookupField)); + } else if (metricsFields.get(outputField) != null && eventFields.get(lookupField) != null) { + Object currentValue = metricsFields.get(outputField); + Object newValue = eventFields.get(lookupField); + + // 直接检测是否为时间类型对象 + if (currentValue instanceof LocalDateTime && newValue instanceof LocalDateTime) { + LocalDateTime time1 = (LocalDateTime) currentValue; + LocalDateTime time2 = (LocalDateTime) newValue; + if (time1.isBefore(time2)) { + metricsFields.put(outputField, newValue); + } + } else if (currentValue instanceof String && newValue instanceof String) { + // 处理字符串比较 + String value1 = currentValue.toString(); + String value2 = newValue.toString(); + if (value1.compareTo(value2) < 0) { + metricsFields.put(outputField, newValue); + } + + } else { + // 数字进行比较 + double value1 = Double.parseDouble(currentValue.toString()); + double value2 = Double.parseDouble(newValue.toString()); + if (value1 < value2) { + metricsFields.put(outputField, newValue); + } + } + } + return acc; + } + + @Override + public String functionName() { + return "MAX"; + } + + @Override + public Accumulator getResult(Accumulator acc) { + return acc; + } + + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + Map<String, Object> firstMetrics = firstAcc.getMetricsFields(); + Map<String, Object> secondMetrics = secondAcc.getMetricsFields(); + Object firstValue = firstMetrics.get(outputField); + Object secondValue = secondMetrics.get(outputField); + if (firstValue == null && secondValue != null) { + firstMetrics.put(outputField, secondValue); + } else if (firstValue != null && secondValue != null) { + // 直接检测是否为时间类型对象 + if (firstValue instanceof LocalDateTime && secondValue instanceof LocalDateTime) { + LocalDateTime time1 = (LocalDateTime) firstValue; + LocalDateTime time2 = (LocalDateTime) secondValue; + if (time1.isBefore(time2)) { + firstMetrics.put(outputField, secondValue); + } + } else if (firstValue instanceof String && secondValue instanceof String) { + + String value1 = firstValue.toString(); + String value2 = secondValue.toString(); + if (value1.compareTo(value2) < 0) { + firstMetrics.put(outputField, secondValue); + } + } else { + // 假设为数字,进行比较 + double value1 = Double.parseDouble(firstValue.toString()); + double value2 = Double.parseDouble(secondValue.toString()); + if (value1 < value2) { + firstMetrics.put(outputField, secondValue); + } + } + } + return firstAcc; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java new file mode 100644 index 0000000..70153c9 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java @@ -0,0 +1,120 @@ +package com.geedgenetworks.core.udf.udaf; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; + + +public class Min implements AggregateFunction { + + private String lookupField; + private String outputField; + + @Override + public void open(UDFContext udfContext) { + if (udfContext.getLookup_fields() == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + this.lookupField = udfContext.getLookup_fields().get(0); + if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { + this.outputField = udfContext.getOutput_fields().get(0); + } else { + outputField = lookupField; + } + } + + @Override + public Accumulator initAccumulator(Accumulator acc) { + return acc; + } + + @Override + public Accumulator add(Event event, Accumulator acc) { + Map<String, Object> eventFields = event.getExtractedFields(); + Map<String, Object> metricsFields = acc.getMetricsFields(); + + if (metricsFields.get(outputField) == null && eventFields.get(lookupField) != null) { + metricsFields.put(outputField, eventFields.get(lookupField)); + } else if (metricsFields.get(outputField) != null && eventFields.get(lookupField) != null) { + Object currentValue = metricsFields.get(outputField); + Object newValue = eventFields.get(lookupField); + + // 直接检测是否为时间类型对象 + if (currentValue instanceof LocalDateTime && newValue instanceof LocalDateTime) { + LocalDateTime time1 = (LocalDateTime) currentValue; + LocalDateTime time2 = (LocalDateTime) newValue; + if (time1.isAfter(time2)) { + metricsFields.put(outputField, newValue); + } + } else if (currentValue instanceof String && newValue instanceof String) { + // 处理字符串比较 + String value1 = currentValue.toString(); + String value2 = newValue.toString(); + if (value1.compareTo(value2) > 0) { + metricsFields.put(outputField, newValue); + } + + } else { + // 假设为数字,进行比较 + double value1 = Double.parseDouble(currentValue.toString()); + double value2 = Double.parseDouble(newValue.toString()); + if (value1 > value2) { + metricsFields.put(outputField, newValue); + } + } + } + return acc; + } + + @Override + public String functionName() { + return "MIN"; + } + + @Override + public Accumulator getResult(Accumulator acc) { + return acc; + } + + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + Map<String, Object> firstMetrics = firstAcc.getMetricsFields(); + Map<String, Object> secondMetrics = secondAcc.getMetricsFields(); + Object firstValue = firstMetrics.get(outputField); + Object secondValue = secondMetrics.get(outputField); + if (firstValue == null && secondValue != null) { + firstMetrics.put(outputField, secondValue); + } else if (firstValue != null && secondValue != null) { + // 直接检测是否为时间类型对象 + if (firstValue instanceof LocalDateTime && secondValue instanceof LocalDateTime) { + LocalDateTime time1 = (LocalDateTime) firstValue; + LocalDateTime time2 = (LocalDateTime) secondValue; + if (time1.isAfter(time2)) { + firstMetrics.put(outputField, secondValue); + } + } else if (firstValue instanceof String && secondValue instanceof String) { + String value1 = firstValue.toString(); + String value2 = secondValue.toString(); + if (value1.compareTo(value2) > 0) { + firstMetrics.put(outputField, secondValue); + } + } else { + // 假设为数字,进行比较 + double value1 = Double.parseDouble(firstValue.toString()); + double value2 = Double.parseDouble(secondValue.toString()); + if (value1 > value2) { + firstAcc.getMetricsFields().put(outputField, secondValue); + } + } + } + return firstAcc; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java deleted file mode 100644 index 865ce64..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Copyright 2017 Hortonworks. - * <p> - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package com.geedgenetworks.core.udf.udaf; - - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.AggregateFunction; -import com.geedgenetworks.common.udf.UDFContext; - -/** - * Collects elements within a group and returns the list of aggregated objects - */ -public class NumberMax implements AggregateFunction { - - private String lookupField; - private String outputField; - - - @Override - public void open(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); - } - this.lookupField = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { - this.outputField = udfContext.getOutput_fields().get(0); - } else { - outputField = lookupField; - } - } - - @Override - public Accumulator initAccumulator(Accumulator acc) { - return acc; - } - - @Override - public Accumulator add(Event event, Accumulator acc) { - if (acc.getMetricsFields().get(outputField)==null && event.getExtractedFields().get(lookupField)!=null) { - acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); - } else if (acc.getMetricsFields().get(outputField)!=null && event.getExtractedFields().get(lookupField)!=null) { - double value1 = Double.parseDouble(acc.getMetricsFields().get(outputField).toString()); - double value2 = Double.parseDouble(event.getExtractedFields().get(lookupField).toString()); - if (value1 < value2) { - acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); - } - } - return acc; - } - - @Override - public String functionName() { - return "NUMBER_MAX"; - } - - @Override - public Accumulator getResult(Accumulator acc) { - return acc; - } - - @Override - public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { - if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { - firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); - } - else if(firstAcc.getMetricsFields().containsKey(outputField) && !secondAcc.getMetricsFields().containsKey(outputField)){ - firstAcc.getMetricsFields().put(outputField, firstAcc.getMetricsFields().get(outputField)); - } - else if(firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)){ - double value1 = Double.parseDouble(firstAcc.getMetricsFields().get(outputField).toString()); - double value2 = Double.parseDouble(secondAcc.getMetricsFields().get(outputField).toString()); - if (value1 < value2) { - firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); - } - } - return firstAcc; - } -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java deleted file mode 100644 index 3af21ec..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Copyright 2017 Hortonworks. - * <p> - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package com.geedgenetworks.core.udf.udaf; - - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.AggregateFunction; -import com.geedgenetworks.common.udf.UDFContext; - -/** - * Collects elements within a group and returns the list of aggregated objects - */ -public class NumberMin implements AggregateFunction { - - private String lookupField; - private String outputField; - - - @Override - public void open(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); - } - this.lookupField = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { - this.outputField = udfContext.getOutput_fields().get(0); - } else { - outputField = lookupField; - } - } - - @Override - public Accumulator initAccumulator(Accumulator acc) { - return acc; - } - - @Override - public Accumulator add(Event event, Accumulator acc) { - if (acc.getMetricsFields().get(outputField)==null && event.getExtractedFields().get(lookupField)!=null) { - acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); - } else if (acc.getMetricsFields().get(outputField)!=null && event.getExtractedFields()!=null) { - double value1 = Double.parseDouble(acc.getMetricsFields().get(outputField).toString()); - double value2 = Double.parseDouble(event.getExtractedFields().get(lookupField).toString()); - if (value1 > value2) { - acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); - } - } - return acc; - } - - @Override - public String functionName() { - return "NUMBER_MIN"; - } - - @Override - public Accumulator getResult(Accumulator acc) { - return acc; - } - - @Override - public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { - if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) { - firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); - } - else if(firstAcc.getMetricsFields().containsKey(outputField) && !secondAcc.getMetricsFields().containsKey(outputField)){ - firstAcc.getMetricsFields().put(outputField, firstAcc.getMetricsFields().get(outputField)); - } - else if(firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)){ - double value1 = Double.parseDouble(firstAcc.getMetricsFields().get(outputField).toString()); - double value2 = Double.parseDouble(secondAcc.getMetricsFields().get(outputField).toString()); - if (value1 > value2) { - firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField)); - } - } - return firstAcc; - } -} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java new file mode 100644 index 0000000..67fec93 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java @@ -0,0 +1,144 @@ +package com.geedgenetworks.core.udf.test.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.Max; +import com.geedgenetworks.core.udf.udaf.Min; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class MaxTest { + + private Max maxFunction; + private Accumulator acc; + private Event event; + + @BeforeEach + void setUp() { + maxFunction = new Max(); + acc = new Accumulator(); + event = new Event(); + + // 初始化上下文 + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("value")); + udfContext.setOutput_fields(List.of("maxValue")); + maxFunction.open(udfContext); + + // 初始化累加器的 metricsFields + acc.setMetricsFields(new HashMap<>()); + } + + @Test + void testAddMultipleNumericValues() { + // 设置事件中的多个数值 + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("value", 100); + event.setExtractedFields(extractedFields); + Accumulator result = maxFunction.add(event, acc); + + // 添加第二个数值 + extractedFields.put("value", 200); + result = maxFunction.add(event, result); + + // 添加第三个数值 + extractedFields.put("value", 150); + result = maxFunction.add(event, result); + + // 验证最大值应该是200 + assertEquals(200, result.getMetricsFields().get("maxValue")); + } + + @Test + void testAddMultipleStringValues() { + // 设置事件中的多个字符串值 + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("value", "abc"); + event.setExtractedFields(extractedFields); + Accumulator result = maxFunction.add(event, acc); + + // 添加第二个字符串值 + extractedFields.put("value", "def"); + result = maxFunction.add(event, result); + + // 添加第三个字符串值 + extractedFields.put("value", "ghi"); + result = maxFunction.add(event, result); + + // 验证最大字符串值(按字母顺序最大的是 "ghi") + assertEquals("ghi", result.getMetricsFields().get("maxValue")); + } + + @Test + void testAddMultipleLocalDateTimes() { + // 设置事件中的多个时间值 + Map<String, Object> extractedFields = new HashMap<>(); + + extractedFields.put("value", LocalDateTime.of(2024, 10, 22, 12, 0)); + event.setExtractedFields(extractedFields); + Accumulator result = maxFunction.add(event, acc); + + // 添加第二个时间值 + extractedFields.put("value", LocalDateTime.of(2024, 10, 23, 12, 0)); + result = maxFunction.add(event, result); + + // 添加第三个时间值 + extractedFields.put("value", LocalDateTime.of(2024, 10, 21, 12, 0)); + result = maxFunction.add(event, result); + + // 验证最大时间值(最大的是2024年10月23日12点) + assertEquals(LocalDateTime.of(2024, 10, 23, 12, 0), result.getMetricsFields().get("maxValue")); + } + + @Test + void testMergeAccumulatorsForNumber() { + Accumulator acc1 = new Accumulator(); + Accumulator acc2 = new Accumulator(); + Map map1 = new HashMap(); + map1.put("maxValue", 10L); + Map map2 = new HashMap(); + map2.put("maxValue", 5.0); + acc1.setMetricsFields(map1); + acc2.setMetricsFields(map2); + Accumulator mergedAcc = maxFunction.merge(acc1, acc2); + assertEquals(10L, mergedAcc.getMetricsFields().get("maxValue")); + } + + @Test + void testMergeAccumulatorsForTime() { + Accumulator acc1 = new Accumulator(); + Accumulator acc2 = new Accumulator(); + Map map1 = new HashMap(); + map1.put("maxValue", LocalDateTime.of(2023, 1, 1, 0, 0,1)); + Map map2 = new HashMap(); + map2.put("maxValue", LocalDateTime.of(2023, 1, 1, 0, 0,2)); + acc1.setMetricsFields(map1); + acc2.setMetricsFields(map2); + Accumulator mergedAcc = maxFunction.merge(acc1, acc2); + assertEquals(LocalDateTime.of(2023, 1, 1, 0, 0,2), mergedAcc.getMetricsFields().get("maxValue")); + } + + @Test + void testMergeAccumulatorsForString() { + Accumulator acc1 = new Accumulator(); + Accumulator acc2 = new Accumulator(); + Map map1 = new HashMap(); + map1.put("maxValue", "qwe"); + Map map2 = new HashMap(); + map2.put("maxValue", "abc"); + acc1.setMetricsFields(map1); + acc2.setMetricsFields(map2); + Accumulator mergedAcc = maxFunction.merge(acc1, acc2); + assertEquals("qwe", mergedAcc.getMetricsFields().get("maxValue")); + } +}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java new file mode 100644 index 0000000..ee63137 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java @@ -0,0 +1,123 @@ +package com.geedgenetworks.core.udf.test.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.Min; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MinTest { + + private Min minFunction; + private UDFContext udfContext; + private Accumulator acc; + private Event event1, event2, event3; + + @BeforeEach + void setUp() { + minFunction = new Min(); + udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("value")); + udfContext.setOutput_fields(List.of("minValue")); + minFunction.open(udfContext); + + acc = new Accumulator(); + acc.setMetricsFields(new HashMap<>()); + + event1 = new Event(); + event2 = new Event(); + event3 = new Event(); + } + + @Test + void testAddNumericValue() { + event1.setExtractedFields(Map.of("value", 5.0)); + event2.setExtractedFields(Map.of("value", 3.0)); + event3.setExtractedFields(Map.of("value", 10.0)); + + acc = minFunction.add(event1, acc); + acc = minFunction.add(event2, acc); + acc = minFunction.add(event3, acc); + + assertEquals(3.0, acc.getMetricsFields().get("minValue")); + } + + @Test + void testAddStringValue() { + event1.setExtractedFields(Map.of("value", "apple")); + event2.setExtractedFields(Map.of("value", "banana")); + event3.setExtractedFields(Map.of("value", "cherry")); + + acc = minFunction.add(event1, acc); + acc = minFunction.add(event2, acc); + acc = minFunction.add(event3, acc); + + assertEquals("apple", acc.getMetricsFields().get("minValue")); + } + + @Test + void testAddLocalDateTime() { + event1.setExtractedFields(Map.of("value", LocalDateTime.of(2023, 1, 1, 0, 0))); + event2.setExtractedFields(Map.of("value", LocalDateTime.of(2022, 1, 1, 0, 0))); + event3.setExtractedFields(Map.of("value", LocalDateTime.of(2024, 1, 1, 0, 0))); + + acc = minFunction.add(event1, acc); + acc = minFunction.add(event2, acc); + acc = minFunction.add(event3, acc); + + assertEquals(LocalDateTime.of(2022, 1, 1, 0, 0), acc.getMetricsFields().get("minValue")); + } + + @Test + void testMergeAccumulatorsForNumber() { + Accumulator acc1 = new Accumulator(); + Accumulator acc2 = new Accumulator(); + Map map1 = new HashMap(); + map1.put("minValue", 10L); + Map map2 = new HashMap(); + map2.put("minValue", 5.0); + acc1.setMetricsFields(map1); + acc2.setMetricsFields(map2); + Accumulator mergedAcc = minFunction.merge(acc1, acc2); + assertEquals(5.0, mergedAcc.getMetricsFields().get("minValue")); + } + + @Test + void testMergeAccumulatorsForTime() { + Accumulator acc1 = new Accumulator(); + Accumulator acc2 = new Accumulator(); + Map map1 = new HashMap(); + map1.put("minValue", LocalDateTime.of(2023, 1, 1, 0, 0,1)); + Map map2 = new HashMap(); + map2.put("minValue", LocalDateTime.of(2023, 1, 1, 0, 0,2)); + acc1.setMetricsFields(map1); + acc2.setMetricsFields(map2); + Accumulator mergedAcc = minFunction.merge(acc1, acc2); + assertEquals(LocalDateTime.of(2023, 1, 1, 0, 0,1), mergedAcc.getMetricsFields().get("minValue")); + } + + @Test + void testMergeAccumulatorsForString() { + Accumulator acc1 = new Accumulator(); + Accumulator acc2 = new Accumulator(); + Map map1 = new HashMap(); + map1.put("minValue", "qwe"); + Map map2 = new HashMap(); + map2.put("minValue", "abc"); + acc1.setMetricsFields(map1); + acc2.setMetricsFields(map2); + Accumulator mergedAcc = minFunction.merge(acc1, acc2); + assertEquals("abc", mergedAcc.getMetricsFields().get("minValue")); + } + + +}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java deleted file mode 100644 index d7715c0..0000000 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package com.geedgenetworks.core.udf.test.aggregate; - - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.udf.udaf.NumberMax; -import com.geedgenetworks.core.udf.udaf.NumberSum; -import com.ibm.icu.text.NumberFormat; -import org.junit.jupiter.api.Test; - -import java.text.ParseException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class NumberMaxTest { - - @Test - public void test() throws ParseException { - - Integer[] intArr = new Integer[]{1, 2, 3, 4}; - Long[] longArr = new Long[]{1L, 2L, 3L, 4L}; - Float[] floatArr = new Float[]{1.0f, 2.0f, 3.0f, 4.0f}; - Double[] doubleArr = new Double[]{1.0, 2.0, 3.0, 4.0}; - excute(floatArr, Float.class); - excute(doubleArr, Double.class); - excute(intArr, Integer.class); - excute(longArr, Long.class); - testMerge(intArr,floatArr); - - } - private void testMerge(Number[] arr,Number[] arr2) { - - UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_max")); - NumberMax numberMax = new NumberMax(); - Map<String, Object> metricsFields = new HashMap<>(); - Accumulator accumulator = new Accumulator(); - accumulator.setMetricsFields(metricsFields); - numberMax.open(udfContext); - Accumulator result1 = getMiddleResult(udfContext,arr); - Accumulator result2 = getMiddleResult(udfContext,arr2); - Accumulator result = numberMax.getResult(numberMax.merge(result1,result2)); - assertEquals(Float.parseFloat((result.getMetricsFields().get("field_max").toString())),4); - - } - private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) { - - - NumberMax numberMax = new NumberMax(); - Map<String, Object> metricsFields = new HashMap<>(); - Accumulator accumulator = new Accumulator(); - accumulator.setMetricsFields(metricsFields); - numberMax.open(udfContext); - Accumulator agg = numberMax.initAccumulator(accumulator); - for (Number o : arr) { - Event event = new Event(); - Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("field", o); - event.setExtractedFields(extractedFields); - agg = numberMax.add(event, agg); - - } - return agg; - } - private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException { - - UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_max")); - NumberMax numberMax = new NumberMax(); - Map<String, Object> metricsFields = new HashMap<>(); - Accumulator accumulator = new Accumulator(); - accumulator.setMetricsFields(metricsFields); - numberMax.open(udfContext); - Accumulator agg = numberMax.initAccumulator(accumulator); - for (Number o : arr) { - Event event = new Event(); - Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("field", o); - event.setExtractedFields(extractedFields); - agg = numberMax.add(event, agg); - - } - Accumulator result = numberMax.getResult(agg); - assertEquals(clazz, result.getMetricsFields().get("field_max").getClass()); - assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_max").toString())),NumberFormat.getInstance().parse("4")); - } - - - - -}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java deleted file mode 100644 index ccbb994..0000000 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Copyright 2017 Hortonworks. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package com.geedgenetworks.core.udf.test.aggregate; - - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.udf.udaf.NumberMax; -import com.geedgenetworks.core.udf.udaf.NumberMin; -import com.ibm.icu.text.NumberFormat; -import org.junit.jupiter.api.Test; - -import java.text.ParseException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class NumberMinTest { - - @Test - public void test() throws ParseException { - - Integer[] intArr = new Integer[]{1, 2, 3, 4}; - Long[] longArr = new Long[]{1L, 2L, 3L, 4L}; - Float[] floatArr = new Float[]{1.0f, 2.0f, 3.0f, 4.0f}; - Double[] doubleArr = new Double[]{1.0, 2.0, 3.0, 4.0}; - excute(floatArr, Float.class); - excute(doubleArr, Double.class); - excute(intArr, Integer.class); - excute(longArr, Long.class); - testMerge(intArr,floatArr); - - } - private void testMerge(Number[] arr,Number[] arr2) { - - UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_min")); - NumberMin numberMin = new NumberMin(); - Map<String, Object> metricsFields = new HashMap<>(); - Accumulator accumulator = new Accumulator(); - accumulator.setMetricsFields(metricsFields); - numberMin.open(udfContext); - Accumulator result1 = getMiddleResult(udfContext,arr); - Accumulator result2 = getMiddleResult(udfContext,arr2); - Accumulator result = numberMin.getResult(numberMin.merge(result1,result2)); - assertEquals(Float.parseFloat((result.getMetricsFields().get("field_min").toString())),1); - - } - private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) { - - - NumberMin numberMin = new NumberMin(); - Map<String, Object> metricsFields = new HashMap<>(); - Accumulator accumulator = new Accumulator(); - accumulator.setMetricsFields(metricsFields); - numberMin.open(udfContext); - Accumulator agg = numberMin.initAccumulator(accumulator); - for (Number o : arr) { - Event event = new Event(); - Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("field", o); - event.setExtractedFields(extractedFields); - agg = numberMin.add(event, agg); - - } - return agg; - } - private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException { - - UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_min")); - NumberMin numberMin = new NumberMin(); - Map<String, Object> metricsFields = new HashMap<>(); - Accumulator accumulator = new Accumulator(); - accumulator.setMetricsFields(metricsFields); - numberMin.open(udfContext); - Accumulator agg = numberMin.initAccumulator(accumulator); - for (Number o : arr) { - Event event = new Event(); - Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("field", o); - event.setExtractedFields(extractedFields); - agg = numberMin.add(event, agg); - - } - Accumulator result = numberMin.getResult(agg); - assertEquals(clazz, result.getMetricsFields().get("field_min").getClass()); - assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_min").toString())),NumberFormat.getInstance().parse("1")); - } - - - - -}
\ No newline at end of file |
