From 7a9ccfa3b471cd7b80ecdd66452f7179ba131c83 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Tue, 26 Nov 2024 18:39:28 +0800 Subject: [feature][core]CN-1730 修改函数类名为ArrayConcatAgg MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/udf.plugins | 2 +- groot-common/src/main/resources/udf.plugins | 2 +- .../core/udf/udaf/array/ArrayConcatAgg.java | 106 +++++++++++ .../core/udf/udaf/array/ArrayContactAgg.java | 106 ----------- .../udf/test/aggregate/ArrayConcatAggTest.java | 195 +++++++++++++++++++++ .../udf/test/aggregate/ArrayContactAggTest.java | 195 --------------------- 6 files changed, 303 insertions(+), 303 deletions(-) create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayConcatAgg.java delete mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayConcatAggTest.java delete mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java diff --git a/config/udf.plugins b/config/udf.plugins index 6a3f245..a7cc218 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -37,4 +37,4 @@ com.geedgenetworks.core.udf.udtf.Unroll com.geedgenetworks.core.udf.udtf.PathUnroll com.geedgenetworks.core.udf.udaf.Max com.geedgenetworks.core.udf.udaf.Min -com.geedgenetworks.core.udf.udaf.array.ArrayContactAgg \ No newline at end of file +com.geedgenetworks.core.udf.udaf.array.ArrayConcatAgg \ No newline at end of file diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index 6a3f245..a7cc218 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -37,4 +37,4 @@ com.geedgenetworks.core.udf.udtf.Unroll com.geedgenetworks.core.udf.udtf.PathUnroll com.geedgenetworks.core.udf.udaf.Max com.geedgenetworks.core.udf.udaf.Min -com.geedgenetworks.core.udf.udaf.array.ArrayContactAgg \ No newline at end of file +com.geedgenetworks.core.udf.udaf.array.ArrayConcatAgg \ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayConcatAgg.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayConcatAgg.java new file mode 100644 index 0000000..76fbfc9 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayConcatAgg.java @@ -0,0 +1,106 @@ +package com.geedgenetworks.core.udf.udaf.array; + + +import com.geedgenetworks.api.common.udf.AggregateFunction; +import com.geedgenetworks.api.common.udf.UDFContext; +import com.geedgenetworks.api.event.Event; +import com.geedgenetworks.common.config.Accumulator; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; + +import java.util.*; + +/** + * Aggregates elements within a group and returns a collection based on mode (list or set). + */ +public class ArrayConcatAgg implements AggregateFunction { + + private String lookupField; + private String outputField; + private String mode; + private long maxSize; + + @Override + public void open(UDFContext udfContext) { + // Validate input parameters + if (udfContext.getLookupFields() == null || udfContext.getLookupFields().isEmpty() || udfContext.getParameters() == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required lookup field parameter"); + } + this.lookupField = udfContext.getLookupFields().get(0); + this.outputField = (udfContext.getOutputFields() == null || udfContext.getOutputFields().isEmpty()) + ? lookupField : udfContext.getOutputFields().get(0); + this.mode = (String) udfContext.getParameters().getOrDefault("mode", "all"); + this.maxSize = Long.parseLong(udfContext.getParameters().getOrDefault("max_size", 1000000L).toString()); + } + + @Override + public Accumulator initAccumulator(Accumulator acc) { + acc.getMetricsFields().put(outputField, createLimitedCollection()); + return acc; + } + + @Override + public Accumulator add(Event event, Accumulator acc) { + Object valueObj = event.getExtractedFields().get(lookupField); + if (valueObj instanceof List) { + Collection collection = getOrInitCollection(acc); + collection.addAll((List) valueObj); + acc.getMetricsFields().put(outputField, collection); + } + return acc; + } + + @Override + public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { + Collection firstCollection = getOrInitCollection(firstAcc); + Collection secondCollection = getOrInitCollection(secondAcc); + + if (firstCollection != null && secondCollection != null) { + firstCollection.addAll(secondCollection); + } else if (firstCollection == null) { + firstAcc.getMetricsFields().put(outputField, secondCollection); + } + + return firstAcc; + } + + @Override + public Accumulator getResult(Accumulator acc) { + Collection collection = getOrInitCollection(acc); + // Ensure the result does not exceed maxSize + if (collection instanceof List) { + // Limit the size of List + collection = limitListSize((List) collection); + } else if (collection instanceof Set) { + // Limit the size of Set (converted to a list for limiting) + collection = limitSetSize((Set) collection); + } + acc.getMetricsFields().put(outputField, collection); + return acc; + } + + @Override + public String functionName() { + return "ARRAY_CONCAT_AGG"; + } + + @SuppressWarnings("unchecked") + private Collection getOrInitCollection(Accumulator acc) { + return (Collection) acc.getMetricsFields().computeIfAbsent(outputField, k -> createLimitedCollection()); + } + + private Collection createLimitedCollection() { + return "all".equals(mode) ? new LinkedList<>() : new HashSet<>(); + } + private List limitListSize(List list) { + return list.size() > maxSize ? list.subList(0, (int) maxSize) : list; + } + + private Set limitSetSize(Set set) { + List list = new ArrayList<>(set); + list = list.size() > maxSize ? list.subList(0, (int) maxSize) : list; + return new HashSet<>(list); + } +} + + diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java deleted file mode 100644 index 04e8cce..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/array/ArrayContactAgg.java +++ /dev/null @@ -1,106 +0,0 @@ -package com.geedgenetworks.core.udf.udaf.array; - - -import com.geedgenetworks.api.common.udf.AggregateFunction; -import com.geedgenetworks.api.common.udf.UDFContext; -import com.geedgenetworks.api.event.Event; -import com.geedgenetworks.common.config.Accumulator; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.GrootStreamRuntimeException; - -import java.util.*; - -/** - * Aggregates elements within a group and returns a collection based on mode (list or set). - */ -public class ArrayContactAgg implements AggregateFunction { - - private String lookupField; - private String outputField; - private String mode; - private long maxSize; - - @Override - public void open(UDFContext udfContext) { - // Validate input parameters - if (udfContext.getLookupFields() == null || udfContext.getLookupFields().isEmpty() || udfContext.getParameters() == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required lookup field parameter"); - } - this.lookupField = udfContext.getLookupFields().get(0); - this.outputField = (udfContext.getOutputFields() == null || udfContext.getOutputFields().isEmpty()) - ? lookupField : udfContext.getOutputFields().get(0); - this.mode = (String) udfContext.getParameters().getOrDefault("mode", "all"); - this.maxSize = Long.parseLong(udfContext.getParameters().getOrDefault("max_size", 1000000L).toString()); - } - - @Override - public Accumulator initAccumulator(Accumulator acc) { - acc.getMetricsFields().put(outputField, createLimitedCollection()); - return acc; - } - - @Override - public Accumulator add(Event event, Accumulator acc) { - Object valueObj = event.getExtractedFields().get(lookupField); - if (valueObj instanceof List) { - Collection collection = getOrInitCollection(acc); - collection.addAll((List) valueObj); - acc.getMetricsFields().put(outputField, collection); - } - return acc; - } - - @Override - public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) { - Collection firstCollection = getOrInitCollection(firstAcc); - Collection secondCollection = getOrInitCollection(secondAcc); - - if (firstCollection != null && secondCollection != null) { - firstCollection.addAll(secondCollection); - } else if (firstCollection == null) { - firstAcc.getMetricsFields().put(outputField, secondCollection); - } - - return firstAcc; - } - - @Override - public Accumulator getResult(Accumulator acc) { - Collection collection = getOrInitCollection(acc); - // Ensure the result does not exceed maxSize - if (collection instanceof List) { - // Limit the size of List - collection = limitListSize((List) collection); - } else if (collection instanceof Set) { - // Limit the size of Set (converted to a list for limiting) - collection = limitSetSize((Set) collection); - } - acc.getMetricsFields().put(outputField, collection); - return acc; - } - - @Override - public String functionName() { - return "ARRAY_CONCAT_AGG"; - } - - @SuppressWarnings("unchecked") - private Collection getOrInitCollection(Accumulator acc) { - return (Collection) acc.getMetricsFields().computeIfAbsent(outputField, k -> createLimitedCollection()); - } - - private Collection createLimitedCollection() { - return "all".equals(mode) ? new LinkedList<>() : new HashSet<>(); - } - private List limitListSize(List list) { - return list.size() > maxSize ? list.subList(0, (int) maxSize) : list; - } - - private Set limitSetSize(Set set) { - List list = new ArrayList<>(set); - list = list.size() > maxSize ? list.subList(0, (int) maxSize) : list; - return new HashSet<>(list); - } -} - - diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayConcatAggTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayConcatAggTest.java new file mode 100644 index 0000000..742a160 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayConcatAggTest.java @@ -0,0 +1,195 @@ +package com.geedgenetworks.core.udf.test.aggregate; +import com.geedgenetworks.api.common.udf.UDFContext; +import com.geedgenetworks.api.event.Event; +import com.geedgenetworks.common.config.Accumulator; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.core.udf.udaf.array.ArrayConcatAgg; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; + +class ArrayConcatAggTest { + + private ArrayConcatAgg aggFunction; + private UDFContext udfContext; + private Accumulator accumulator; + + @BeforeEach + void setUp() { + aggFunction = new ArrayConcatAgg(); + udfContext = new UDFContext(); + udfContext.setLookupFields(Collections.singletonList("field1")); + udfContext.setParameters(new HashMap<>()); + udfContext.setOutputFields(Collections.singletonList("outputField")); + udfContext.getParameters().put("max_size", 10L); + accumulator = new Accumulator(); + accumulator.setMetricsFields(new HashMap<>()); + } + + @Test + void testOpenWithValidParameters() { + udfContext.getParameters().put("mode", "all"); + udfContext.getParameters().put("max_size", 10L); + + assertDoesNotThrow(() -> aggFunction.open(udfContext)); + } + + @Test + void testOpenWithInvalidParameters() { + UDFContext invalidContext = new UDFContext(); + assertThrows(GrootStreamRuntimeException.class, () -> aggFunction.open(invalidContext)); + + invalidContext.setLookupFields(Collections.emptyList()); + assertThrows(GrootStreamRuntimeException.class, () -> aggFunction.open(invalidContext)); + } + + @Test + void testInitAccumulatorWithNullOutputField() { + udfContext.setOutputFields(null); + udfContext.getParameters().put("mode", "all"); + aggFunction.open(udfContext); + + Accumulator acc = aggFunction.initAccumulator(accumulator); + assertTrue(acc.getMetricsFields().containsKey("field1")); + } + + @Test + void testAddWithNullValue() { + udfContext.getParameters().put("mode", "all"); + aggFunction.open(udfContext); + aggFunction.initAccumulator(accumulator); + + Event event = new Event(); + + Map map = new HashMap(); + map.put("field1", null); + event.setExtractedFields(map); + + aggFunction.add(event, accumulator); + List list = (List) accumulator.getMetricsFields().get("outputField"); + assertTrue(list.isEmpty()); + } + + @Test + void testAddWithMultipleValuesAndDuplicates() { + udfContext.getParameters().put("mode", "all"); + aggFunction.open(udfContext); + aggFunction.initAccumulator(accumulator); + + Event event1 = new Event(); + event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value2"))); + aggFunction.add(event1, accumulator); + + Event event2 = new Event(); + event2.setExtractedFields(Map.of("field1", List.of("value3", "value2"))); + aggFunction.add(event2, accumulator); + + List list = (List) accumulator.getMetricsFields().get("outputField"); + + assertEquals(5, list.size()); + assertEquals(Arrays.asList("value1", "value2", "value2", "value3", "value2"), list); + } + + @Test + void testAddWithDistinctModeAndDuplicates() { + udfContext.getParameters().put("mode", "distinct"); + aggFunction.open(udfContext); + aggFunction.initAccumulator(accumulator); + + Event event1 = new Event(); + event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value2"))); + aggFunction.add(event1, accumulator); + + Event event2 = new Event(); + event2.setExtractedFields(Map.of("field1", List.of("value3", "value2"))); + aggFunction.add(event2, accumulator); + + Set set = (Set) accumulator.getMetricsFields().get("outputField"); + + assertEquals(3, set.size()); + assertTrue(set.containsAll(Set.of("value1", "value2", "value3"))); + } + + @Test + void testMergeAccumulatorsWithAllMode() { + udfContext.getParameters().put("mode", "all"); + aggFunction.open(udfContext); + + Accumulator acc1 = aggFunction.initAccumulator(accumulator); + Accumulator acc2 = new Accumulator(); + acc2.setMetricsFields(new HashMap<>()); + aggFunction.initAccumulator(acc2); + + List list1 = (List) acc1.getMetricsFields().get("outputField"); + List list2 = (List)acc2.getMetricsFields().get("outputField"); + + list1.addAll(Arrays.asList("value1", "value2")); + list2.addAll(Arrays.asList("value3", "value4")); + + aggFunction.merge(acc1, acc2); + + assertEquals(Arrays.asList("value1", "value2", "value3", "value4"), list1); + } + + @Test + void testMergeAccumulatorsWithDistinctMode() { + udfContext.getParameters().put("mode", "distinct"); + udfContext.getParameters().put("max_size", 10L); + + aggFunction.open(udfContext); + + Accumulator acc1 = aggFunction.initAccumulator(accumulator); + Accumulator acc2 = new Accumulator(); + acc2.setMetricsFields(new HashMap<>()); + aggFunction.initAccumulator(acc2); + + Set set1 = (Set) acc1.getMetricsFields().get("outputField"); + Set set2 = (Set) acc2.getMetricsFields().get("outputField"); + + set1.addAll(Set.of("value1", "value2")); + set2.addAll(Set.of("value2", "value3")); + + aggFunction.merge(acc1, acc2); + + assertEquals(3, set1.size()); + assertTrue(set1.containsAll(Set.of("value1", "value2", "value3"))); + } + + @Test + void testAddWithMaxSizeExceeded() { + udfContext.getParameters().put("mode", "all"); + udfContext.getParameters().put("max_size", 3L); + aggFunction.open(udfContext); + aggFunction.initAccumulator(accumulator); + + Event event1 = new Event(); + event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value3", "value4"))); + aggFunction.add(event1, accumulator); + aggFunction.getResult(accumulator); + List list = (List) accumulator.getMetricsFields().get("outputField"); + + assertEquals(3, list.size()); + assertEquals(Arrays.asList("value1", "value2", "value3"), list); // 超出限制后拒绝添加 + } + + @Test + void testAddWithEmptyInputField() { + udfContext.getParameters().put("mode", "all"); + aggFunction.open(udfContext); + aggFunction.initAccumulator(accumulator); + + Event event = new Event(); + event.setExtractedFields(Map.of()); // Missing "field1" + + aggFunction.add(event, accumulator); + List list = (List) accumulator.getMetricsFields().get("outputField"); + + assertTrue(list.isEmpty()); + } +} + + diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java deleted file mode 100644 index 8eec775..0000000 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/ArrayContactAggTest.java +++ /dev/null @@ -1,195 +0,0 @@ -package com.geedgenetworks.core.udf.test.aggregate; -import com.geedgenetworks.api.common.udf.UDFContext; -import com.geedgenetworks.api.event.Event; -import com.geedgenetworks.common.config.Accumulator; -import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.core.udf.udaf.array.ArrayContactAgg; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.*; - -import static org.junit.jupiter.api.Assertions.*; - -class ArrayContactAggTest { - - private ArrayContactAgg aggFunction; - private UDFContext udfContext; - private Accumulator accumulator; - - @BeforeEach - void setUp() { - aggFunction = new ArrayContactAgg(); - udfContext = new UDFContext(); - udfContext.setLookupFields(Collections.singletonList("field1")); - udfContext.setParameters(new HashMap<>()); - udfContext.setOutputFields(Collections.singletonList("outputField")); - udfContext.getParameters().put("max_size", 10L); - accumulator = new Accumulator(); - accumulator.setMetricsFields(new HashMap<>()); - } - - @Test - void testOpenWithValidParameters() { - udfContext.getParameters().put("mode", "all"); - udfContext.getParameters().put("max_size", 10L); - - assertDoesNotThrow(() -> aggFunction.open(udfContext)); - } - - @Test - void testOpenWithInvalidParameters() { - UDFContext invalidContext = new UDFContext(); - assertThrows(GrootStreamRuntimeException.class, () -> aggFunction.open(invalidContext)); - - invalidContext.setLookupFields(Collections.emptyList()); - assertThrows(GrootStreamRuntimeException.class, () -> aggFunction.open(invalidContext)); - } - - @Test - void testInitAccumulatorWithNullOutputField() { - udfContext.setOutputFields(null); - udfContext.getParameters().put("mode", "all"); - aggFunction.open(udfContext); - - Accumulator acc = aggFunction.initAccumulator(accumulator); - assertTrue(acc.getMetricsFields().containsKey("field1")); - } - - @Test - void testAddWithNullValue() { - udfContext.getParameters().put("mode", "all"); - aggFunction.open(udfContext); - aggFunction.initAccumulator(accumulator); - - Event event = new Event(); - - Map map = new HashMap(); - map.put("field1", null); - event.setExtractedFields(map); - - aggFunction.add(event, accumulator); - List list = (List) accumulator.getMetricsFields().get("outputField"); - assertTrue(list.isEmpty()); - } - - @Test - void testAddWithMultipleValuesAndDuplicates() { - udfContext.getParameters().put("mode", "all"); - aggFunction.open(udfContext); - aggFunction.initAccumulator(accumulator); - - Event event1 = new Event(); - event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value2"))); - aggFunction.add(event1, accumulator); - - Event event2 = new Event(); - event2.setExtractedFields(Map.of("field1", List.of("value3", "value2"))); - aggFunction.add(event2, accumulator); - - List list = (List) accumulator.getMetricsFields().get("outputField"); - - assertEquals(5, list.size()); - assertEquals(Arrays.asList("value1", "value2", "value2", "value3", "value2"), list); - } - - @Test - void testAddWithDistinctModeAndDuplicates() { - udfContext.getParameters().put("mode", "distinct"); - aggFunction.open(udfContext); - aggFunction.initAccumulator(accumulator); - - Event event1 = new Event(); - event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value2"))); - aggFunction.add(event1, accumulator); - - Event event2 = new Event(); - event2.setExtractedFields(Map.of("field1", List.of("value3", "value2"))); - aggFunction.add(event2, accumulator); - - Set set = (Set) accumulator.getMetricsFields().get("outputField"); - - assertEquals(3, set.size()); - assertTrue(set.containsAll(Set.of("value1", "value2", "value3"))); - } - - @Test - void testMergeAccumulatorsWithAllMode() { - udfContext.getParameters().put("mode", "all"); - aggFunction.open(udfContext); - - Accumulator acc1 = aggFunction.initAccumulator(accumulator); - Accumulator acc2 = new Accumulator(); - acc2.setMetricsFields(new HashMap<>()); - aggFunction.initAccumulator(acc2); - - List list1 = (List) acc1.getMetricsFields().get("outputField"); - List list2 = (List)acc2.getMetricsFields().get("outputField"); - - list1.addAll(Arrays.asList("value1", "value2")); - list2.addAll(Arrays.asList("value3", "value4")); - - aggFunction.merge(acc1, acc2); - - assertEquals(Arrays.asList("value1", "value2", "value3", "value4"), list1); - } - - @Test - void testMergeAccumulatorsWithDistinctMode() { - udfContext.getParameters().put("mode", "distinct"); - udfContext.getParameters().put("max_size", 10L); - - aggFunction.open(udfContext); - - Accumulator acc1 = aggFunction.initAccumulator(accumulator); - Accumulator acc2 = new Accumulator(); - acc2.setMetricsFields(new HashMap<>()); - aggFunction.initAccumulator(acc2); - - Set set1 = (Set) acc1.getMetricsFields().get("outputField"); - Set set2 = (Set) acc2.getMetricsFields().get("outputField"); - - set1.addAll(Set.of("value1", "value2")); - set2.addAll(Set.of("value2", "value3")); - - aggFunction.merge(acc1, acc2); - - assertEquals(3, set1.size()); - assertTrue(set1.containsAll(Set.of("value1", "value2", "value3"))); - } - - @Test - void testAddWithMaxSizeExceeded() { - udfContext.getParameters().put("mode", "all"); - udfContext.getParameters().put("max_size", 3L); - aggFunction.open(udfContext); - aggFunction.initAccumulator(accumulator); - - Event event1 = new Event(); - event1.setExtractedFields(Map.of("field1", List.of("value1", "value2", "value3", "value4"))); - aggFunction.add(event1, accumulator); - aggFunction.getResult(accumulator); - List list = (List) accumulator.getMetricsFields().get("outputField"); - - assertEquals(3, list.size()); - assertEquals(Arrays.asList("value1", "value2", "value3"), list); // 超出限制后拒绝添加 - } - - @Test - void testAddWithEmptyInputField() { - udfContext.getParameters().put("mode", "all"); - aggFunction.open(udfContext); - aggFunction.initAccumulator(accumulator); - - Event event = new Event(); - event.setExtractedFields(Map.of()); // Missing "field1" - - aggFunction.add(event, accumulator); - List list = (List) accumulator.getMetricsFields().get("outputField"); - - assertTrue(list.isEmpty()); - } -} - - -- cgit v1.2.3