| author | doufenghu <[email protected]> | 2024-11-26 18:30:52 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-11-26 18:30:52 +0800 |
| commit | d445302cea095c538d30a8b11d134845cd671bac (patch) | |
| tree | a5eff682e8541e9c4fd39a2c95c9b7d8959ff9f0 | |
| parent | 5d14e59f1a590996a3014a3fff9a99444a3fd4bd (diff) | |
| parent | 56c4a9a7e1619b01662db092934019a83dbf2168 (diff) | |
Merge branch 'develop' of https://git.mesalab.cn/galaxy/platform/groot-stream into develop
10 files changed, 358 insertions, 60 deletions
diff --git a/config/udf.plugins b/config/udf.plugins
index 3d6a353..6a3f245 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -36,4 +36,5 @@ com.geedgenetworks.core.udf.udtf.JsonUnroll
 com.geedgenetworks.core.udf.udtf.Unroll
 com.geedgenetworks.core.udf.udtf.PathUnroll
 com.geedgenetworks.core.udf.udaf.Max
-com.geedgenetworks.core.udf.udaf.Min
\ No newline at end of file
+com.geedgenetworks.core.udf.udaf.Min
+com.geedgenetworks.core.udf.udaf.array.ArrayContactAgg
\ No newline at end of file
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java
index 5945e51..779a8a5 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java
@@ -73,7 +73,7 @@ public class JobAggTest {
         Set<String> set = (Set<String>) CollectSink.values.get(1).getExtractedFields().get("server_ip_set");
         Set<String> set2 = (Set<String>) CollectSink.values.get(1).getExtractedFields().get("client_ips_set");
         Assert.assertEquals(1, set.size());
-        Assert.assertEquals(3, set2.size());
+        Assert.assertEquals(2, set2.size());
         Assert.assertEquals(2, list.size());
         Assert.assertEquals("2", CollectSink.values.get(1).getExtractedFields().get("count").toString());
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml
index 1ccaf3d..2633f6b 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml
@@ -51,11 +51,12 @@ postprocessing_pipelines:
       - function: LAST_VALUE
         lookup_fields: [ log_id ]
        output_fields: [ log_id_last ]
-      - function: COLLECT_SET
+      - function: ARRAY_CONCAT_AGG
         lookup_fields: [ client_ips ]
         output_fields: [ client_ips_set ]
         parameters:
-          collect_type: array
+          mode: distinct
+          max_size: 2
 application: # [object] Application Configuration
   env: # [object] Environment Variables
     name: groot-stream-job # [string] Job Name
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 3d6a353..6a3f245 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -36,4 +36,5 @@ com.geedgenetworks.core.udf.udtf.JsonUnroll
 com.geedgenetworks.core.udf.udtf.Unroll
 com.geedgenetworks.core.udf.udtf.PathUnroll
 com.geedgenetworks.core.udf.udaf.Max
-com.geedgenetworks.core.udf.udaf.Min
\ No newline at end of file
+com.geedgenetworks.core.udf.udaf.Min
+com.geedgenetworks.core.udf.udaf.array.ArrayContactAgg
\ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
index bd6b76a..26a4087 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -7,7 +7,8 @@
 import com.geedgenetworks.common.config.Accumulator;
 import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Collects elements within a group and returns the list of aggregated objects
@@ -16,7 +17,6 @@ public class CollectList implements AggregateFunction {
 
     private String lookupField;
    private String outputField;
-    private String collectType;
 
     @Override
     public void open(UDFContext udfContext) {
@@ -28,9 +28,7 @@ public class CollectList implements AggregateFunction {
         this.outputField = udfContext.getOutputFields() == null || udfContext.getOutputFields().isEmpty()
                 ? lookupField
                 : udfContext.getOutputFields().get(0);
-        this.collectType = udfContext.getParameters() == null
-                ? "object"
-                : udfContext.getParameters().getOrDefault("collect_type", "object").toString();
+
     }
 
     @Override
@@ -45,16 +43,7 @@ public class CollectList implements AggregateFunction {
         if (valueObj == null) {
             return acc;
         }
-        if (collectType.equals("array")) {
-            if (valueObj instanceof List<?>) {
-                List<?> valueList = (List<?>) valueObj;
-                List<Object> aggregateList = getOrInitList(acc);
-                aggregateList.addAll(valueList);
-            }
-        } else {
-            getOrInitList(acc).add(valueObj);
-        }
-
+        getOrInitList(acc).add(valueObj);
         return acc;
     }
 
@@ -92,5 +81,4 @@ public class CollectList implements AggregateFunction {
     }
 
-
 }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
index ddca8d1..5397a19 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
@@ -1,15 +1,15 @@
 package com.geedgenetworks.core.udf.udaf;
 
-import com.geedgenetworks.common.config.Accumulator;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
 import com.geedgenetworks.api.common.udf.AggregateFunction;
 import com.geedgenetworks.api.common.udf.UDFContext;
 import com.geedgenetworks.api.event.Event;
+import com.geedgenetworks.common.config.Accumulator;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-
-import java.util.*;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Collects elements within a group and returns the list of aggregated objects
@@ -18,7 +18,6 @@ public class CollectSet implements AggregateFunction {
 
     private String lookupField;
     private String outputField;
-    private String collectType;
 
     @Override
     public void open(UDFContext udfContext) {
@@ -30,9 +29,6 @@ public class CollectSet implements AggregateFunction {
         this.outputField = udfContext.getOutputFields() == null || udfContext.getOutputFields().isEmpty()
                 ? lookupField
                 : udfContext.getOutputFields().get(0);
-        this.collectType = udfContext.getParameters() == null
-                ? "object"
-                : udfContext.getParameters().getOrDefault("collect_type", "object").toString();
     }
 
     @Override
@@ -47,17 +43,7 @@ public class CollectSet implements AggregateFunction {
         if (valueObj == null) {
             return acc;
         }
-
-        if (collectType.equals("array")) {
-            if (valueObj instanceof List<?>) {
-                List<?> valueList = (List<?>) valueObj;
-                Set<Object> aggregateSet = getOrInitSet(acc);
-                aggregateSet.addAll(valueList);
-            }
-        } else {
-            getOrInitSet(acc).add(valueObj);
-        }
-
+        getOrInitSet(acc).add(valueObj);
         return acc;
     }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java
new file mode 100644
index 0000000..04e8cce
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java
@@ -0,0 +1,106 @@
+package com.geedgenetworks.core.udf.udaf.array;
+
+
+import com.geedgenetworks.api.common.udf.AggregateFunction;
+import com.geedgenetworks.api.common.udf.UDFContext;
+import com.geedgenetworks.api.event.Event;
+import com.geedgenetworks.common.config.Accumulator;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+
+import java.util.*;
+
+/**
+ * Aggregates elements within a group and returns a collection based on mode (list or set).
+ */
+public class ArrayContactAgg implements AggregateFunction {
+
+    private String lookupField;
+    private String outputField;
+    private String mode;
+    private long maxSize;
+
+    @Override
+    public void open(UDFContext udfContext) {
+        // Validate input parameters
+        if (udfContext.getLookupFields() == null || udfContext.getLookupFields().isEmpty() || udfContext.getParameters() == null) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required lookup field parameter");
+        }
+        this.lookupField = udfContext.getLookupFields().get(0);
+        this.outputField = (udfContext.getOutputFields() == null || udfContext.getOutputFields().isEmpty())
+                ? lookupField : udfContext.getOutputFields().get(0);
+        this.mode = (String) udfContext.getParameters().getOrDefault("mode", "all");
+        this.maxSize = Long.parseLong(udfContext.getParameters().getOrDefault("max_size", 1000000L).toString());
+    }
+
+    @Override
+    public Accumulator initAccumulator(Accumulator acc) {
+        acc.getMetricsFields().put(outputField, createLimitedCollection());
+        return acc;
+    }
+
+    @Override
+    public Accumulator add(Event event, Accumulator acc) {
+        Object valueObj = event.getExtractedFields().get(lookupField);
+        if (valueObj instanceof List<?>) {
+            Collection<Object> collection = getOrInitCollection(acc);
+            collection.addAll((List<?>) valueObj);
+            acc.getMetricsFields().put(outputField, collection);
+        }
+        return acc;
+    }
+
+    @Override
+    public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+        Collection<Object> firstCollection = getOrInitCollection(firstAcc);
+        Collection<Object> secondCollection = getOrInitCollection(secondAcc);
+
+        if (firstCollection != null && secondCollection != null) {
+            firstCollection.addAll(secondCollection);
+        } else if (firstCollection == null) {
+            firstAcc.getMetricsFields().put(outputField, secondCollection);
+        }
+
+        return firstAcc;
+    }
+
+    @Override
+    public Accumulator getResult(Accumulator acc) {
+        Collection<Object> collection = getOrInitCollection(acc);
+        // Ensure the result does not exceed maxSize
+        if (collection instanceof List) {
+            // Limit the size of List
+            collection = limitListSize((List<Object>) collection);
+        } else if (collection instanceof Set) {
+            // Limit the size of Set (converted to a list for limiting)
+            collection = limitSetSize((Set<Object>) collection);
+        }
+        acc.getMetricsFields().put(outputField, collection);
+        return acc;
+    }
+
+    @Override
+    public String functionName() {
+        return "ARRAY_CONCAT_AGG";
+    }
+
+    @SuppressWarnings("unchecked")
+    private Collection<Object> getOrInitCollection(Accumulator acc) {
+        return (Collection<Object>) acc.getMetricsFields().computeIfAbsent(outputField, k -> createLimitedCollection());
+    }
+
+    private Collection<Object> createLimitedCollection() {
+        return "all".equals(mode) ? new LinkedList<>() : new HashSet<>();
+    }
+    private List<Object> limitListSize(List<Object> list) {
+        return list.size() > maxSize ? list.subList(0, (int) maxSize) : list;
+    }
+
+    private Set<Object> limitSetSize(Set<Object> set) {
+        List<Object> list = new ArrayList<>(set);
+        list = list.size() > maxSize ? list.subList(0, (int) maxSize) : list;
+        return new HashSet<>(list);
+    }
+}
+
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java
new file mode 100644
index 0000000..8eec775
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java
@@ -0,0 +1,195 @@
+package com.geedgenetworks.core.udf.test.aggregate;
+import com.geedgenetworks.api.common.udf.UDFContext;
+import com.geedgenetworks.api.event.Event;
+import com.geedgenetworks.common.config.Accumulator;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.core.udf.udaf.array.ArrayContactAgg;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class ArrayContactAggTest {
+
+    private ArrayContactAgg aggFunction;
+    private UDFContext udfContext;
+    private Accumulator accumulator;
+
+    @BeforeEach
+    void setUp() {
+        aggFunction = new ArrayContactAgg();
+        udfContext = new UDFContext();
+        udfContext.setLookupFields(Collections.singletonList("field1"));
+        udfContext.setParameters(new HashMap<>());
+        udfContext.setOutputFields(Collections.singletonList("outputField"));
+        udfContext.getParameters().put("max_size", 10L);
+        accumulator = new Accumulator();
+        accumulator.setMetricsFields(new HashMap<>());
+    }
+
+    @Test
+    void testOpenWithValidParameters() {
+        udfContext.getParameters().put("mode", "all");
+        udfContext.getParameters().put("max_size", 10L);
+
+        assertDoesNotThrow(() -> aggFunction.open(udfContext));
+    }
+
+    @Test
+    void testOpenWithInvalidParameters() {
+        UDFContext invalidContext = new UDFContext();
+        assertThrows(GrootStreamRuntimeException.class, () -> aggFunction.open(invalidContext));
+
+        invalidContext.setLookupFields(Collections.emptyList());
+        assertThrows(GrootStreamRuntimeException.class, () -> aggFunction.open(invalidContext));
+    }
+
+    @Test
+    void testInitAccumulatorWithNullOutputField() {
+        udfContext.setOutputFields(null);
+        udfContext.getParameters().put("mode", "all");
+        aggFunction.open(udfContext);
+
+        Accumulator acc = aggFunction.initAccumulator(accumulator);
+        assertTrue(acc.getMetricsFields().containsKey("field1"));
+    }
+
+    @Test
+    void testAddWithNullValue() {
+        udfContext.getParameters().put("mode", "all");
+        aggFunction.open(udfContext);
+        aggFunction.initAccumulator(accumulator);
+
+        Event event = new Event();
+
+        Map map = new HashMap();
+        map.put("field1", null);
+        event.setExtractedFields(map);
+
+        aggFunction.add(event, accumulator);
+        List<?> list = (List<?>) accumulator.getMetricsFields().get("outputField");
+        assertTrue(list.isEmpty());
+    }
+
+    @Test
+    void testAddWithMultipleValuesAndDuplicates() {
+        udfContext.getParameters().put("mode", "all");
+        aggFunction.open(udfContext);
+        aggFunction.initAccumulator(accumulator);
+
+        Event event1 = new Event();
+        event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value2")));
+        aggFunction.add(event1, accumulator);
+
+        Event event2 = new Event();
+        event2.setExtractedFields(Map.of("field1", List.of("value3", "value2")));
+        aggFunction.add(event2, accumulator);
+
+        List<?> list = (List<?>) accumulator.getMetricsFields().get("outputField");
+
+        assertEquals(5, list.size());
+        assertEquals(Arrays.asList("value1", "value2", "value2", "value3", "value2"), list);
+    }
+
+    @Test
+    void testAddWithDistinctModeAndDuplicates() {
+        udfContext.getParameters().put("mode", "distinct");
+        aggFunction.open(udfContext);
+        aggFunction.initAccumulator(accumulator);
+
+        Event event1 = new Event();
+        event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value2")));
+        aggFunction.add(event1, accumulator);
+
+        Event event2 = new Event();
+        event2.setExtractedFields(Map.of("field1", List.of("value3", "value2")));
+        aggFunction.add(event2, accumulator);
+
+        Set<?> set = (Set<?>) accumulator.getMetricsFields().get("outputField");
+
+        assertEquals(3, set.size());
+        assertTrue(set.containsAll(Set.of("value1", "value2", "value3")));
+    }
+
+    @Test
+    void testMergeAccumulatorsWithAllMode() {
+        udfContext.getParameters().put("mode", "all");
+        aggFunction.open(udfContext);
+
+        Accumulator acc1 = aggFunction.initAccumulator(accumulator);
+        Accumulator acc2 = new Accumulator();
+        acc2.setMetricsFields(new HashMap<>());
+        aggFunction.initAccumulator(acc2);
+
+        List<String> list1 = (List<String>) acc1.getMetricsFields().get("outputField");
+        List<String> list2 = (List<String>) acc2.getMetricsFields().get("outputField");
+
+        list1.addAll(Arrays.asList("value1", "value2"));
+        list2.addAll(Arrays.asList("value3", "value4"));
+
+        aggFunction.merge(acc1, acc2);
+
+        assertEquals(Arrays.asList("value1", "value2", "value3", "value4"), list1);
+    }
+
+    @Test
+    void testMergeAccumulatorsWithDistinctMode() {
+        udfContext.getParameters().put("mode", "distinct");
+        udfContext.getParameters().put("max_size", 10L);
+
+        aggFunction.open(udfContext);
+
+        Accumulator acc1 = aggFunction.initAccumulator(accumulator);
+        Accumulator acc2 = new Accumulator();
+        acc2.setMetricsFields(new HashMap<>());
+        aggFunction.initAccumulator(acc2);
+
+        Set<String> set1 = (Set<String>) acc1.getMetricsFields().get("outputField");
+        Set<String> set2 = (Set<String>) acc2.getMetricsFields().get("outputField");
+
+        set1.addAll(Set.of("value1", "value2"));
+        set2.addAll(Set.of("value2", "value3"));
+
+        aggFunction.merge(acc1, acc2);
+
+        assertEquals(3, set1.size());
+        assertTrue(set1.containsAll(Set.of("value1", "value2", "value3")));
+    }
+
+    @Test
+    void testAddWithMaxSizeExceeded() {
+        udfContext.getParameters().put("mode", "all");
+        udfContext.getParameters().put("max_size", 3L);
+        aggFunction.open(udfContext);
+        aggFunction.initAccumulator(accumulator);
+
+        Event event1 = new Event();
+        event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value3", "value4")));
+        aggFunction.add(event1, accumulator);
+        aggFunction.getResult(accumulator);
+        List<?> list = (List<?>) accumulator.getMetricsFields().get("outputField");
+
+        assertEquals(3, list.size());
+        assertEquals(Arrays.asList("value1", "value2", "value3"), list); // elements beyond the limit are dropped
+    }
+
+    @Test
+    void testAddWithEmptyInputField() {
+        udfContext.getParameters().put("mode", "all");
+        aggFunction.open(udfContext);
+        aggFunction.initAccumulator(accumulator);
+
+        Event event = new Event();
+        event.setExtractedFields(Map.of()); // Missing "field1"
+
+        aggFunction.add(event, accumulator);
+        List<?> list = (List<?>) accumulator.getMetricsFields().get("outputField");
+
+        assertTrue(list.isEmpty());
+    }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
index a3a0487..31af426 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
@@ -17,7 +17,7 @@ public class CollectListTest {
     public void testObjectType() {
         // Test the default "object" type
         List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.1");
-        UDFContext udfContext = createUDFContext("object");
+        UDFContext udfContext = createUDFContext();
 
         CollectList collectList = new CollectList();
         collectList.open(udfContext);
@@ -37,9 +37,10 @@ public class CollectListTest {
         // Test the "array" type
         List<List<String>> arrays = List.of(
                 List.of("192.168.1.1", "192.168.1.2"),
+                List.of("192.168.1.1", "192.168.1.2"),
                 List.of("192.168.1.3", "192.168.1.4")
         );
-        UDFContext udfContext = createUDFContext("array");
+        UDFContext udfContext = createUDFContext();
 
         CollectList collectList = new CollectList();
         collectList.open(udfContext);
@@ -50,17 +51,16 @@ public class CollectListTest {
 
         Accumulator result = collectList.getResult(accumulator);
         List<List<String>> aggregated = (List<List<String>>) result.getMetricsFields().get("field_list");
-        assertEquals(aggregated.size(), 4);
-        assertEquals("192.168.1.1", aggregated.get(0));
+        assertEquals(aggregated.size(), 3);
     }
 
     @Test
     public void testMerge() {
         // Test the merge logic
         List<String> arr1 = List.of("192.168.1.1", "192.168.1.2");
-        List<String> arr2 = List.of("192.168.1.3", "192.168.1.4");
+        List<String> arr2 = List.of("192.168.1.1", "192.168.1.4");
 
-        UDFContext udfContext = createUDFContext("object");
+        UDFContext udfContext = createUDFContext();
 
         CollectList collectList = new CollectList();
         collectList.open(udfContext);
@@ -71,13 +71,12 @@ public class CollectListTest {
 
         List<String> aggregated = (List<String>) merged.getMetricsFields().get("field_list");
         assertEquals(aggregated.size(), 4);
-        assertEquals("192.168.1.4", aggregated.get(3));
     }
 
     @Test
     public void testEmptyInput() {
         // Test empty input
-        UDFContext udfContext = createUDFContext("object");
+        UDFContext udfContext = createUDFContext();
 
         CollectList collectList = new CollectList();
         collectList.open(udfContext);
@@ -89,12 +88,11 @@ public class CollectListTest {
 
     }
 
-    private UDFContext createUDFContext(String collectType) {
+    private UDFContext createUDFContext() {
         UDFContext udfContext = new UDFContext();
         udfContext.setLookupFields(List.of("field"));
         udfContext.setOutputFields(Collections.singletonList("field_list"));
         Map<String, Object> parameters = new HashMap<>();
-        parameters.put("collect_type", collectType);
         udfContext.setParameters(parameters);
         return udfContext;
     }
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
index 694e482..08681b6 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
@@ -1,6 +1,7 @@
 package com.geedgenetworks.core.udf.test.aggregate;
 
 import com.geedgenetworks.common.config.Accumulator;
+import com.geedgenetworks.core.udf.udaf.CollectList;
 import com.geedgenetworks.core.udf.udaf.CollectSet;
 import com.geedgenetworks.api.common.udf.UDFContext;
 import com.geedgenetworks.api.event.Event;
@@ -15,8 +16,8 @@ public class CollectSetTest {
     @Test
     public void testObjectType() {
         // Test the "object" type and verify deduplication
-        List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.1");
-        UDFContext udfContext = createUDFContext("object");
+        List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3");
+        UDFContext udfContext = createUDFContext();
 
         CollectSet collectSet = new CollectSet();
         collectSet.open(udfContext);
@@ -27,15 +28,37 @@ public class CollectSetTest {
 
         Accumulator result = collectSet.getResult(accumulator);
         Set<String> aggregated = (Set<String>) result.getMetricsFields().get("field_list");
-        assertEquals(aggregated.size(), 2);
+        assertEquals(aggregated.size(), 3);
         assertTrue(aggregated.contains("192.168.1.1"));
         assertTrue(aggregated.contains("192.168.1.2"));
+        assertTrue(aggregated.contains("192.168.1.3"));
+    }
+
+    @Test
+    public void testArrayType() {
+        // Test the "array" type
+        List<List<String>> arrays = List.of(
+                List.of("192.168.1.1", "192.168.1.2"),
+                List.of("192.168.1.1", "192.168.1.2"),
+                List.of("192.168.1.3", "192.168.1.4")
+        );
+        UDFContext udfContext = createUDFContext();
+        CollectSet collectSet = new CollectSet();
+        collectSet.open(udfContext);
+        Accumulator accumulator = initializeAccumulator(collectSet, udfContext);
+        for (List<String> array : arrays) {
+            accumulator = addEvent(collectSet, udfContext, accumulator, "field", array);
+        }
+
+        Accumulator result = collectSet.getResult(accumulator);
+        Set<List<String>> aggregated = (Set<List<String>>) result.getMetricsFields().get("field_list");
+        assertEquals(aggregated.size(), 2);
+    }
 
     @Test
     public void testEmptyInput() {
         // Test empty input
-        UDFContext udfContext = createUDFContext("object");
+        UDFContext udfContext = createUDFContext();
 
         CollectSet collectSet = new CollectSet();
         collectSet.open(udfContext);
@@ -53,7 +76,7 @@ public class CollectSetTest {
         List<String> arr1 = List.of("192.168.1.1", "192.168.1.2");
         List<String> arr2 = List.of("192.168.1.3", "192.168.1.1");
 
-        UDFContext udfContext = createUDFContext("object");
+        UDFContext udfContext = createUDFContext();
 
         CollectSet collectSet = new CollectSet();
         collectSet.open(udfContext);
@@ -70,7 +93,7 @@ public class CollectSetTest {
     @Test
     public void testNullValues() {
         // Test the case where the field value is null
-        UDFContext udfContext = createUDFContext("object");
+        UDFContext udfContext = createUDFContext();
 
         CollectSet collectSet = new CollectSet();
         collectSet.open(udfContext);
@@ -89,7 +112,7 @@ public class CollectSetTest {
     public void testDuplicateValues() {
         // Test that duplicate values are deduplicated correctly
         List<String> arr = List.of("192.168.1.1", "192.168.1.1", "192.168.1.1");
-        UDFContext udfContext = createUDFContext("object");
+        UDFContext udfContext = createUDFContext();
 
         CollectSet collectSet = new CollectSet();
         collectSet.open(udfContext);
@@ -102,12 +125,11 @@ public class CollectSetTest {
         assertTrue(aggregated.contains("192.168.1.1"));
     }
 
-    private UDFContext createUDFContext(String collectType) {
+    private UDFContext createUDFContext() {
         UDFContext udfContext = new UDFContext();
         udfContext.setLookupFields(List.of("field"));
         udfContext.setOutputFields(Collections.singletonList("field_list"));
         Map<String, Object> parameters = new HashMap<>();
-        parameters.put("collect_type", collectType);
         udfContext.setParameters(parameters);
         return udfContext;
     }
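This merge replaces the old collect_type=array behaviour of COLLECT_SET/COLLECT_LIST with a dedicated ARRAY_CONCAT_AGG function. For reference, below is a minimal standalone sketch of driving the new UDAF directly, based only on the API calls exercised in ArrayContactAggTest above; the field names and IP values are illustrative assumptions, not part of this commit.

```java
import com.geedgenetworks.api.common.udf.UDFContext;
import com.geedgenetworks.api.event.Event;
import com.geedgenetworks.common.config.Accumulator;
import com.geedgenetworks.core.udf.udaf.array.ArrayContactAgg;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ArrayConcatAggExample {
    public static void main(String[] args) {
        // Configure the UDF the same way the test YAML does:
        // mode=distinct deduplicates, max_size caps the emitted collection.
        UDFContext ctx = new UDFContext();
        ctx.setLookupFields(Collections.singletonList("client_ips"));       // illustrative field name
        ctx.setOutputFields(Collections.singletonList("client_ips_set"));   // illustrative output field
        Map<String, Object> params = new HashMap<>();
        params.put("mode", "distinct");
        params.put("max_size", 2L);
        ctx.setParameters(params);

        ArrayContactAgg agg = new ArrayContactAgg();
        agg.open(ctx);

        Accumulator acc = new Accumulator();
        acc.setMetricsFields(new HashMap<>());
        agg.initAccumulator(acc);

        // Each incoming event carries an array field; the arrays are concatenated into one collection.
        Event e1 = new Event();
        e1.setExtractedFields(Map.of("client_ips", List.of("10.0.0.1", "10.0.0.2")));
        Event e2 = new Event();
        e2.setExtractedFields(Map.of("client_ips", List.of("10.0.0.2", "10.0.0.3")));
        agg.add(e1, acc);
        agg.add(e2, acc);

        // getResult applies the max_size limit before the collection is emitted.
        Accumulator result = agg.getResult(acc);
        System.out.println(result.getMetricsFields().get("client_ips_set"));
    }
}
```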
