diff options
| author | 王宽 <[email protected]> | 2024-08-13 09:27:37 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-08-13 09:27:37 +0000 |
| commit | 3b5e06b70db295b3f4ec8d1e455a095009c82bc0 (patch) | |
| tree | db9396d047dc2312d54eb88d5ff3947123d648d4 | |
| parent | 647296e18fc36fab3b1e01bf276115fb1200d0eb (diff) | |
| parent | e9d132716800f4b8ef46273b254c173c3d450864 (diff) | |
Merge branch 'feature/aggregate' into 'develop'
[improve][core]拆分聚合函数open方法,较少调用次数,优化性能
See merge request galaxy/platform/groot-stream!91
16 files changed, 70 insertions, 70 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java index 98450fd..455073f 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java @@ -7,7 +7,9 @@ import java.io.Serializable; public interface AggregateFunction extends Serializable { - Accumulator open(UDFContext udfContext,Accumulator acc); + void open(UDFContext udfContext); + + Accumulator initAccumulator(Accumulator acc); Accumulator add(Event val, Accumulator acc); @@ -15,6 +17,5 @@ public interface AggregateFunction extends Serializable { Accumulator getResult(Accumulator acc); - void close(); - + default void close(){}; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index c07374e..803fefc 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -71,23 +71,24 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f } } + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().open(udfEntity.getUdfContext()); + } } catch (Exception e) { throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e); } } - @Override public Accumulator createAccumulator() { Map<String, Object> map = new HashMap<>(); Accumulator accumulator = new Accumulator(); accumulator.setMetricsFields(map); for (UdfEntity udfEntity : functions) { - udfEntity.getAggregateFunction().open(udfEntity.getUdfContext(), accumulator); + udfEntity.getAggregateFunction().initAccumulator(accumulator); } return accumulator; } - @Override public Accumulator add(Event event, Accumulator accumulator) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java index 4a43163..423eff9 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java @@ -22,6 +22,8 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.processor.projection.UdfEntity; + import java.util.*; /** @@ -32,9 +34,8 @@ public class CollectList implements AggregateFunction { private String lookupField; private String outputField; - @Override - public Accumulator open(UDFContext udfContext,Accumulator acc) { + public void open(UDFContext udfContext) { if(udfContext.getLookup_fields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } @@ -45,11 +46,14 @@ public class CollectList implements AggregateFunction { else { outputField = lookupField; } + + } + @Override + public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField, new ArrayList<>()); return acc; } - @Override public Accumulator add(Event event, Accumulator acc) { if(event.getExtractedFields().containsKey(lookupField)){ @@ -71,8 +75,4 @@ public class CollectList implements AggregateFunction { return acc; } - @Override - public void close() { - - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java index a425118..b4dfb14 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java @@ -8,6 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; +import java.util.ArrayList; import java.util.HashSet; import java.util.Set; @@ -21,7 +22,7 @@ public class CollectSet implements AggregateFunction { @Override - public Accumulator open(UDFContext udfContext,Accumulator acc) { + public void open(UDFContext udfContext) { if(udfContext.getLookup_fields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } @@ -32,11 +33,13 @@ public class CollectSet implements AggregateFunction { else { outputField = lookupField; } + } + @Override + public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField, new HashSet<>()); return acc; } - @Override public Accumulator add(Event event, Accumulator acc) { if(event.getExtractedFields().containsKey(lookupField)){ @@ -58,8 +61,5 @@ public class CollectSet implements AggregateFunction { return acc; } - @Override - public void close() { - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java index 27490ef..6301a01 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java @@ -23,6 +23,8 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; +import java.util.ArrayList; + /** * Collects elements within a group and returns the list of aggregated objects */ @@ -33,7 +35,7 @@ public class FirstValue implements AggregateFunction { @Override - public Accumulator open(UDFContext udfContext,Accumulator acc) { + public void open(UDFContext udfContext) { if(udfContext.getLookup_fields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } @@ -44,9 +46,12 @@ public class FirstValue implements AggregateFunction { else { outputField = lookupField; } - return acc; } + @Override + public Accumulator initAccumulator(Accumulator acc) { + return acc; + } @Override public Accumulator add(Event event, Accumulator acc) { @@ -66,8 +71,4 @@ public class FirstValue implements AggregateFunction { return acc; } - @Override - public void close() { - - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java index 4adafd4..f27a2e6 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java @@ -36,7 +36,7 @@ public class LastValue implements AggregateFunction { @Override - public Accumulator open(UDFContext udfContext,Accumulator acc) { + public void open(UDFContext udfContext) { if(udfContext.getLookup_fields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } @@ -47,10 +47,12 @@ public class LastValue implements AggregateFunction { else { outputField = lookupField; } + } + @Override + public Accumulator initAccumulator(Accumulator acc) { return acc; } - @Override public Accumulator add(Event event, Accumulator acc) { if(event.getExtractedFields().containsKey(lookupField)){ @@ -69,8 +71,4 @@ public class LastValue implements AggregateFunction { return acc; } - @Override - public void close() { - - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java index 5662935..ea33271 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java @@ -13,15 +13,16 @@ public class LongCount implements AggregateFunction { @Override - public Accumulator open(UDFContext udfContext,Accumulator acc){ + public void open(UDFContext udfContext){ if(udfContext.getOutput_fields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } outputField = udfContext.getOutput_fields().get(0); + } + @Override + public Accumulator initAccumulator(Accumulator acc) { return acc; } - - @Override public Accumulator add(Event event, Accumulator acc) { @@ -39,9 +40,5 @@ public class LongCount implements AggregateFunction { return acc; } - @Override - public void close() { - - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java index 88e4be6..2a615ef 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java @@ -17,7 +17,7 @@ public class Mean implements AggregateFunction { private Integer precision; private DecimalFormat df; @Override - public Accumulator open(UDFContext udfContext,Accumulator acc){ + public void open(UDFContext udfContext){ if(udfContext.getLookup_fields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); @@ -41,11 +41,14 @@ public class Mean implements AggregateFunction { }else { precision = -1; } + + } + @Override + public Accumulator initAccumulator(Accumulator acc) { acc.getMetricsFields().put(outputField,new OnlineStatistics()); return acc; } - @Override public Accumulator add(Event event, Accumulator acc) { @@ -76,9 +79,4 @@ public class Mean implements AggregateFunction { return acc; } - @Override - public void close() { - - } - } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java index 4ed3143..01e9a5b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java @@ -6,6 +6,7 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.pojo.OnlineStatistics; public class NumberSum implements AggregateFunction { @@ -14,7 +15,7 @@ public class NumberSum implements AggregateFunction { @Override - public Accumulator open(UDFContext udfContext,Accumulator acc){ + public void open(UDFContext udfContext){ if(udfContext.getLookup_fields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } @@ -25,10 +26,12 @@ public class NumberSum implements AggregateFunction { else { outputField = lookupField; } - return acc; } - + @Override + public Accumulator initAccumulator(Accumulator acc) { + return acc; + } @Override public Accumulator add(Event event, Accumulator acc) { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java index a01edb3..b0d846b 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java @@ -33,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class CollectListTest { @Test - public void testNumberSumTest() throws ParseException { + public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"); excute(arr); @@ -49,7 +49,8 @@ public class CollectListTest { Map<String, Object> metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); accumulator.setMetricsFields(metricsFields); - Accumulator agg = collectList.open(udfContext,accumulator); + collectList.open(udfContext); + Accumulator agg = collectList.initAccumulator(accumulator); for (String o : arr) { Event event = new Event(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java index ae69d7c..ea4fe8d 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java @@ -47,8 +47,8 @@ public class CollectSetTest { Map<String, Object> metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); accumulator.setMetricsFields(metricsFields); - Accumulator agg = collectSet.open(udfContext,accumulator); - + collectSet.open(udfContext); + Accumulator agg = collectSet.initAccumulator(accumulator); for (String o : arr) { Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java index 2c4d460..506f6de 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java @@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class FirstValueTest { @Test - public void testNumberSumTest() throws ParseException { + public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); excute(arr); @@ -47,8 +47,8 @@ public class FirstValueTest { Map<String, Object> metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); accumulator.setMetricsFields(metricsFields); - Accumulator agg = firstValue.open(udfContext,accumulator); - + firstValue.open(udfContext); + Accumulator agg = firstValue.initAccumulator(accumulator); for (String o : arr) { Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java index e9609f7..f8306cd 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java @@ -34,7 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class LastValueTest { @Test - public void testNumberSumTest() throws ParseException { + public void test() throws ParseException { List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); excute(arr); @@ -50,8 +50,8 @@ public class LastValueTest { Map<String, Object> metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); accumulator.setMetricsFields(metricsFields); - Accumulator agg = lastValue.open(udfContext,accumulator); - + lastValue.open(udfContext); + Accumulator agg = lastValue.initAccumulator(accumulator); for (String o : arr) { Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java index 3bde558..3c02499 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java @@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class LongCountTest { @Test - public void testNumberSumTest() throws ParseException { + public void test() throws ParseException { Long[] longArr = new Long[]{1L, 2L, 3L, 4L}; excute(longArr); @@ -49,8 +49,8 @@ public class LongCountTest { Map<String, Object> metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); accumulator.setMetricsFields(metricsFields); - Accumulator agg = longCount.open(udfContext,accumulator); - + longCount.open(udfContext); + Accumulator agg = longCount.initAccumulator(accumulator); for (Number o : arr) { Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java index 807b7db..6deed0f 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java @@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class MeanTest { @Test - public void testNumberSumTest() throws ParseException { + public void test() throws ParseException { Integer[] intArr1 = new Integer[]{1, 2, 3, 4}; Integer[] intArr2 = new Integer[]{1, 6, 3}; @@ -55,8 +55,8 @@ public class MeanTest { Map<String, Object> metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); accumulator.setMetricsFields(metricsFields); - Accumulator agg = mean.open(udfContext,accumulator); - + mean.open(udfContext); + Accumulator agg = mean.initAccumulator(accumulator); for (Number o : arr) { Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); @@ -80,8 +80,8 @@ public class MeanTest { Map<String, Object> metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); accumulator.setMetricsFields(metricsFields); - Accumulator agg = mean.open(udfContext,accumulator); - + mean.open(udfContext); + Accumulator agg = mean.initAccumulator(accumulator); for (Number o : arr) { Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); @@ -103,8 +103,8 @@ public class MeanTest { Map<String, Object> metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); accumulator.setMetricsFields(metricsFields); - Accumulator agg = mean.open(udfContext,accumulator); - + mean.open(udfContext); + Accumulator agg = mean.initAccumulator(accumulator); for (Number o : arr) { Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java index a1cd54e..d0d3d2c 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java @@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class NumberSumTest { @Test - public void testNumberSumTest() throws ParseException { + public void test() throws ParseException { Integer[] intArr = new Integer[]{1, 2, 3, 4}; Long[] longArr = new Long[]{1L, 2L, 3L, 4L}; @@ -52,8 +52,8 @@ public class NumberSumTest { Map<String, Object> metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); accumulator.setMetricsFields(metricsFields); - Accumulator agg = numberSum.open(udfContext,accumulator); - + numberSum.open(udfContext); + Accumulator agg = numberSum.initAccumulator(accumulator); for (Number o : arr) { Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); |
