author     wangkuan <[email protected]>  2024-11-26 18:13:48 +0800
committer  wangkuan <[email protected]>  2024-11-26 18:13:48 +0800
commit     f2989ca998a3d164e53221625aa74c61585c0efa (patch)
tree       a1cb40cd415eea9e14dc547a8df78c0704e707f3 /groot-core
parent     88d73bd9313e16d9738b84533f9ad17255e60b65 (diff)
[feature][core] CN-1730: Roll back CollectList and CollectSet, add the new array aggregation function ArrayContactAgg plus related unit tests
Diffstat (limited to 'groot-core')
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java                     20
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java                      26
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java          106
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java  195
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java       18
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java        40
6 files changed, 350 insertions(+), 55 deletions(-)
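
For reviewers, a minimal usage sketch of the new aggregate, mirroring the setup exercised in ArrayContactAggTest further down. The field names "ips"/"ips_agg" and the standalone wrapper class with a main method are illustrative assumptions; the UDFContext, Accumulator and Event calls and the "mode"/"max_size" parameters come straight from this diff.

import com.geedgenetworks.api.common.udf.UDFContext;
import com.geedgenetworks.api.event.Event;
import com.geedgenetworks.common.config.Accumulator;
import com.geedgenetworks.core.udf.udaf.array.ArrayContactAgg;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ArrayContactAggUsageSketch {
    public static void main(String[] args) {
        // Configure the UDF: one lookup field, one output field.
        // "all" keeps every element in arrival order; "distinct" would deduplicate into a Set.
        UDFContext ctx = new UDFContext();
        ctx.setLookupFields(Collections.singletonList("ips"));
        ctx.setOutputFields(Collections.singletonList("ips_agg"));
        Map<String, Object> params = new HashMap<>();
        params.put("mode", "all");
        params.put("max_size", 1000L); // upper bound applied in getResult
        ctx.setParameters(params);

        ArrayContactAgg agg = new ArrayContactAgg();
        agg.open(ctx);

        Accumulator acc = new Accumulator();
        acc.setMetricsFields(new HashMap<>());
        agg.initAccumulator(acc);

        // Each event contributes an array; the aggregate concatenates them.
        for (List<String> array : List.of(
                List.of("192.168.1.1", "192.168.1.2"),
                List.of("192.168.1.3"))) {
            Event event = new Event();
            event.setExtractedFields(Map.of("ips", array));
            agg.add(event, acc);
        }

        Accumulator result = agg.getResult(acc);
        System.out.println(result.getMetricsFields().get("ips_agg"));
        // -> [192.168.1.1, 192.168.1.2, 192.168.1.3]
    }
}
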
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
index bd6b76a..26a4087 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -7,7 +7,8 @@ import com.geedgenetworks.common.config.Accumulator;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
/**
* Collects elements within a group and returns the list of aggregated objects
@@ -16,7 +17,6 @@ public class CollectList implements AggregateFunction {
private String lookupField;
private String outputField;
- private String collectType;
@Override
public void open(UDFContext udfContext) {
@@ -28,9 +28,7 @@ public class CollectList implements AggregateFunction {
this.outputField = udfContext.getOutputFields() == null || udfContext.getOutputFields().isEmpty()
? lookupField
: udfContext.getOutputFields().get(0);
- this.collectType = udfContext.getParameters() == null
- ? "object"
- : udfContext.getParameters().getOrDefault("collect_type", "object").toString();
+
}
@Override
@@ -45,16 +43,7 @@ public class CollectList implements AggregateFunction {
if (valueObj == null) {
return acc;
}
- if (collectType.equals("array")) {
- if (valueObj instanceof List<?>) {
- List<?> valueList = (List<?>) valueObj;
- List<Object> aggregateList = getOrInitList(acc);
- aggregateList.addAll(valueList);
- }
- } else {
- getOrInitList(acc).add(valueObj);
- }
-
+ getOrInitList(acc).add(valueObj);
return acc;
}
@@ -92,5 +81,4 @@ public class CollectList implements AggregateFunction {
}
-
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
index ddca8d1..5397a19 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
@@ -1,15 +1,15 @@
package com.geedgenetworks.core.udf.udaf;
-import com.geedgenetworks.common.config.Accumulator;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.api.common.udf.AggregateFunction;
import com.geedgenetworks.api.common.udf.UDFContext;
import com.geedgenetworks.api.event.Event;
+import com.geedgenetworks.common.config.Accumulator;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-
-import java.util.*;
+import java.util.HashSet;
+import java.util.Set;
/**
* Collects elements within a group and returns the set of aggregated objects
@@ -18,7 +18,6 @@ public class CollectSet implements AggregateFunction {
private String lookupField;
private String outputField;
- private String collectType;
@Override
public void open(UDFContext udfContext) {
@@ -30,9 +29,6 @@ public class CollectSet implements AggregateFunction {
this.outputField = udfContext.getOutputFields() == null || udfContext.getOutputFields().isEmpty()
? lookupField
: udfContext.getOutputFields().get(0);
- this.collectType = udfContext.getParameters() == null
- ? "object"
- : udfContext.getParameters().getOrDefault("collect_type", "object").toString();
}
@Override
@@ -47,17 +43,7 @@ public class CollectSet implements AggregateFunction {
if (valueObj == null) {
return acc;
}
-
- if (collectType.equals("array")) {
- if (valueObj instanceof List<?>) {
- List<?> valueList = (List<?>) valueObj;
- Set<Object> aggregateSet = getOrInitSet(acc);
- aggregateSet.addAll(valueList);
- }
- } else {
- getOrInitSet(acc).add(valueObj);
- }
-
+ getOrInitSet(acc).add(valueObj);
return acc;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java
new file mode 100644
index 0000000..04e8cce
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java
@@ -0,0 +1,106 @@
+package com.geedgenetworks.core.udf.udaf.array;
+
+
+import com.geedgenetworks.api.common.udf.AggregateFunction;
+import com.geedgenetworks.api.common.udf.UDFContext;
+import com.geedgenetworks.api.event.Event;
+import com.geedgenetworks.common.config.Accumulator;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+
+import java.util.*;
+
+/**
+ * Aggregates elements within a group and returns a collection based on mode (list or set).
+ */
+public class ArrayContactAgg implements AggregateFunction {
+
+ private String lookupField;
+ private String outputField;
+ private String mode;
+ private long maxSize;
+
+ @Override
+ public void open(UDFContext udfContext) {
+ // Validate input parameters
+ if (udfContext.getLookupFields() == null || udfContext.getLookupFields().isEmpty() || udfContext.getParameters() == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required lookup field parameter");
+ }
+ this.lookupField = udfContext.getLookupFields().get(0);
+ this.outputField = (udfContext.getOutputFields() == null || udfContext.getOutputFields().isEmpty())
+ ? lookupField : udfContext.getOutputFields().get(0);
+ this.mode = (String) udfContext.getParameters().getOrDefault("mode", "all");
+ this.maxSize = Long.parseLong(udfContext.getParameters().getOrDefault("max_size", 1000000L).toString());
+ }
+
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ acc.getMetricsFields().put(outputField, createLimitedCollection());
+ return acc;
+ }
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ Object valueObj = event.getExtractedFields().get(lookupField);
+ if (valueObj instanceof List<?>) {
+ Collection<Object> collection = getOrInitCollection(acc);
+ collection.addAll((List<?>) valueObj);
+ acc.getMetricsFields().put(outputField, collection);
+ }
+ return acc;
+ }
+
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ Collection<Object> firstCollection = getOrInitCollection(firstAcc);
+ Collection<Object> secondCollection = getOrInitCollection(secondAcc);
+
+ if (firstCollection != null && secondCollection != null) {
+ firstCollection.addAll(secondCollection);
+ } else if (firstCollection == null) {
+ firstAcc.getMetricsFields().put(outputField, secondCollection);
+ }
+
+ return firstAcc;
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ Collection<Object> collection = getOrInitCollection(acc);
+ // Ensure the result does not exceed maxSize
+ if (collection instanceof List) {
+ // Limit the size of List
+ collection = limitListSize((List<Object>) collection);
+ } else if (collection instanceof Set) {
+ // Limit the size of Set (converted to a list for limiting)
+ collection = limitSetSize((Set<Object>) collection);
+ }
+ acc.getMetricsFields().put(outputField, collection);
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "ARRAY_CONCAT_AGG";
+ }
+
+ @SuppressWarnings("unchecked")
+ private Collection<Object> getOrInitCollection(Accumulator acc) {
+ return (Collection<Object>) acc.getMetricsFields().computeIfAbsent(outputField, k -> createLimitedCollection());
+ }
+
+ private Collection<Object> createLimitedCollection() {
+ return "all".equals(mode) ? new LinkedList<>() : new HashSet<>();
+ }
+ private List<Object> limitListSize(List<Object> list) {
+ return list.size() > maxSize ? list.subList(0, (int) maxSize) : list;
+ }
+
+ private Set<Object> limitSetSize(Set<Object> set) {
+ List<Object> list = new ArrayList<>(set);
+ list = list.size() > maxSize ? list.subList(0, (int) maxSize) : list;
+ return new HashSet<>(list);
+ }
+}
+
+
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java
new file mode 100644
index 0000000..8eec775
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java
@@ -0,0 +1,195 @@
+package com.geedgenetworks.core.udf.test.aggregate;
+import com.geedgenetworks.api.common.udf.UDFContext;
+import com.geedgenetworks.api.event.Event;
+import com.geedgenetworks.common.config.Accumulator;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.core.udf.udaf.array.ArrayContactAgg;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class ArrayContactAggTest {
+
+ private ArrayContactAgg aggFunction;
+ private UDFContext udfContext;
+ private Accumulator accumulator;
+
+ @BeforeEach
+ void setUp() {
+ aggFunction = new ArrayContactAgg();
+ udfContext = new UDFContext();
+ udfContext.setLookupFields(Collections.singletonList("field1"));
+ udfContext.setParameters(new HashMap<>());
+ udfContext.setOutputFields(Collections.singletonList("outputField"));
+ udfContext.getParameters().put("max_size", 10L);
+ accumulator = new Accumulator();
+ accumulator.setMetricsFields(new HashMap<>());
+ }
+
+ @Test
+ void testOpenWithValidParameters() {
+ udfContext.getParameters().put("mode", "all");
+ udfContext.getParameters().put("max_size", 10L);
+
+ assertDoesNotThrow(() -> aggFunction.open(udfContext));
+ }
+
+ @Test
+ void testOpenWithInvalidParameters() {
+ UDFContext invalidContext = new UDFContext();
+ assertThrows(GrootStreamRuntimeException.class, () -> aggFunction.open(invalidContext));
+
+ invalidContext.setLookupFields(Collections.emptyList());
+ assertThrows(GrootStreamRuntimeException.class, () -> aggFunction.open(invalidContext));
+ }
+
+ @Test
+ void testInitAccumulatorWithNullOutputField() {
+ udfContext.setOutputFields(null);
+ udfContext.getParameters().put("mode", "all");
+ aggFunction.open(udfContext);
+
+ Accumulator acc = aggFunction.initAccumulator(accumulator);
+ assertTrue(acc.getMetricsFields().containsKey("field1"));
+ }
+
+ @Test
+ void testAddWithNullValue() {
+ udfContext.getParameters().put("mode", "all");
+ aggFunction.open(udfContext);
+ aggFunction.initAccumulator(accumulator);
+
+ Event event = new Event();
+
+ Map map = new HashMap();
+ map.put("field1", null);
+ event.setExtractedFields(map);
+
+ aggFunction.add(event, accumulator);
+ List<?> list = (List<?>) accumulator.getMetricsFields().get("outputField");
+ assertTrue(list.isEmpty());
+ }
+
+ @Test
+ void testAddWithMultipleValuesAndDuplicates() {
+ udfContext.getParameters().put("mode", "all");
+ aggFunction.open(udfContext);
+ aggFunction.initAccumulator(accumulator);
+
+ Event event1 = new Event();
+ event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value2")));
+ aggFunction.add(event1, accumulator);
+
+ Event event2 = new Event();
+ event2.setExtractedFields(Map.of("field1", List.of("value3", "value2")));
+ aggFunction.add(event2, accumulator);
+
+ List<?> list = (List<?>) accumulator.getMetricsFields().get("outputField");
+
+ assertEquals(5, list.size());
+ assertEquals(Arrays.asList("value1", "value2", "value2", "value3", "value2"), list);
+ }
+
+ @Test
+ void testAddWithDistinctModeAndDuplicates() {
+ udfContext.getParameters().put("mode", "distinct");
+ aggFunction.open(udfContext);
+ aggFunction.initAccumulator(accumulator);
+
+ Event event1 = new Event();
+ event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value2")));
+ aggFunction.add(event1, accumulator);
+
+ Event event2 = new Event();
+ event2.setExtractedFields(Map.of("field1", List.of("value3", "value2")));
+ aggFunction.add(event2, accumulator);
+
+ Set<?> set = (Set<?>) accumulator.getMetricsFields().get("outputField");
+
+ assertEquals(3, set.size());
+ assertTrue(set.containsAll(Set.of("value1", "value2", "value3")));
+ }
+
+ @Test
+ void testMergeAccumulatorsWithAllMode() {
+ udfContext.getParameters().put("mode", "all");
+ aggFunction.open(udfContext);
+
+ Accumulator acc1 = aggFunction.initAccumulator(accumulator);
+ Accumulator acc2 = new Accumulator();
+ acc2.setMetricsFields(new HashMap<>());
+ aggFunction.initAccumulator(acc2);
+
+ List<String> list1 = (List<String>) acc1.getMetricsFields().get("outputField");
+ List<String> list2 = (List<String>)acc2.getMetricsFields().get("outputField");
+
+ list1.addAll(Arrays.asList("value1", "value2"));
+ list2.addAll(Arrays.asList("value3", "value4"));
+
+ aggFunction.merge(acc1, acc2);
+
+ assertEquals(Arrays.asList("value1", "value2", "value3", "value4"), list1);
+ }
+
+ @Test
+ void testMergeAccumulatorsWithDistinctMode() {
+ udfContext.getParameters().put("mode", "distinct");
+ udfContext.getParameters().put("max_size", 10L);
+
+ aggFunction.open(udfContext);
+
+ Accumulator acc1 = aggFunction.initAccumulator(accumulator);
+ Accumulator acc2 = new Accumulator();
+ acc2.setMetricsFields(new HashMap<>());
+ aggFunction.initAccumulator(acc2);
+
+ Set<String> set1 = (Set<String>) acc1.getMetricsFields().get("outputField");
+ Set<String> set2 = (Set<String>) acc2.getMetricsFields().get("outputField");
+
+ set1.addAll(Set.of("value1", "value2"));
+ set2.addAll(Set.of("value2", "value3"));
+
+ aggFunction.merge(acc1, acc2);
+
+ assertEquals(3, set1.size());
+ assertTrue(set1.containsAll(Set.of("value1", "value2", "value3")));
+ }
+
+ @Test
+ void testAddWithMaxSizeExceeded() {
+ udfContext.getParameters().put("mode", "all");
+ udfContext.getParameters().put("max_size", 3L);
+ aggFunction.open(udfContext);
+ aggFunction.initAccumulator(accumulator);
+
+ Event event1 = new Event();
+ event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value3", "value4")));
+ aggFunction.add(event1, accumulator);
+ aggFunction.getResult(accumulator);
+ List<?> list = (List<?>) accumulator.getMetricsFields().get("outputField");
+
+ assertEquals(3, list.size());
+ assertEquals(Arrays.asList("value1", "value2", "value3"), list); // 超出限制后拒绝添加
+ }
+
+ @Test
+ void testAddWithEmptyInputField() {
+ udfContext.getParameters().put("mode", "all");
+ aggFunction.open(udfContext);
+ aggFunction.initAccumulator(accumulator);
+
+ Event event = new Event();
+ event.setExtractedFields(Map.of()); // Missing "field1"
+
+ aggFunction.add(event, accumulator);
+ List<?> list = (List<?>) accumulator.getMetricsFields().get("outputField");
+
+ assertTrue(list.isEmpty());
+ }
+}
+
+
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
index a3a0487..31af426 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
@@ -17,7 +17,7 @@ public class CollectListTest {
public void testObjectType() {
// Test the default "object" type
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.1");
- UDFContext udfContext = createUDFContext("object");
+ UDFContext udfContext = createUDFContext();
CollectList collectList = new CollectList();
collectList.open(udfContext);
@@ -37,9 +37,10 @@ public class CollectListTest {
// 测试 "array" 类型
List<List<String>> arrays = List.of(
List.of("192.168.1.1", "192.168.1.2"),
+ List.of("192.168.1.1", "192.168.1.2"),
List.of("192.168.1.3", "192.168.1.4")
);
- UDFContext udfContext = createUDFContext("array");
+ UDFContext udfContext = createUDFContext();
CollectList collectList = new CollectList();
collectList.open(udfContext);
@@ -50,17 +51,16 @@ public class CollectListTest {
Accumulator result = collectList.getResult(accumulator);
List<List<String>> aggregated = (List<List<String>>) result.getMetricsFields().get("field_list");
- assertEquals(aggregated.size(), 4);
- assertEquals("192.168.1.1", aggregated.get(0));
+ assertEquals(aggregated.size(), 3);
}
@Test
public void testMerge() {
// Test the merge logic
List<String> arr1 = List.of("192.168.1.1", "192.168.1.2");
- List<String> arr2 = List.of("192.168.1.3", "192.168.1.4");
+ List<String> arr2 = List.of("192.168.1.1", "192.168.1.4");
- UDFContext udfContext = createUDFContext("object");
+ UDFContext udfContext = createUDFContext();
CollectList collectList = new CollectList();
collectList.open(udfContext);
@@ -71,13 +71,12 @@ public class CollectListTest {
List<String> aggregated = (List<String>) merged.getMetricsFields().get("field_list");
assertEquals(aggregated.size(), 4);
- assertEquals("192.168.1.4", aggregated.get(3));
}
@Test
public void testEmptyInput() {
// Test empty input
- UDFContext udfContext = createUDFContext("object");
+ UDFContext udfContext = createUDFContext();
CollectList collectList = new CollectList();
collectList.open(udfContext);
@@ -89,12 +88,11 @@ public class CollectListTest {
}
- private UDFContext createUDFContext(String collectType) {
+ private UDFContext createUDFContext() {
UDFContext udfContext = new UDFContext();
udfContext.setLookupFields(List.of("field"));
udfContext.setOutputFields(Collections.singletonList("field_list"));
Map<String, Object> parameters = new HashMap<>();
- parameters.put("collect_type", collectType);
udfContext.setParameters(parameters);
return udfContext;
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
index 694e482..08681b6 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.core.udf.test.aggregate;
import com.geedgenetworks.common.config.Accumulator;
+import com.geedgenetworks.core.udf.udaf.CollectList;
import com.geedgenetworks.core.udf.udaf.CollectSet;
import com.geedgenetworks.api.common.udf.UDFContext;
import com.geedgenetworks.api.event.Event;
@@ -15,8 +16,8 @@ public class CollectSetTest {
@Test
public void testObjectType() {
// 测试 "object" 类型,验证去重
- List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.1");
- UDFContext udfContext = createUDFContext("object");
+ List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3");
+ UDFContext udfContext = createUDFContext();
CollectSet collectSet = new CollectSet();
collectSet.open(udfContext);
@@ -27,15 +28,37 @@ public class CollectSetTest {
Accumulator result = collectSet.getResult(accumulator);
Set<String> aggregated = (Set<String>) result.getMetricsFields().get("field_list");
- assertEquals(aggregated.size(), 2);
+ assertEquals(aggregated.size(), 3);
assertTrue(aggregated.contains("192.168.1.1"));
assertTrue(aggregated.contains("192.168.1.2"));
+ assertTrue(aggregated.contains("192.168.1.3"));
+
}
+ @Test
+ public void testArrayType() {
+ // Test the "array" type
+ List<List<String>> arrays = List.of(
+ List.of("192.168.1.1", "192.168.1.2"),
+ List.of("192.168.1.1", "192.168.1.2"),
+ List.of("192.168.1.3", "192.168.1.4")
+ );
+ UDFContext udfContext = createUDFContext();
+ CollectSet collectSet = new CollectSet();
+ collectSet.open(udfContext);
+ Accumulator accumulator = initializeAccumulator(collectSet, udfContext);
+ for (List<String> array : arrays) {
+ accumulator = addEvent(collectSet, udfContext, accumulator, "field", array);
+ }
+
+ Accumulator result = collectSet.getResult(accumulator);
+ Set<List<String>> aggregated = (Set<List<String>>) result.getMetricsFields().get("field_list");
+ assertEquals(aggregated.size(), 2);
+ }
@Test
public void testEmptyInput() {
// Test empty input
- UDFContext udfContext = createUDFContext("object");
+ UDFContext udfContext = createUDFContext();
CollectSet collectSet = new CollectSet();
collectSet.open(udfContext);
@@ -53,7 +76,7 @@ public class CollectSetTest {
List<String> arr1 = List.of("192.168.1.1", "192.168.1.2");
List<String> arr2 = List.of("192.168.1.3", "192.168.1.1");
- UDFContext udfContext = createUDFContext("object");
+ UDFContext udfContext = createUDFContext();
CollectSet collectSet = new CollectSet();
collectSet.open(udfContext);
@@ -70,7 +93,7 @@ public class CollectSetTest {
@Test
public void testNullValues() {
// Test the case where the field is null
- UDFContext udfContext = createUDFContext("object");
+ UDFContext udfContext = createUDFContext();
CollectSet collectSet = new CollectSet();
collectSet.open(udfContext);
@@ -89,7 +112,7 @@ public class CollectSetTest {
public void testDuplicateValues() {
// Test that duplicate values are correctly deduplicated
List<String> arr = List.of("192.168.1.1", "192.168.1.1", "192.168.1.1");
- UDFContext udfContext = createUDFContext("object");
+ UDFContext udfContext = createUDFContext();
CollectSet collectSet = new CollectSet();
collectSet.open(udfContext);
@@ -102,12 +125,11 @@ public class CollectSetTest {
assertTrue(aggregated.contains("192.168.1.1"));
}
- private UDFContext createUDFContext(String collectType) {
+ private UDFContext createUDFContext() {
UDFContext udfContext = new UDFContext();
udfContext.setLookupFields(List.of("field"));
udfContext.setOutputFields(Collections.singletonList("field_list"));
Map<String, Object> parameters = new HashMap<>();
- parameters.put("collect_type", collectType);
udfContext.setParameters(parameters);
return udfContext;
}
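
A companion sketch of CollectSet after the rollback: with collect_type removed, each looked-up value, including a whole List, is added as a single element rather than being flattened, which is what the new testArrayType above asserts (three input arrays, one duplicate, aggregated size 2). The standalone wrapper class and the direct initAccumulator/add calls are illustrative assumptions; the field names and CollectSet API calls are those shown in this diff.

import com.geedgenetworks.api.common.udf.UDFContext;
import com.geedgenetworks.api.event.Event;
import com.geedgenetworks.common.config.Accumulator;
import com.geedgenetworks.core.udf.udaf.CollectSet;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CollectSetRollbackSketch {
    public static void main(String[] args) {
        // No collect_type parameter anymore; CollectSet always adds the value as-is.
        UDFContext ctx = new UDFContext();
        ctx.setLookupFields(List.of("field"));
        ctx.setOutputFields(Collections.singletonList("field_list"));
        ctx.setParameters(new HashMap<>());

        CollectSet collectSet = new CollectSet();
        collectSet.open(ctx);

        Accumulator acc = new Accumulator();
        acc.setMetricsFields(new HashMap<>());
        collectSet.initAccumulator(acc);

        // Two identical arrays and one distinct array: the set keeps two elements,
        // because each List is deduplicated as a whole, not element by element.
        for (List<String> array : List.of(
                List.of("192.168.1.1", "192.168.1.2"),
                List.of("192.168.1.1", "192.168.1.2"),
                List.of("192.168.1.3", "192.168.1.4"))) {
            Event event = new Event();
            event.setExtractedFields(Map.of("field", array));
            collectSet.add(event, acc);
        }

        Accumulator result = collectSet.getResult(acc);
        System.out.println(result.getMetricsFields().get("field_list")); // two List elements
    }
}
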