summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-08-13 09:27:37 +0000
committer王宽 <[email protected]>2024-08-13 09:27:37 +0000
commit3b5e06b70db295b3f4ec8d1e455a095009c82bc0 (patch)
treedb9396d047dc2312d54eb88d5ff3947123d648d4
parent647296e18fc36fab3b1e01bf276115fb1200d0eb (diff)
parente9d132716800f4b8ef46273b254c173c3d450864 (diff)
Merge branch 'feature/aggregate' into 'develop'
[improve][core]拆分聚合函数open方法,较少调用次数,优化性能 See merge request galaxy/platform/groot-stream!91
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java13
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java12
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java9
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java5
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java6
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java6
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java6
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java14
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java6
16 files changed, 70 insertions, 70 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
index 98450fd..455073f 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
@@ -7,7 +7,9 @@ import java.io.Serializable;
public interface AggregateFunction extends Serializable {
- Accumulator open(UDFContext udfContext,Accumulator acc);
+ void open(UDFContext udfContext);
+
+ Accumulator initAccumulator(Accumulator acc);
Accumulator add(Event val, Accumulator acc);
@@ -15,6 +17,5 @@ public interface AggregateFunction extends Serializable {
Accumulator getResult(Accumulator acc);
- void close();
-
+ default void close(){};
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index c07374e..803fefc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -71,23 +71,24 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
}
}
+ for (UdfEntity udfEntity : functions) {
+ udfEntity.getAggregateFunction().open(udfEntity.getUdfContext());
+ }
} catch (Exception e) {
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e);
}
}
-
@Override
public Accumulator createAccumulator() {
Map<String, Object> map = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(map);
for (UdfEntity udfEntity : functions) {
- udfEntity.getAggregateFunction().open(udfEntity.getUdfContext(), accumulator);
+ udfEntity.getAggregateFunction().initAccumulator(accumulator);
}
return accumulator;
}
-
@Override
public Accumulator add(Event event, Accumulator accumulator) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
index 4a43163..423eff9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -22,6 +22,8 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.processor.projection.UdfEntity;
+
import java.util.*;
/**
@@ -32,9 +34,8 @@ public class CollectList implements AggregateFunction {
private String lookupField;
private String outputField;
-
@Override
- public Accumulator open(UDFContext udfContext,Accumulator acc) {
+ public void open(UDFContext udfContext) {
if(udfContext.getLookup_fields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
@@ -45,11 +46,14 @@ public class CollectList implements AggregateFunction {
else {
outputField = lookupField;
}
+
+ }
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
acc.getMetricsFields().put(outputField, new ArrayList<>());
return acc;
}
-
@Override
public Accumulator add(Event event, Accumulator acc) {
if(event.getExtractedFields().containsKey(lookupField)){
@@ -71,8 +75,4 @@ public class CollectList implements AggregateFunction {
return acc;
}
- @Override
- public void close() {
-
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
index a425118..b4dfb14 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
@@ -8,6 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@@ -21,7 +22,7 @@ public class CollectSet implements AggregateFunction {
@Override
- public Accumulator open(UDFContext udfContext,Accumulator acc) {
+ public void open(UDFContext udfContext) {
if(udfContext.getLookup_fields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
@@ -32,11 +33,13 @@ public class CollectSet implements AggregateFunction {
else {
outputField = lookupField;
}
+ }
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
acc.getMetricsFields().put(outputField, new HashSet<>());
return acc;
}
-
@Override
public Accumulator add(Event event, Accumulator acc) {
if(event.getExtractedFields().containsKey(lookupField)){
@@ -58,8 +61,5 @@ public class CollectSet implements AggregateFunction {
return acc;
}
- @Override
- public void close() {
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
index 27490ef..6301a01 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
@@ -23,6 +23,8 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
+import java.util.ArrayList;
+
/**
* Collects elements within a group and returns the list of aggregated objects
*/
@@ -33,7 +35,7 @@ public class FirstValue implements AggregateFunction {
@Override
- public Accumulator open(UDFContext udfContext,Accumulator acc) {
+ public void open(UDFContext udfContext) {
if(udfContext.getLookup_fields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
@@ -44,9 +46,12 @@ public class FirstValue implements AggregateFunction {
else {
outputField = lookupField;
}
- return acc;
}
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ return acc;
+ }
@Override
public Accumulator add(Event event, Accumulator acc) {
@@ -66,8 +71,4 @@ public class FirstValue implements AggregateFunction {
return acc;
}
- @Override
- public void close() {
-
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
index 4adafd4..f27a2e6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
@@ -36,7 +36,7 @@ public class LastValue implements AggregateFunction {
@Override
- public Accumulator open(UDFContext udfContext,Accumulator acc) {
+ public void open(UDFContext udfContext) {
if(udfContext.getLookup_fields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
@@ -47,10 +47,12 @@ public class LastValue implements AggregateFunction {
else {
outputField = lookupField;
}
+ }
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
return acc;
}
-
@Override
public Accumulator add(Event event, Accumulator acc) {
if(event.getExtractedFields().containsKey(lookupField)){
@@ -69,8 +71,4 @@ public class LastValue implements AggregateFunction {
return acc;
}
- @Override
- public void close() {
-
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
index 5662935..ea33271 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
@@ -13,15 +13,16 @@ public class LongCount implements AggregateFunction {
@Override
- public Accumulator open(UDFContext udfContext,Accumulator acc){
+ public void open(UDFContext udfContext){
if(udfContext.getOutput_fields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
outputField = udfContext.getOutput_fields().get(0);
+ }
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
return acc;
}
-
-
@Override
public Accumulator add(Event event, Accumulator acc) {
@@ -39,9 +40,5 @@ public class LongCount implements AggregateFunction {
return acc;
}
- @Override
- public void close() {
-
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
index 88e4be6..2a615ef 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
@@ -17,7 +17,7 @@ public class Mean implements AggregateFunction {
private Integer precision;
private DecimalFormat df;
@Override
- public Accumulator open(UDFContext udfContext,Accumulator acc){
+ public void open(UDFContext udfContext){
if(udfContext.getLookup_fields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
@@ -41,11 +41,14 @@ public class Mean implements AggregateFunction {
}else {
precision = -1;
}
+
+ }
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
acc.getMetricsFields().put(outputField,new OnlineStatistics());
return acc;
}
-
@Override
public Accumulator add(Event event, Accumulator acc) {
@@ -76,9 +79,4 @@ public class Mean implements AggregateFunction {
return acc;
}
- @Override
- public void close() {
-
- }
-
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
index 4ed3143..01e9a5b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
@@ -6,6 +6,7 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.pojo.OnlineStatistics;
public class NumberSum implements AggregateFunction {
@@ -14,7 +15,7 @@ public class NumberSum implements AggregateFunction {
@Override
- public Accumulator open(UDFContext udfContext,Accumulator acc){
+ public void open(UDFContext udfContext){
if(udfContext.getLookup_fields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
@@ -25,10 +26,12 @@ public class NumberSum implements AggregateFunction {
else {
outputField = lookupField;
}
- return acc;
}
-
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ return acc;
+ }
@Override
public Accumulator add(Event event, Accumulator acc) {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
index a01edb3..b0d846b 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
@@ -33,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class CollectListTest {
@Test
- public void testNumberSumTest() throws ParseException {
+ public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4");
excute(arr);
@@ -49,7 +49,8 @@ public class CollectListTest {
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(metricsFields);
- Accumulator agg = collectList.open(udfContext,accumulator);
+ collectList.open(udfContext);
+ Accumulator agg = collectList.initAccumulator(accumulator);
for (String o : arr) {
Event event = new Event();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
index ae69d7c..ea4fe8d 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
@@ -47,8 +47,8 @@ public class CollectSetTest {
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(metricsFields);
- Accumulator agg = collectSet.open(udfContext,accumulator);
-
+ collectSet.open(udfContext);
+ Accumulator agg = collectSet.initAccumulator(accumulator);
for (String o : arr) {
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
index 2c4d460..506f6de 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class FirstValueTest {
@Test
- public void testNumberSumTest() throws ParseException {
+ public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
excute(arr);
@@ -47,8 +47,8 @@ public class FirstValueTest {
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(metricsFields);
- Accumulator agg = firstValue.open(udfContext,accumulator);
-
+ firstValue.open(udfContext);
+ Accumulator agg = firstValue.initAccumulator(accumulator);
for (String o : arr) {
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
index e9609f7..f8306cd 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
@@ -34,7 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class LastValueTest {
@Test
- public void testNumberSumTest() throws ParseException {
+ public void test() throws ParseException {
List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
excute(arr);
@@ -50,8 +50,8 @@ public class LastValueTest {
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(metricsFields);
- Accumulator agg = lastValue.open(udfContext,accumulator);
-
+ lastValue.open(udfContext);
+ Accumulator agg = lastValue.initAccumulator(accumulator);
for (String o : arr) {
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
index 3bde558..3c02499 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
@@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class LongCountTest {
@Test
- public void testNumberSumTest() throws ParseException {
+ public void test() throws ParseException {
Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
excute(longArr);
@@ -49,8 +49,8 @@ public class LongCountTest {
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(metricsFields);
- Accumulator agg = longCount.open(udfContext,accumulator);
-
+ longCount.open(udfContext);
+ Accumulator agg = longCount.initAccumulator(accumulator);
for (Number o : arr) {
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
index 807b7db..6deed0f 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
@@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class MeanTest {
@Test
- public void testNumberSumTest() throws ParseException {
+ public void test() throws ParseException {
Integer[] intArr1 = new Integer[]{1, 2, 3, 4};
Integer[] intArr2 = new Integer[]{1, 6, 3};
@@ -55,8 +55,8 @@ public class MeanTest {
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(metricsFields);
- Accumulator agg = mean.open(udfContext,accumulator);
-
+ mean.open(udfContext);
+ Accumulator agg = mean.initAccumulator(accumulator);
for (Number o : arr) {
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
@@ -80,8 +80,8 @@ public class MeanTest {
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(metricsFields);
- Accumulator agg = mean.open(udfContext,accumulator);
-
+ mean.open(udfContext);
+ Accumulator agg = mean.initAccumulator(accumulator);
for (Number o : arr) {
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
@@ -103,8 +103,8 @@ public class MeanTest {
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(metricsFields);
- Accumulator agg = mean.open(udfContext,accumulator);
-
+ mean.open(udfContext);
+ Accumulator agg = mean.initAccumulator(accumulator);
for (Number o : arr) {
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
index a1cd54e..d0d3d2c 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class NumberSumTest {
@Test
- public void testNumberSumTest() throws ParseException {
+ public void test() throws ParseException {
Integer[] intArr = new Integer[]{1, 2, 3, 4};
Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
@@ -52,8 +52,8 @@ public class NumberSumTest {
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
accumulator.setMetricsFields(metricsFields);
- Accumulator agg = numberSum.open(udfContext,accumulator);
-
+ numberSum.open(udfContext);
+ Accumulator agg = numberSum.initAccumulator(accumulator);
for (Number o : arr) {
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();