diff options
| author | wangkuan <[email protected]> | 2024-10-21 16:10:04 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-10-21 16:10:04 +0800 |
| commit | 3b4034993c5812ca239c4824d8101b1cca567b5c (patch) | |
| tree | fd41b8516a31ea6c714a71fa56597f3649a89001 /groot-core | |
| parent | 72ba1827fb4a5ccf05e450a83dc930766c9f95e3 (diff) | |
[feature][core]新增聚合函数NumberMax,NumberMin,增加agg-dos-protection_rule_metric任务以及单元测试
Diffstat (limited to 'groot-core')
4 files changed, 414 insertions, 0 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java
new file mode 100644
index 0000000..865ce64
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java
@@ -0,0 +1,139 @@
+/**
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.udaf;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+/**
+ * Aggregate function that keeps the largest value of one event field within a group.
+ * <p>
+ * The first non-null value seen is stored as-is; afterwards a new value replaces the stored
+ * one only when it is strictly greater, so the result keeps the runtime type of the winning
+ * input value.
+ */
+public class NumberMax implements AggregateFunction {
+
+    private String lookupField;
+    private String outputField;
+
+    /**
+     * Reads the input and output field names from the UDF configuration.
+     *
+     * @param udfContext configuration; the first lookup field is the input field and the first
+     *                   output field, when present, is the result key (otherwise the lookup
+     *                   field name is reused)
+     * @throws GrootStreamRuntimeException if no lookup field is configured
+     */
+    @Override
+    public void open(UDFContext udfContext) {
+        if (udfContext.getLookup_fields() == null || udfContext.getLookup_fields().isEmpty()) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+        }
+        this.lookupField = udfContext.getLookup_fields().get(0);
+        if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
+            this.outputField = udfContext.getOutput_fields().get(0);
+        } else {
+            this.outputField = lookupField;
+        }
+    }
+
+    /** Returns the supplied accumulator unchanged; no initial value is stored. */
+    @Override
+    public Accumulator initAccumulator(Accumulator acc) {
+        return acc;
+    }
+
+    /**
+     * Folds one event into the running maximum.
+     *
+     * @param event event whose lookup field is examined; ignored when the field is absent
+     * @param acc   accumulator holding the running maximum under the output field
+     * @return the same accumulator instance
+     */
+    @Override
+    public Accumulator add(Event event, Accumulator acc) {
+        if (event.getExtractedFields() == null) {
+            return acc;
+        }
+        // Events that do not carry the field leave the accumulator untouched.
+        Object candidate = event.getExtractedFields().get(lookupField);
+        if (candidate == null) {
+            return acc;
+        }
+        Object current = acc.getMetricsFields().get(outputField);
+        if (current == null || isLess(current, candidate)) {
+            acc.getMetricsFields().put(outputField, candidate);
+        }
+        return acc;
+    }
+
+    /** Returns the identifier of this function, {@code NUMBER_MAX}. */
+    @Override
+    public String functionName() {
+        return "NUMBER_MAX";
+    }
+
+    /** Returns the accumulator as-is; the maximum is already stored under the output field. */
+    @Override
+    public Accumulator getResult(Accumulator acc) {
+        return acc;
+    }
+
+    /**
+     * Merges two partial results, keeping the larger value in {@code firstAcc}.
+     *
+     * @param firstAcc  accumulator that receives the merged result
+     * @param secondAcc accumulator whose value is merged in; it is only read
+     * @return {@code firstAcc}
+     */
+    @Override
+    public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+        Object other = secondAcc.getMetricsFields().get(outputField);
+        if (other == null) {
+            return firstAcc;
+        }
+        Object current = firstAcc.getMetricsFields().get(outputField);
+        if (current == null || isLess(current, other)) {
+            firstAcc.getMetricsFields().put(outputField, other);
+        }
+        return firstAcc;
+    }
+
+    /**
+     * Tells whether {@code left} is strictly smaller than {@code right}.
+     * <p>
+     * Integral boxed types are compared as {@code long} because converting large values to
+     * {@code double} loses precision above 2^53; everything else is compared through its
+     * string form parsed as {@code double}.
+     */
+    private static boolean isLess(Object left, Object right) {
+        if (isIntegral(left) && isIntegral(right)) {
+            return ((Number) left).longValue() < ((Number) right).longValue();
+        }
+        return Double.parseDouble(left.toString()) < Double.parseDouble(right.toString());
+    }
+
+    /** Tells whether the value is a boxed {@code byte}, {@code short}, {@code int} or {@code long}. */
+    private static boolean isIntegral(Object value) {
+        return value instanceof Long || value instanceof Integer
+                || value instanceof Short || value instanceof Byte;
+    }
+}
diff --git 
a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java
new file mode 100644
index 0000000..3af21ec
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java
@@ -0,0 +1,139 @@
+/**
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.udaf;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+/**
+ * Aggregate function that keeps the smallest value of one event field within a group.
+ * <p>
+ * The first non-null value seen is stored as-is; afterwards a new value replaces the stored
+ * one only when it is strictly smaller, so the result keeps the runtime type of the winning
+ * input value.
+ */
+public class NumberMin implements AggregateFunction {
+
+    private String lookupField;
+    private String outputField;
+
+    /**
+     * Reads the input and output field names from the UDF configuration.
+     *
+     * @param udfContext configuration; the first lookup field is the input field and the first
+     *                   output field, when present, is the result key (otherwise the lookup
+     *                   field name is reused)
+     * @throws GrootStreamRuntimeException if no lookup field is configured
+     */
+    @Override
+    public void open(UDFContext udfContext) {
+        if (udfContext.getLookup_fields() == null || udfContext.getLookup_fields().isEmpty()) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+        }
+        this.lookupField = udfContext.getLookup_fields().get(0);
+        if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
+            this.outputField = udfContext.getOutput_fields().get(0);
+        } else {
+            this.outputField = lookupField;
+        }
+    }
+
+    /** Returns the supplied accumulator unchanged; no initial value is stored. */
+    @Override
+    public Accumulator initAccumulator(Accumulator acc) {
+        return acc;
+    }
+
+    /**
+     * Folds one event into the running minimum.
+     *
+     * @param event event whose lookup field is examined; ignored when the field is absent
+     * @param acc   accumulator holding the running minimum under the output field
+     * @return the same accumulator instance
+     */
+    @Override
+    public Accumulator add(Event event, Accumulator acc) {
+        if (event.getExtractedFields() == null) {
+            return acc;
+        }
+        // Check the field value itself, not only the map: an event lacking the field must be skipped.
+        Object candidate = event.getExtractedFields().get(lookupField);
+        if (candidate == null) {
+            return acc;
+        }
+        Object current = acc.getMetricsFields().get(outputField);
+        if (current == null || isLess(candidate, current)) {
+            acc.getMetricsFields().put(outputField, candidate);
+        }
+        return acc;
+    }
+
+    /** Returns the identifier of this function, {@code NUMBER_MIN}. */
+    @Override
+    public String functionName() {
+        return "NUMBER_MIN";
+    }
+
+    /** Returns the accumulator as-is; the minimum is already stored under the output field. */
+    @Override
+    public Accumulator getResult(Accumulator acc) {
+        return acc;
+    }
+
+    /**
+     * Merges two partial results, keeping the smaller value in {@code firstAcc}.
+     *
+     * @param firstAcc  accumulator that receives the merged result
+     * @param secondAcc accumulator whose value is merged in; it is only read
+     * @return {@code firstAcc}
+     */
+    @Override
+    public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+        Object other = secondAcc.getMetricsFields().get(outputField);
+        if (other == null) {
+            return firstAcc;
+        }
+        Object current = firstAcc.getMetricsFields().get(outputField);
+        if (current == null || isLess(other, current)) {
+            firstAcc.getMetricsFields().put(outputField, other);
+        }
+        return firstAcc;
+    }
+
+    /**
+     * Tells whether {@code left} is strictly smaller than {@code right}.
+     * <p>
+     * Integral boxed types are compared as {@code long} because converting large values to
+     * {@code double} loses precision above 2^53; everything else is compared through its
+     * string form parsed as {@code double}.
+     */
+    private static boolean isLess(Object left, Object right) {
+        if (isIntegral(left) && isIntegral(right)) {
+            return ((Number) left).longValue() < ((Number) right).longValue();
+        }
+        return Double.parseDouble(left.toString()) < Double.parseDouble(right.toString());
+    }
+
+    /** Tells whether the value is a boxed {@code byte}, {@code short}, {@code int} or {@code long}. */
+    private static boolean isIntegral(Object value) {
+        return value instanceof Long || value instanceof Integer
+                || value instanceof Short || value instanceof Byte;
+    }
+}
diff --git 
a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java
new file mode 100644
index 0000000..d7715c0
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.NumberMax;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link NumberMax}: per-type aggregation and merging of partial results.
+ */
+public class NumberMaxTest {
+
+    private static final String LOOKUP_FIELD = "field";
+    private static final String OUTPUT_FIELD = "field_max";
+
+    @Test
+    public void test() {
+        Integer[] intArr = new Integer[]{1, 2, 3, 4};
+        Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
+        Float[] floatArr = new Float[]{1.0f, 2.0f, 3.0f, 4.0f};
+        Double[] doubleArr = new Double[]{1.0, 2.0, 3.0, 4.0};
+        execute(floatArr, Float.class);
+        execute(doubleArr, Double.class);
+        execute(intArr, Integer.class);
+        execute(longArr, Long.class);
+        testMerge(intArr, floatArr);
+        // The larger value must win regardless of which side of the merge holds it.
+        testMerge(new Integer[]{1, 2}, new Float[]{3.0f, 4.0f});
+        testMerge(new Float[]{3.0f, 4.0f}, new Integer[]{1, 2});
+    }
+
+    /** Aggregates both arrays separately, merges the partial results and expects a maximum of 4. */
+    private void testMerge(Number[] arr, Number[] arr2) {
+        UDFContext udfContext = newContext();
+        NumberMax numberMax = new NumberMax();
+        numberMax.open(udfContext);
+        Accumulator result1 = getMiddleResult(udfContext, arr);
+        Accumulator result2 = getMiddleResult(udfContext, arr2);
+        Accumulator result = numberMax.getResult(numberMax.merge(result1, result2));
+        assertEquals(4.0, maxOf(result));
+    }
+
+    /** Feeds every element of {@code arr} through a fresh function instance and accumulator. */
+    private static Accumulator getMiddleResult(UDFContext udfContext, Number[] arr) {
+        NumberMax numberMax = new NumberMax();
+        numberMax.open(udfContext);
+        Map<String, Object> metricsFields = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(metricsFields);
+        Accumulator agg = numberMax.initAccumulator(accumulator);
+        for (Number value : arr) {
+            Map<String, Object> extractedFields = new HashMap<>();
+            extractedFields.put(LOOKUP_FIELD, value);
+            Event event = new Event();
+            event.setExtractedFields(extractedFields);
+            agg = numberMax.add(event, agg);
+        }
+        return agg;
+    }
+
+    /** Aggregates {@code arr} and expects a maximum of 4 that keeps the input element type. */
+    private static void execute(Number[] arr, Class<? extends Number> clazz) {
+        UDFContext udfContext = newContext();
+        NumberMax numberMax = new NumberMax();
+        numberMax.open(udfContext);
+        Accumulator result = numberMax.getResult(getMiddleResult(udfContext, arr));
+        assertEquals(clazz, result.getMetricsFields().get(OUTPUT_FIELD).getClass());
+        assertEquals(4.0, maxOf(result));
+    }
+
+    /** Builds the configuration shared by all cases: read "field", write "field_max". */
+    private static UDFContext newContext() {
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of(LOOKUP_FIELD));
+        udfContext.setOutput_fields(Collections.singletonList(OUTPUT_FIELD));
+        return udfContext;
+    }
+
+    /** Reads the aggregated value as a double without any locale-dependent parsing. */
+    private static double maxOf(Accumulator acc) {
+        return Double.parseDouble(acc.getMetricsFields().get(OUTPUT_FIELD).toString());
+    }
+}
\ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java
new file mode 100644
index 0000000..ccbb994
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.NumberMin;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link NumberMin}: per-type aggregation and merging of partial results.
+ */
+public class NumberMinTest {
+
+    private static final String LOOKUP_FIELD = "field";
+    private static final String OUTPUT_FIELD = "field_min";
+
+    @Test
+    public void test() {
+        Integer[] intArr = new Integer[]{1, 2, 3, 4};
+        Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
+        Float[] floatArr = new Float[]{1.0f, 2.0f, 3.0f, 4.0f};
+        Double[] doubleArr = new Double[]{1.0, 2.0, 3.0, 4.0};
+        execute(floatArr, Float.class);
+        execute(doubleArr, Double.class);
+        execute(intArr, Integer.class);
+        execute(longArr, Long.class);
+        testMerge(intArr, floatArr);
+        // The smaller value must win regardless of which side of the merge holds it.
+        testMerge(new Integer[]{3, 4}, new Float[]{1.0f, 2.0f});
+        testMerge(new Float[]{1.0f, 2.0f}, new Integer[]{3, 4});
+    }
+
+    /** Aggregates both arrays separately, merges the partial results and expects a minimum of 1. */
+    private void testMerge(Number[] arr, Number[] arr2) {
+        UDFContext udfContext = newContext();
+        NumberMin numberMin = new NumberMin();
+        numberMin.open(udfContext);
+        Accumulator result1 = getMiddleResult(udfContext, arr);
+        Accumulator result2 = getMiddleResult(udfContext, arr2);
+        Accumulator result = numberMin.getResult(numberMin.merge(result1, result2));
+        assertEquals(1.0, minOf(result));
+    }
+
+    /** Feeds every element of {@code arr} through a fresh function instance and accumulator. */
+    private static Accumulator getMiddleResult(UDFContext udfContext, Number[] arr) {
+        NumberMin numberMin = new NumberMin();
+        numberMin.open(udfContext);
+        Map<String, Object> metricsFields = new HashMap<>();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(metricsFields);
+        Accumulator agg = numberMin.initAccumulator(accumulator);
+        for (Number value : arr) {
+            Map<String, Object> extractedFields = new HashMap<>();
+            extractedFields.put(LOOKUP_FIELD, value);
+            Event event = new Event();
+            event.setExtractedFields(extractedFields);
+            agg = numberMin.add(event, agg);
+        }
+        return agg;
+    }
+
+    /** Aggregates {@code arr} and expects a minimum of 1 that keeps the input element type. */
+    private static void execute(Number[] arr, Class<? extends Number> clazz) {
+        UDFContext udfContext = newContext();
+        NumberMin numberMin = new NumberMin();
+        numberMin.open(udfContext);
+        Accumulator result = numberMin.getResult(getMiddleResult(udfContext, arr));
+        assertEquals(clazz, result.getMetricsFields().get(OUTPUT_FIELD).getClass());
+        assertEquals(1.0, minOf(result));
+    }
+
+    /** Builds the configuration shared by all cases: read "field", write "field_min". */
+    private static UDFContext newContext() {
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of(LOOKUP_FIELD));
+        udfContext.setOutput_fields(Collections.singletonList(OUTPUT_FIELD));
+        return udfContext;
+    }
+
+    /** Reads the aggregated value as a double without any locale-dependent parsing. */
+    private static double minOf(Accumulator acc) {
+        return Double.parseDouble(acc.getMetricsFields().get(OUTPUT_FIELD).toString());
+    }
+}
\ No newline at end of file |
