summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--config/udf.plugins4
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml6
-rw-r--r--groot-common/src/main/resources/udf.plugins4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java121
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java120
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java94
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java94
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java144
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java123
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java113
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java113
11 files changed, 515 insertions, 421 deletions
diff --git a/config/udf.plugins b/config/udf.plugins
index 7cd3b0e..83da898 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -36,5 +36,5 @@ com.geedgenetworks.core.udf.udtf.PathUnroll
com.geedgenetworks.core.udf.uuid.Uuid
com.geedgenetworks.core.udf.uuid.UuidV5
com.geedgenetworks.core.udf.uuid.UuidV7
-com.geedgenetworks.core.udf.udaf.NumberMax
-com.geedgenetworks.core.udf.udaf.NumberMin \ No newline at end of file
+com.geedgenetworks.core.udf.udaf.Max
+com.geedgenetworks.core.udf.udaf.Min \ No newline at end of file
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml
index 7473939..92eaef2 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml
@@ -41,12 +41,12 @@ postprocessing_pipelines:
lookup_fields: [ client_country ]
- function: FIRST_VALUE
lookup_fields: [ server_country ]
- - function: NUMBER_MIN
+ - function: MIN
lookup_fields: [ timestamp_ms ]
output_fields: [ start_timestamp_ms ]
- - function: NUMBER_MIN
+ - function: MIN
lookup_fields: [ recv_time ]
- - function: NUMBER_MAX
+ - function: MAX
lookup_fields: [ timestamp_ms ]
output_fields: [ end_timestamp_ms ]
- function: FIRST_VALUE
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 9950a64..fe7a083 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -33,5 +33,5 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
com.geedgenetworks.core.udf.udtf.Unroll
com.geedgenetworks.core.udf.udtf.PathUnroll
-com.geedgenetworks.core.udf.udaf.NumberMax
-com.geedgenetworks.core.udf.udaf.NumberMin \ No newline at end of file
+com.geedgenetworks.core.udf.udaf.Max
+com.geedgenetworks.core.udf.udaf.Min \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java
new file mode 100644
index 0000000..e4632bb
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java
@@ -0,0 +1,121 @@
+package com.geedgenetworks.core.udf.udaf;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+
+
+public class Max implements AggregateFunction {
+
+ private String lookupField;
+ private String outputField;
+
+ @Override
+ public void open(UDFContext udfContext) {
+ if (udfContext.getLookup_fields() == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
+ this.outputField = udfContext.getOutput_fields().get(0);
+ } else {
+ outputField = lookupField;
+ }
+ }
+
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ Map<String, Object> eventFields = event.getExtractedFields();
+ Map<String, Object> metricsFields = acc.getMetricsFields();
+
+ if (metricsFields.get(outputField) == null && eventFields.get(lookupField) != null) {
+ metricsFields.put(outputField, eventFields.get(lookupField));
+ } else if (metricsFields.get(outputField) != null && eventFields.get(lookupField) != null) {
+ Object currentValue = metricsFields.get(outputField);
+ Object newValue = eventFields.get(lookupField);
+
+ // 直接检测是否为时间类型对象
+ if (currentValue instanceof LocalDateTime && newValue instanceof LocalDateTime) {
+ LocalDateTime time1 = (LocalDateTime) currentValue;
+ LocalDateTime time2 = (LocalDateTime) newValue;
+ if (time1.isBefore(time2)) {
+ metricsFields.put(outputField, newValue);
+ }
+ } else if (currentValue instanceof String && newValue instanceof String) {
+ // 处理字符串比较
+ String value1 = currentValue.toString();
+ String value2 = newValue.toString();
+ if (value1.compareTo(value2) < 0) {
+ metricsFields.put(outputField, newValue);
+ }
+
+ } else {
+ // 数字进行比较
+ double value1 = Double.parseDouble(currentValue.toString());
+ double value2 = Double.parseDouble(newValue.toString());
+ if (value1 < value2) {
+ metricsFields.put(outputField, newValue);
+ }
+ }
+ }
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "MAX";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ Map<String, Object> firstMetrics = firstAcc.getMetricsFields();
+ Map<String, Object> secondMetrics = secondAcc.getMetricsFields();
+ Object firstValue = firstMetrics.get(outputField);
+ Object secondValue = secondMetrics.get(outputField);
+ if (firstValue == null && secondValue != null) {
+ firstMetrics.put(outputField, secondValue);
+ } else if (firstValue != null && secondValue != null) {
+ // 直接检测是否为时间类型对象
+ if (firstValue instanceof LocalDateTime && secondValue instanceof LocalDateTime) {
+ LocalDateTime time1 = (LocalDateTime) firstValue;
+ LocalDateTime time2 = (LocalDateTime) secondValue;
+ if (time1.isBefore(time2)) {
+ firstMetrics.put(outputField, secondValue);
+ }
+ } else if (firstValue instanceof String && secondValue instanceof String) {
+
+ String value1 = firstValue.toString();
+ String value2 = secondValue.toString();
+ if (value1.compareTo(value2) < 0) {
+ firstMetrics.put(outputField, secondValue);
+ }
+ } else {
+ // 假设为数字,进行比较
+ double value1 = Double.parseDouble(firstValue.toString());
+ double value2 = Double.parseDouble(secondValue.toString());
+ if (value1 < value2) {
+ firstMetrics.put(outputField, secondValue);
+ }
+ }
+ }
+ return firstAcc;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java
new file mode 100644
index 0000000..70153c9
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java
@@ -0,0 +1,120 @@
+package com.geedgenetworks.core.udf.udaf;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+
+
+public class Min implements AggregateFunction {
+
+ private String lookupField;
+ private String outputField;
+
+ @Override
+ public void open(UDFContext udfContext) {
+ if (udfContext.getLookup_fields() == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
+ this.outputField = udfContext.getOutput_fields().get(0);
+ } else {
+ outputField = lookupField;
+ }
+ }
+
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ Map<String, Object> eventFields = event.getExtractedFields();
+ Map<String, Object> metricsFields = acc.getMetricsFields();
+
+ if (metricsFields.get(outputField) == null && eventFields.get(lookupField) != null) {
+ metricsFields.put(outputField, eventFields.get(lookupField));
+ } else if (metricsFields.get(outputField) != null && eventFields.get(lookupField) != null) {
+ Object currentValue = metricsFields.get(outputField);
+ Object newValue = eventFields.get(lookupField);
+
+ // 直接检测是否为时间类型对象
+ if (currentValue instanceof LocalDateTime && newValue instanceof LocalDateTime) {
+ LocalDateTime time1 = (LocalDateTime) currentValue;
+ LocalDateTime time2 = (LocalDateTime) newValue;
+ if (time1.isAfter(time2)) {
+ metricsFields.put(outputField, newValue);
+ }
+ } else if (currentValue instanceof String && newValue instanceof String) {
+ // 处理字符串比较
+ String value1 = currentValue.toString();
+ String value2 = newValue.toString();
+ if (value1.compareTo(value2) > 0) {
+ metricsFields.put(outputField, newValue);
+ }
+
+ } else {
+ // 假设为数字,进行比较
+ double value1 = Double.parseDouble(currentValue.toString());
+ double value2 = Double.parseDouble(newValue.toString());
+ if (value1 > value2) {
+ metricsFields.put(outputField, newValue);
+ }
+ }
+ }
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "MIN";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ Map<String, Object> firstMetrics = firstAcc.getMetricsFields();
+ Map<String, Object> secondMetrics = secondAcc.getMetricsFields();
+ Object firstValue = firstMetrics.get(outputField);
+ Object secondValue = secondMetrics.get(outputField);
+ if (firstValue == null && secondValue != null) {
+ firstMetrics.put(outputField, secondValue);
+ } else if (firstValue != null && secondValue != null) {
+ // 直接检测是否为时间类型对象
+ if (firstValue instanceof LocalDateTime && secondValue instanceof LocalDateTime) {
+ LocalDateTime time1 = (LocalDateTime) firstValue;
+ LocalDateTime time2 = (LocalDateTime) secondValue;
+ if (time1.isAfter(time2)) {
+ firstMetrics.put(outputField, secondValue);
+ }
+ } else if (firstValue instanceof String && secondValue instanceof String) {
+ String value1 = firstValue.toString();
+ String value2 = secondValue.toString();
+ if (value1.compareTo(value2) > 0) {
+ firstMetrics.put(outputField, secondValue);
+ }
+ } else {
+ // 假设为数字,进行比较
+ double value1 = Double.parseDouble(firstValue.toString());
+ double value2 = Double.parseDouble(secondValue.toString());
+ if (value1 > value2) {
+ firstAcc.getMetricsFields().put(outputField, secondValue);
+ }
+ }
+ }
+ return firstAcc;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java
deleted file mode 100644
index 865ce64..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright 2017 Hortonworks.
- * <p>
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package com.geedgenetworks.core.udf.udaf;
-
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.AggregateFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-
-/**
- * Collects elements within a group and returns the list of aggregated objects
- */
-public class NumberMax implements AggregateFunction {
-
- private String lookupField;
- private String outputField;
-
-
- @Override
- public void open(UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
- }
- this.lookupField = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
- this.outputField = udfContext.getOutput_fields().get(0);
- } else {
- outputField = lookupField;
- }
- }
-
- @Override
- public Accumulator initAccumulator(Accumulator acc) {
- return acc;
- }
-
- @Override
- public Accumulator add(Event event, Accumulator acc) {
- if (acc.getMetricsFields().get(outputField)==null && event.getExtractedFields().get(lookupField)!=null) {
- acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
- } else if (acc.getMetricsFields().get(outputField)!=null && event.getExtractedFields().get(lookupField)!=null) {
- double value1 = Double.parseDouble(acc.getMetricsFields().get(outputField).toString());
- double value2 = Double.parseDouble(event.getExtractedFields().get(lookupField).toString());
- if (value1 < value2) {
- acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
- }
- }
- return acc;
- }
-
- @Override
- public String functionName() {
- return "NUMBER_MAX";
- }
-
- @Override
- public Accumulator getResult(Accumulator acc) {
- return acc;
- }
-
- @Override
- public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
- if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
- firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
- }
- else if(firstAcc.getMetricsFields().containsKey(outputField) && !secondAcc.getMetricsFields().containsKey(outputField)){
- firstAcc.getMetricsFields().put(outputField, firstAcc.getMetricsFields().get(outputField));
- }
- else if(firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)){
- double value1 = Double.parseDouble(firstAcc.getMetricsFields().get(outputField).toString());
- double value2 = Double.parseDouble(secondAcc.getMetricsFields().get(outputField).toString());
- if (value1 < value2) {
- firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
- }
- }
- return firstAcc;
- }
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java
deleted file mode 100644
index 3af21ec..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright 2017 Hortonworks.
- * <p>
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package com.geedgenetworks.core.udf.udaf;
-
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.AggregateFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-
-/**
- * Collects elements within a group and returns the list of aggregated objects
- */
-public class NumberMin implements AggregateFunction {
-
- private String lookupField;
- private String outputField;
-
-
- @Override
- public void open(UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
- }
- this.lookupField = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
- this.outputField = udfContext.getOutput_fields().get(0);
- } else {
- outputField = lookupField;
- }
- }
-
- @Override
- public Accumulator initAccumulator(Accumulator acc) {
- return acc;
- }
-
- @Override
- public Accumulator add(Event event, Accumulator acc) {
- if (acc.getMetricsFields().get(outputField)==null && event.getExtractedFields().get(lookupField)!=null) {
- acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
- } else if (acc.getMetricsFields().get(outputField)!=null && event.getExtractedFields()!=null) {
- double value1 = Double.parseDouble(acc.getMetricsFields().get(outputField).toString());
- double value2 = Double.parseDouble(event.getExtractedFields().get(lookupField).toString());
- if (value1 > value2) {
- acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
- }
- }
- return acc;
- }
-
- @Override
- public String functionName() {
- return "NUMBER_MIN";
- }
-
- @Override
- public Accumulator getResult(Accumulator acc) {
- return acc;
- }
-
- @Override
- public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
- if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
- firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
- }
- else if(firstAcc.getMetricsFields().containsKey(outputField) && !secondAcc.getMetricsFields().containsKey(outputField)){
- firstAcc.getMetricsFields().put(outputField, firstAcc.getMetricsFields().get(outputField));
- }
- else if(firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)){
- double value1 = Double.parseDouble(firstAcc.getMetricsFields().get(outputField).toString());
- double value2 = Double.parseDouble(secondAcc.getMetricsFields().get(outputField).toString());
- if (value1 > value2) {
- firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
- }
- }
- return firstAcc;
- }
-}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java
new file mode 100644
index 0000000..67fec93
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java
@@ -0,0 +1,144 @@
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.Max;
+import com.geedgenetworks.core.udf.udaf.Min;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class MaxTest {
+
+ private Max maxFunction;
+ private Accumulator acc;
+ private Event event;
+
+ @BeforeEach
+ void setUp() {
+ maxFunction = new Max();
+ acc = new Accumulator();
+ event = new Event();
+
+ // 初始化上下文
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("value"));
+ udfContext.setOutput_fields(List.of("maxValue"));
+ maxFunction.open(udfContext);
+
+ // 初始化累加器的 metricsFields
+ acc.setMetricsFields(new HashMap<>());
+ }
+
+ @Test
+ void testAddMultipleNumericValues() {
+ // 设置事件中的多个数值
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("value", 100);
+ event.setExtractedFields(extractedFields);
+ Accumulator result = maxFunction.add(event, acc);
+
+ // 添加第二个数值
+ extractedFields.put("value", 200);
+ result = maxFunction.add(event, result);
+
+ // 添加第三个数值
+ extractedFields.put("value", 150);
+ result = maxFunction.add(event, result);
+
+ // 验证最大值应该是200
+ assertEquals(200, result.getMetricsFields().get("maxValue"));
+ }
+
+ @Test
+ void testAddMultipleStringValues() {
+ // 设置事件中的多个字符串值
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("value", "abc");
+ event.setExtractedFields(extractedFields);
+ Accumulator result = maxFunction.add(event, acc);
+
+ // 添加第二个字符串值
+ extractedFields.put("value", "def");
+ result = maxFunction.add(event, result);
+
+ // 添加第三个字符串值
+ extractedFields.put("value", "ghi");
+ result = maxFunction.add(event, result);
+
+ // 验证最大字符串值(按字母顺序最大的是 "ghi")
+ assertEquals("ghi", result.getMetricsFields().get("maxValue"));
+ }
+
+ @Test
+ void testAddMultipleLocalDateTimes() {
+ // 设置事件中的多个时间值
+ Map<String, Object> extractedFields = new HashMap<>();
+
+ extractedFields.put("value", LocalDateTime.of(2024, 10, 22, 12, 0));
+ event.setExtractedFields(extractedFields);
+ Accumulator result = maxFunction.add(event, acc);
+
+ // 添加第二个时间值
+ extractedFields.put("value", LocalDateTime.of(2024, 10, 23, 12, 0));
+ result = maxFunction.add(event, result);
+
+ // 添加第三个时间值
+ extractedFields.put("value", LocalDateTime.of(2024, 10, 21, 12, 0));
+ result = maxFunction.add(event, result);
+
+ // 验证最大时间值(最大的是2024年10月23日12点)
+ assertEquals(LocalDateTime.of(2024, 10, 23, 12, 0), result.getMetricsFields().get("maxValue"));
+ }
+
+ @Test
+ void testMergeAccumulatorsForNumber() {
+ Accumulator acc1 = new Accumulator();
+ Accumulator acc2 = new Accumulator();
+ Map map1 = new HashMap();
+ map1.put("maxValue", 10L);
+ Map map2 = new HashMap();
+ map2.put("maxValue", 5.0);
+ acc1.setMetricsFields(map1);
+ acc2.setMetricsFields(map2);
+ Accumulator mergedAcc = maxFunction.merge(acc1, acc2);
+ assertEquals(10L, mergedAcc.getMetricsFields().get("maxValue"));
+ }
+
+ @Test
+ void testMergeAccumulatorsForTime() {
+ Accumulator acc1 = new Accumulator();
+ Accumulator acc2 = new Accumulator();
+ Map map1 = new HashMap();
+ map1.put("maxValue", LocalDateTime.of(2023, 1, 1, 0, 0,1));
+ Map map2 = new HashMap();
+ map2.put("maxValue", LocalDateTime.of(2023, 1, 1, 0, 0,2));
+ acc1.setMetricsFields(map1);
+ acc2.setMetricsFields(map2);
+ Accumulator mergedAcc = maxFunction.merge(acc1, acc2);
+ assertEquals(LocalDateTime.of(2023, 1, 1, 0, 0,2), mergedAcc.getMetricsFields().get("maxValue"));
+ }
+
+ @Test
+ void testMergeAccumulatorsForString() {
+ Accumulator acc1 = new Accumulator();
+ Accumulator acc2 = new Accumulator();
+ Map map1 = new HashMap();
+ map1.put("maxValue", "qwe");
+ Map map2 = new HashMap();
+ map2.put("maxValue", "abc");
+ acc1.setMetricsFields(map1);
+ acc2.setMetricsFields(map2);
+ Accumulator mergedAcc = maxFunction.merge(acc1, acc2);
+ assertEquals("qwe", mergedAcc.getMetricsFields().get("maxValue"));
+ }
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java
new file mode 100644
index 0000000..ee63137
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java
@@ -0,0 +1,123 @@
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.Min;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MinTest {
+
+ private Min minFunction;
+ private UDFContext udfContext;
+ private Accumulator acc;
+ private Event event1, event2, event3;
+
+ @BeforeEach
+ void setUp() {
+ minFunction = new Min();
+ udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("value"));
+ udfContext.setOutput_fields(List.of("minValue"));
+ minFunction.open(udfContext);
+
+ acc = new Accumulator();
+ acc.setMetricsFields(new HashMap<>());
+
+ event1 = new Event();
+ event2 = new Event();
+ event3 = new Event();
+ }
+
+ @Test
+ void testAddNumericValue() {
+ event1.setExtractedFields(Map.of("value", 5.0));
+ event2.setExtractedFields(Map.of("value", 3.0));
+ event3.setExtractedFields(Map.of("value", 10.0));
+
+ acc = minFunction.add(event1, acc);
+ acc = minFunction.add(event2, acc);
+ acc = minFunction.add(event3, acc);
+
+ assertEquals(3.0, acc.getMetricsFields().get("minValue"));
+ }
+
+ @Test
+ void testAddStringValue() {
+ event1.setExtractedFields(Map.of("value", "apple"));
+ event2.setExtractedFields(Map.of("value", "banana"));
+ event3.setExtractedFields(Map.of("value", "cherry"));
+
+ acc = minFunction.add(event1, acc);
+ acc = minFunction.add(event2, acc);
+ acc = minFunction.add(event3, acc);
+
+ assertEquals("apple", acc.getMetricsFields().get("minValue"));
+ }
+
+ @Test
+ void testAddLocalDateTime() {
+ event1.setExtractedFields(Map.of("value", LocalDateTime.of(2023, 1, 1, 0, 0)));
+ event2.setExtractedFields(Map.of("value", LocalDateTime.of(2022, 1, 1, 0, 0)));
+ event3.setExtractedFields(Map.of("value", LocalDateTime.of(2024, 1, 1, 0, 0)));
+
+ acc = minFunction.add(event1, acc);
+ acc = minFunction.add(event2, acc);
+ acc = minFunction.add(event3, acc);
+
+ assertEquals(LocalDateTime.of(2022, 1, 1, 0, 0), acc.getMetricsFields().get("minValue"));
+ }
+
+ @Test
+ void testMergeAccumulatorsForNumber() {
+ Accumulator acc1 = new Accumulator();
+ Accumulator acc2 = new Accumulator();
+ Map map1 = new HashMap();
+ map1.put("minValue", 10L);
+ Map map2 = new HashMap();
+ map2.put("minValue", 5.0);
+ acc1.setMetricsFields(map1);
+ acc2.setMetricsFields(map2);
+ Accumulator mergedAcc = minFunction.merge(acc1, acc2);
+ assertEquals(5.0, mergedAcc.getMetricsFields().get("minValue"));
+ }
+
+ @Test
+ void testMergeAccumulatorsForTime() {
+ Accumulator acc1 = new Accumulator();
+ Accumulator acc2 = new Accumulator();
+ Map map1 = new HashMap();
+ map1.put("minValue", LocalDateTime.of(2023, 1, 1, 0, 0,1));
+ Map map2 = new HashMap();
+ map2.put("minValue", LocalDateTime.of(2023, 1, 1, 0, 0,2));
+ acc1.setMetricsFields(map1);
+ acc2.setMetricsFields(map2);
+ Accumulator mergedAcc = minFunction.merge(acc1, acc2);
+ assertEquals(LocalDateTime.of(2023, 1, 1, 0, 0,1), mergedAcc.getMetricsFields().get("minValue"));
+ }
+
+ @Test
+ void testMergeAccumulatorsForString() {
+ Accumulator acc1 = new Accumulator();
+ Accumulator acc2 = new Accumulator();
+ Map map1 = new HashMap();
+ map1.put("minValue", "qwe");
+ Map map2 = new HashMap();
+ map2.put("minValue", "abc");
+ acc1.setMetricsFields(map1);
+ acc2.setMetricsFields(map2);
+ Accumulator mergedAcc = minFunction.merge(acc1, acc2);
+ assertEquals("abc", mergedAcc.getMetricsFields().get("minValue"));
+ }
+
+
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java
deleted file mode 100644
index d7715c0..0000000
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package com.geedgenetworks.core.udf.test.aggregate;
-
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.udf.udaf.NumberMax;
-import com.geedgenetworks.core.udf.udaf.NumberSum;
-import com.ibm.icu.text.NumberFormat;
-import org.junit.jupiter.api.Test;
-
-import java.text.ParseException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class NumberMaxTest {
-
- @Test
- public void test() throws ParseException {
-
- Integer[] intArr = new Integer[]{1, 2, 3, 4};
- Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
- Float[] floatArr = new Float[]{1.0f, 2.0f, 3.0f, 4.0f};
- Double[] doubleArr = new Double[]{1.0, 2.0, 3.0, 4.0};
- excute(floatArr, Float.class);
- excute(doubleArr, Double.class);
- excute(intArr, Integer.class);
- excute(longArr, Long.class);
- testMerge(intArr,floatArr);
-
- }
- private void testMerge(Number[] arr,Number[] arr2) {
-
- UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_max"));
- NumberMax numberMax = new NumberMax();
- Map<String, Object> metricsFields = new HashMap<>();
- Accumulator accumulator = new Accumulator();
- accumulator.setMetricsFields(metricsFields);
- numberMax.open(udfContext);
- Accumulator result1 = getMiddleResult(udfContext,arr);
- Accumulator result2 = getMiddleResult(udfContext,arr2);
- Accumulator result = numberMax.getResult(numberMax.merge(result1,result2));
- assertEquals(Float.parseFloat((result.getMetricsFields().get("field_max").toString())),4);
-
- }
- private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) {
-
-
- NumberMax numberMax = new NumberMax();
- Map<String, Object> metricsFields = new HashMap<>();
- Accumulator accumulator = new Accumulator();
- accumulator.setMetricsFields(metricsFields);
- numberMax.open(udfContext);
- Accumulator agg = numberMax.initAccumulator(accumulator);
- for (Number o : arr) {
- Event event = new Event();
- Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("field", o);
- event.setExtractedFields(extractedFields);
- agg = numberMax.add(event, agg);
-
- }
- return agg;
- }
- private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
-
- UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_max"));
- NumberMax numberMax = new NumberMax();
- Map<String, Object> metricsFields = new HashMap<>();
- Accumulator accumulator = new Accumulator();
- accumulator.setMetricsFields(metricsFields);
- numberMax.open(udfContext);
- Accumulator agg = numberMax.initAccumulator(accumulator);
- for (Number o : arr) {
- Event event = new Event();
- Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("field", o);
- event.setExtractedFields(extractedFields);
- agg = numberMax.add(event, agg);
-
- }
- Accumulator result = numberMax.getResult(agg);
- assertEquals(clazz, result.getMetricsFields().get("field_max").getClass());
- assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_max").toString())),NumberFormat.getInstance().parse("4"));
- }
-
-
-
-
-} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java
deleted file mode 100644
index ccbb994..0000000
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Copyright 2017 Hortonworks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package com.geedgenetworks.core.udf.test.aggregate;
-
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.udf.udaf.NumberMax;
-import com.geedgenetworks.core.udf.udaf.NumberMin;
-import com.ibm.icu.text.NumberFormat;
-import org.junit.jupiter.api.Test;
-
-import java.text.ParseException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class NumberMinTest {
-
- @Test
- public void test() throws ParseException {
-
- Integer[] intArr = new Integer[]{1, 2, 3, 4};
- Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
- Float[] floatArr = new Float[]{1.0f, 2.0f, 3.0f, 4.0f};
- Double[] doubleArr = new Double[]{1.0, 2.0, 3.0, 4.0};
- excute(floatArr, Float.class);
- excute(doubleArr, Double.class);
- excute(intArr, Integer.class);
- excute(longArr, Long.class);
- testMerge(intArr,floatArr);
-
- }
- private void testMerge(Number[] arr,Number[] arr2) {
-
- UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_min"));
- NumberMin numberMin = new NumberMin();
- Map<String, Object> metricsFields = new HashMap<>();
- Accumulator accumulator = new Accumulator();
- accumulator.setMetricsFields(metricsFields);
- numberMin.open(udfContext);
- Accumulator result1 = getMiddleResult(udfContext,arr);
- Accumulator result2 = getMiddleResult(udfContext,arr2);
- Accumulator result = numberMin.getResult(numberMin.merge(result1,result2));
- assertEquals(Float.parseFloat((result.getMetricsFields().get("field_min").toString())),1);
-
- }
- private Accumulator getMiddleResult(UDFContext udfContext,Number[] arr) {
-
-
- NumberMin numberMin = new NumberMin();
- Map<String, Object> metricsFields = new HashMap<>();
- Accumulator accumulator = new Accumulator();
- accumulator.setMetricsFields(metricsFields);
- numberMin.open(udfContext);
- Accumulator agg = numberMin.initAccumulator(accumulator);
- for (Number o : arr) {
- Event event = new Event();
- Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("field", o);
- event.setExtractedFields(extractedFields);
- agg = numberMin.add(event, agg);
-
- }
- return agg;
- }
- private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
-
- UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_min"));
- NumberMin numberMin = new NumberMin();
- Map<String, Object> metricsFields = new HashMap<>();
- Accumulator accumulator = new Accumulator();
- accumulator.setMetricsFields(metricsFields);
- numberMin.open(udfContext);
- Accumulator agg = numberMin.initAccumulator(accumulator);
- for (Number o : arr) {
- Event event = new Event();
- Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("field", o);
- event.setExtractedFields(extractedFields);
- agg = numberMin.add(event, agg);
-
- }
- Accumulator result = numberMin.getResult(agg);
- assertEquals(clazz, result.getMetricsFields().get("field_min").getClass());
- assertEquals(NumberFormat.getInstance().parse((result.getMetricsFields().get("field_min").toString())),NumberFormat.getInstance().parse("1"));
- }
-
-
-
-
-} \ No newline at end of file