diff options
| author | 王宽 <[email protected]> | 2024-07-25 06:40:16 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-07-25 06:40:16 +0000 |
| commit | 8eaf2c973b21277cb613d5bd5418ef510cb72448 (patch) | |
| tree | b5991d6c882b046faa1e9afbbb198fa8c591950a | |
| parent | 05bf631f8562a357fda67891973c098aa32b2763 (diff) | |
| parent | 4aeddf37d0f5f69c4dab603f26d5d7f15c918f4b (diff) | |
Merge branch 'feature/aggregate' into 'develop'
[improve][core]增加聚合函数LAST_VALUE和FIRST_VALUE
See merge request galaxy/platform/groot-stream!81
9 files changed, 293 insertions, 5 deletions
diff --git a/config/udf.plugins b/config/udf.plugins index b3f67b4..2978bbe 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -19,4 +19,6 @@ com.geedgenetworks.core.udf.udaf.NumberSum com.geedgenetworks.core.udf.udaf.CollectList com.geedgenetworks.core.udf.udaf.CollectSet com.geedgenetworks.core.udf.udaf.LongCount -com.geedgenetworks.core.udf.udaf.Mean
\ No newline at end of file +com.geedgenetworks.core.udf.udaf.Mean +com.geedgenetworks.core.udf.udaf.LastValue +com.geedgenetworks.core.udf.udaf.FirstValue
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java index 0345b7b..4a43163 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java @@ -39,7 +39,7 @@ public class CollectList implements AggregateFunction { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } this.lookupField = udfContext.getLookup_fields().get(0); - if(!udfContext.getOutput_fields().isEmpty()) { + if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); } else { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java index 7891f93..a425118 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java @@ -26,7 +26,7 @@ public class CollectSet implements AggregateFunction { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } this.lookupField = udfContext.getLookup_fields().get(0); - if(!udfContext.getOutput_fields().isEmpty()) { + if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { this.outputField = udfContext.getOutput_fields().get(0); } else { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java new file mode 100644 index 0000000..27490ef --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java @@ -0,0 +1,73 @@ +/** + * Copyright 2017 Hortonworks. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package com.geedgenetworks.core.udf.udaf; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; + +/** + * Keeps the first value of the lookup field seen within a group and stores it under the output field + */ +public class FirstValue implements AggregateFunction { + + private String lookupField; + private String outputField; + + + @Override + public Accumulator open(UDFContext udfContext,Accumulator acc) { + if(udfContext.getLookup_fields()==null ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + this.lookupField = udfContext.getLookup_fields().get(0); + if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.outputField = udfContext.getOutput_fields().get(0); + } + else { + outputField = lookupField; + } + return acc; + } + + + @Override + public Accumulator add(Event event, Accumulator acc) { + if(!acc.getMetricsFields().containsKey(outputField) && event.getExtractedFields().containsKey(lookupField)){ + acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); + } + return acc; + } + + @Override + public String functionName() 
{ + return "FIRST_VALUE"; + } + + @Override + public Accumulator getResult(Accumulator acc) { + return acc; + } + + @Override + public void close() { + + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java new file mode 100644 index 0000000..4adafd4 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java @@ -0,0 +1,76 @@ +/** + * Copyright 2017 Hortonworks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ +package com.geedgenetworks.core.udf.udaf; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * Keeps the last value of the lookup field seen within a group and stores it under the output field + */ +public class LastValue implements AggregateFunction { + + private String lookupField; + private String outputField; + + + @Override + public Accumulator open(UDFContext udfContext,Accumulator acc) { + if(udfContext.getLookup_fields()==null ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + this.lookupField = udfContext.getLookup_fields().get(0); + if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.outputField = udfContext.getOutput_fields().get(0); + } + else { + outputField = lookupField; + } + return acc; + } + + + @Override + public Accumulator add(Event event, Accumulator acc) { + if(event.getExtractedFields().containsKey(lookupField)){ + acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField)); + } + return acc; + } + + @Override + public String functionName() { + return "LAST_VALUE"; + } + + @Override + public Accumulator getResult(Accumulator acc) { + return acc; + } + + @Override + public void close() { + + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java index a3f098b..380f598 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java @@ -23,7 +23,7 @@ public class Mean implements AggregateFunction { throw 
new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } lookupField = udfContext.getLookup_fields().get(0); - if(!udfContext.getOutput_fields().isEmpty()) { + if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); } else { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java index d92df5c..4ed3143 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java @@ -19,7 +19,7 @@ public class NumberSum implements AggregateFunction { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } lookupField = udfContext.getLookup_fields().get(0); - if(!udfContext.getOutput_fields().isEmpty()) { + if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { outputField = udfContext.getOutput_fields().get(0); } else { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java new file mode 100644 index 0000000..2c4d460 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java @@ -0,0 +1,67 @@ +/** + * Copyright 2017 Hortonworks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package com.geedgenetworks.core.udf.test.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.CollectSet; +import com.geedgenetworks.core.udf.udaf.FirstValue; +import org.junit.jupiter.api.Test; + +import java.text.ParseException; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class FirstValueTest { + + @Test + public void testNumberSumTest() throws ParseException { + + List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); + excute(arr); + + } + + private static void excute(List<String> arr) throws ParseException { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_first")); + FirstValue firstValue = new FirstValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + Accumulator agg = firstValue.open(udfContext,accumulator); + + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = firstValue.add(event, agg); + + } + Accumulator result = firstValue.getResult(agg); + String val = (String) result.getMetricsFields().get("field_first"); + assertEquals(val,"192.168.1.1"); + } + + + +}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java new file mode 100644 index 0000000..e9609f7 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java @@ -0,0 +1,70 @@ +/** + * Copyright 2017 Hortonworks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package com.geedgenetworks.core.udf.test.aggregate; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udaf.FirstValue; +import com.geedgenetworks.core.udf.udaf.LastValue; +import org.junit.jupiter.api.Test; + +import java.text.ParseException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class LastValueTest { + + @Test + public void testNumberSumTest() throws ParseException { + + List<String> arr = List.of("192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4","192.168.1.4"); + excute(arr); + + } + + private static void excute(List<String> arr) throws ParseException { + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("field")); + udfContext.setOutput_fields(Collections.singletonList("field_last")); + 
LastValue lastValue = new LastValue(); + Map<String, Object> metricsFields = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(metricsFields); + Accumulator agg = lastValue.open(udfContext,accumulator); + + for (String o : arr) { + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("field", o); + event.setExtractedFields(extractedFields); + agg = lastValue.add(event, agg); + + } + Accumulator result = lastValue.getResult(agg); + String val = (String) result.getMetricsFields().get("field_last"); + assertEquals(val,"192.168.1.4"); + } + + + +}
\ No newline at end of file |
