summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-07-25 14:29:06 +0800
committerwangkuan <[email protected]>2024-07-25 14:29:06 +0800
commit4aeddf37d0f5f69c4dab603f26d5d7f15c918f4b (patch)
treeb2dd2fb4d8e78ada106222743041382737255adc
parentbd422c028ad28ef8f1c3bb237d4ba5f7fa0bcb65 (diff)
[improve][core]增加聚合函数LAST_VALUE和FIRST_VALUE
-rw-r--r--config/udf.plugins4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java73
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java76
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java2
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java67
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java70
9 files changed, 293 insertions, 5 deletions
diff --git a/config/udf.plugins b/config/udf.plugins
index b3f67b4..2978bbe 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -19,4 +19,6 @@ com.geedgenetworks.core.udf.udaf.NumberSum
com.geedgenetworks.core.udf.udaf.CollectList
com.geedgenetworks.core.udf.udaf.CollectSet
com.geedgenetworks.core.udf.udaf.LongCount
-com.geedgenetworks.core.udf.udaf.Mean \ No newline at end of file
+com.geedgenetworks.core.udf.udaf.Mean
+com.geedgenetworks.core.udf.udaf.LastValue
+com.geedgenetworks.core.udf.udaf.FirstValue \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
index 0345b7b..4a43163 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -39,7 +39,7 @@ public class CollectList implements AggregateFunction {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
this.lookupField = udfContext.getLookup_fields().get(0);
- if(!udfContext.getOutput_fields().isEmpty()) {
+ if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
}
else {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
index 7891f93..a425118 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
@@ -26,7 +26,7 @@ public class CollectSet implements AggregateFunction {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
this.lookupField = udfContext.getLookup_fields().get(0);
- if(!udfContext.getOutput_fields().isEmpty()) {
+ if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
this.outputField = udfContext.getOutput_fields().get(0);
}
else {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
new file mode 100644
index 0000000..27490ef
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.udaf;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+/**
+ * Keeps the first value of the lookup field seen within a group and stores it under the output field
+ */
+public class FirstValue implements AggregateFunction {
+
+ private String lookupField;
+ private String outputField;
+
+
+ @Override
+ public Accumulator open(UDFContext udfContext,Accumulator acc) {
+ if(udfContext.getLookup_fields()==null ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.outputField = udfContext.getOutput_fields().get(0);
+ }
+ else {
+ outputField = lookupField;
+ }
+ return acc;
+ }
+
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ if(!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)){
+ acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
+ }
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "FIRST_VALUE";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
new file mode 100644
index 0000000..4adafd4
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.udaf;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Keeps the most recent value of the lookup field seen within a group, overwriting any earlier value under the output field
+ */
+public class LastValue implements AggregateFunction {
+
+ private String lookupField;
+ private String outputField;
+
+
+ @Override
+ public Accumulator open(UDFContext udfContext,Accumulator acc) {
+ if(udfContext.getLookup_fields()==null ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.outputField = udfContext.getOutput_fields().get(0);
+ }
+ else {
+ outputField = lookupField;
+ }
+ return acc;
+ }
+
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ if(event.getExtractedFields().containsKey(lookupField)){
+ acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
+ }
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "LAST_VALUE";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
index a3f098b..380f598 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
@@ -23,7 +23,7 @@ public class Mean implements AggregateFunction {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
lookupField = udfContext.getLookup_fields().get(0);
- if(!udfContext.getOutput_fields().isEmpty()) {
+ if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
outputField = udfContext.getOutput_fields().get(0);
}
else {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
index d92df5c..4ed3143 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
@@ -19,7 +19,7 @@ public class NumberSum implements AggregateFunction {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
lookupField = udfContext.getLookup_fields().get(0);
- if(!udfContext.getOutput_fields().isEmpty()) {
+ if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
outputField = udfContext.getOutput_fields().get(0);
}
else {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
new file mode 100644
index 0000000..2c4d460
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.CollectSet;
+import com.geedgenetworks.core.udf.udaf.FirstValue;
+import org.junit.jupiter.api.Test;
+
+import java.text.ParseException;
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class FirstValueTest {
+
+ @Test
+ public void testNumberSumTest() throws ParseException {
+
+ List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
+ excute(arr);
+
+ }
+
+ private static void excute(List<String> arr) throws ParseException {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_first"));
+ FirstValue firstValue = new FirstValue();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ Accumulator agg = firstValue.open(udfContext,accumulator);
+
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = firstValue.add(event, agg);
+
+ }
+ Accumulator result = firstValue.getResult(agg);
+ String val = (String) result.getMetricsFields().get("field_first");
+ assertEquals(val,"192.168.1.1");
+ }
+
+
+
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
new file mode 100644
index 0000000..e9609f7
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
@@ -0,0 +1,70 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.FirstValue;
+import com.geedgenetworks.core.udf.udaf.LastValue;
+import org.junit.jupiter.api.Test;
+
+import java.text.ParseException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class LastValueTest {
+
+ @Test
+ public void testNumberSumTest() throws ParseException {
+
+ List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4");
+ excute(arr);
+
+ }
+
+ private static void excute(List<String> arr) throws ParseException {
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("field"));
+ udfContext.setOutput_fields(Collections.singletonList("field_last"));
+ LastValue lastValue = new LastValue();
+ Map<String, Object> metricsFields = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(metricsFields);
+ Accumulator agg = lastValue.open(udfContext,accumulator);
+
+ for (String o : arr) {
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("field", o);
+ event.setExtractedFields(extractedFields);
+ agg = lastValue.add(event, agg);
+
+ }
+ Accumulator result = lastValue.getResult(agg);
+ String val = (String) result.getMetricsFields().get("field_last");
+ assertEquals(val,"192.168.1.4");
+ }
+
+
+
+} \ No newline at end of file