diff options
| author | wangkuan <[email protected]> | 2024-10-21 16:10:04 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-10-21 16:10:04 +0800 |
| commit | 3b4034993c5812ca239c4824d8101b1cca567b5c (patch) | |
| tree | fd41b8516a31ea6c714a71fa56597f3649a89001 | |
| parent | 72ba1827fb4a5ccf05e450a83dc930766c9f95e3 (diff) | |
[feature][core]新增聚合函数NumberMax,NumberMin,增加agg-dos-protection_rule_metric任务以及单元测试
8 files changed, 642 insertions, 1 deletions
diff --git a/config/udf.plugins b/config/udf.plugins index 92c652e..49edca1 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -33,3 +33,5 @@ com.geedgenetworks.core.udf.udtf.PathUnroll com.geedgenetworks.core.udf.uuid.Uuid com.geedgenetworks.core.udf.uuid.UuidV5 com.geedgenetworks.core.udf.uuid.UuidV7 +com.geedgenetworks.core.udf.udaf.NumberMax +com.geedgenetworks.core.udf.udaf.NumberMin
\ No newline at end of file diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java new file mode 100644 index 0000000..ea3793e --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java @@ -0,0 +1,93 @@ +package com.geedgenetworks.bootstrap.main.simple; + +import cn.hutool.setting.yaml.YamlUtil; +import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.EngineType; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.execution.JobExecution; +import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; +import com.geedgenetworks.bootstrap.utils.CommandLineUtils; +import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.ConfigProvider; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigUtil; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import java.nio.file.Path; +import java.util.Map; + + +public class JobDosTest { + + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + @Test + public void testSplit() { + + CollectSink.values.clear(); + String[] args ={"--target", "test", "-c", ".\\grootstream_job_dos_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = 
CommandLineUtils + .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); + // Assert.assertEquals(7, CollectSink.values.size()); + + + Assert.assertEquals(3, CollectSink.values.size()); + Assert.assertEquals("200", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); + Assert.assertEquals("2000", CollectSink.values.get(1).getExtractedFields().get("packets").toString()); + Assert.assertEquals("20000", CollectSink.values.get(1).getExtractedFields().get("bytes").toString()); + Assert.assertEquals("66.67", CollectSink.values.get(1).getExtractedFields().get("session_rate").toString()); + Assert.assertEquals("666.67", CollectSink.values.get(1).getExtractedFields().get("packet_rate").toString()); + Assert.assertEquals("53333.33", CollectSink.values.get(1).getExtractedFields().get("bit_rate").toString()); + + Assert.assertTrue( CollectSink.values.get(1).getExtractedFields().containsKey("log_id")); + Assert.assertTrue(CollectSink.values.get(1).getExtractedFields().containsKey("recv_time")); + Assert.assertEquals("1729476003", CollectSink.values.get(1).getExtractedFields().get("end_time").toString()); + Assert.assertEquals("1729476000", 
CollectSink.values.get(1).getExtractedFields().get("start_time").toString()); + Assert.assertEquals("1729476003", CollectSink.values.get(1).getExtractedFields().get("end_time").toString()); + Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("duration").toString()); + + + + Assert.assertEquals("1.2.2.2", CollectSink.values.get(1).getExtractedFields().get("destination_ip").toString()); + Assert.assertEquals("1.1.1.1", CollectSink.values.get(1).getExtractedFields().get("source_ip").toString()); + + Assert.assertEquals("CN", CollectSink.values.get(1).getExtractedFields().get("source_country").toString()); + Assert.assertEquals("1.1.1.1", CollectSink.values.get(1).getExtractedFields().get("source_ip").toString()); + Assert.assertEquals("1.2.2.2", CollectSink.values.get(1).getExtractedFields().get("destination_ip").toString()); + Assert.assertEquals("US", CollectSink.values.get(1).getExtractedFields().get("destination_country").toString()); + Assert.assertEquals("123", CollectSink.values.get(1).getExtractedFields().get("rule_uuid").toString()); + + } + +} diff --git a/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml new file mode 100644 index 0000000..7473939 --- /dev/null +++ b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml @@ -0,0 +1,130 @@ +sources: + + inline_source: + type : inline + watermark_timestamp: timestamp_ms + watermark_timestamp_unit: ms + watermark_lag: 10 + fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. 
+ properties: + data: '[{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.2","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"timestamp_ms":1729476003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000},{"timestamp_ms":1729477003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000}]' + interval.per.row: 2s # 可选 + repeat.count: 1 # 可选 + format: json + json.ignore.parse.errors: false + + + +postprocessing_pipelines: + + pre_etl_processor: # [object] Processing Pipeline + type: projection + functions: # [array of object] Function List + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ recv_time ] + parameters: + precision: seconds + aggregate_processor: + type: aggregate + group_by_fields: [vsys_id,rule_uuid,server_ip,client_ip] + window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 600 + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + - function: NUMBER_SUM + lookup_fields: [ bytes ] + - function: NUMBER_SUM + lookup_fields: [ pkts ] + output_fields: [ packets ] + - function: FIRST_VALUE + lookup_fields: [ client_country ] + - function: FIRST_VALUE + lookup_fields: [ server_country ] + - function: NUMBER_MIN + lookup_fields: [ timestamp_ms ] + output_fields: [ start_timestamp_ms ] + - function: NUMBER_MIN + lookup_fields: [ recv_time ] + - function: NUMBER_MAX + lookup_fields: [ timestamp_ms ] + output_fields: 
[ end_timestamp_ms ] + - function: FIRST_VALUE + lookup_fields: [ duration ] + post_etl_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ end_timestamp_ms ] + output_fields: [ end_time ] + parameters: + precision: seconds + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ start_timestamp_ms ] + output_fields: [ start_time ] + parameters: + precision: seconds + - function: EVAL + output_fields: [ duration ] + parameters: + value_expression: "((end_time-start_time) > 0)? (end_time-start_time) : (duration/1000)" + - function: EVAL + output_fields: [ end_time ] + parameters: + value_expression: start_time + duration + - function: EVAL + output_fields: [ session_rate ] + parameters: + value_expression: math.round((double(sessions) / duration )*100)/100.0 + - function: EVAL + output_fields: [ packet_rate ] + parameters: + value_expression: math.round((double(packets) / duration ) *100)/100.0 + - function: EVAL + output_fields: [ bit_rate ] + parameters: + value_expression: math.round((double((bytes*8)) / duration) *100)/100.0 + - function: RENAME + parameters: + rename_fields: + client_ip: source_ip + client_country: source_country + server_ip: destination_ip + server_country: destination_country + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + +sinks: + + collect_sink: + type: collect + properties: + format: json + +application: # [object] Application Configuration + + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + parallelism: 1 + properties: + k: v + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + + + topology: + - name: inline_source + downstream: [pre_etl_processor] + - name: pre_etl_processor + downstream: [aggregate_processor] + - name: aggregate_processor + downstream: [ 
post_etl_processor ] + - name: post_etl_processor + downstream: [ collect_sink] + - name: collect_sink + diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index dae8bae..cfd9bab 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -28,4 +28,6 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles com.geedgenetworks.core.udf.udtf.JsonUnroll com.geedgenetworks.core.udf.udtf.Unroll -com.geedgenetworks.core.udf.udtf.PathUnroll
\ No newline at end of file +com.geedgenetworks.core.udf.udtf.PathUnroll +com.geedgenetworks.core.udf.udaf.NumberMax +com.geedgenetworks.core.udf.udaf.NumberMin
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java new file mode 100644 index 0000000..865ce64 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java @@ -0,0 +1,94 @@
/**
 * Copyright 2017 Hortonworks.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/
package com.geedgenetworks.core.udf.udaf;


import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;

/**
 * Aggregate function NUMBER_MAX: tracks the numeric maximum of a lookup field
 * within a group. Values are compared as doubles (via their string form), but
 * the original value object is stored unchanged, so the result keeps the
 * input's runtime type (Integer, Long, Float, Double, ...). Events that do not
 * carry the lookup field are ignored.
 */
public class NumberMax implements AggregateFunction {

    // Field read from each incoming event.
    private String lookupField;
    // Field written into the accumulator; defaults to lookupField when no
    // output field is configured.
    private String outputField;


    @Override
    public void open(UDFContext udfContext) {
        if (udfContext.getLookup_fields() == null) {
            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
        }
        this.lookupField = udfContext.getLookup_fields().get(0);
        if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
            this.outputField = udfContext.getOutput_fields().get(0);
        } else {
            this.outputField = lookupField;
        }
    }

    @Override
    public Accumulator initAccumulator(Accumulator acc) {
        // The accumulator's metrics map is managed by the framework; nothing to seed.
        return acc;
    }

    @Override
    public Accumulator add(Event event, Accumulator acc) {
        Object incoming = event.getExtractedFields().get(lookupField);
        if (incoming == null) {
            return acc; // event carries no value for this field — nothing to aggregate
        }
        Object current = acc.getMetricsFields().get(outputField);
        // First value wins outright; afterwards keep whichever compares larger.
        if (current == null || Double.parseDouble(current.toString()) < Double.parseDouble(incoming.toString())) {
            acc.getMetricsFields().put(outputField, incoming);
        }
        return acc;
    }

    @Override
    public String functionName() {
        return "NUMBER_MAX";
    }

    @Override
    public Accumulator getResult(Accumulator acc) {
        return acc;
    }

    @Override
    public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
        // Fold the second partial maximum into the first. The original code had a
        // dead branch that re-stored firstAcc's own value; only the case where
        // secondAcc contributes a value needs any work.
        Object second = secondAcc.getMetricsFields().get(outputField);
        if (second == null) {
            return firstAcc; // second side saw no values for this field
        }
        Object first = firstAcc.getMetricsFields().get(outputField);
        if (first == null || Double.parseDouble(first.toString()) < Double.parseDouble(second.toString())) {
            firstAcc.getMetricsFields().put(outputField, second);
        }
        return firstAcc;
    }
}
diff --git
a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java new file mode 100644 index 0000000..3af21ec --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java @@ -0,0 +1,94 @@
/**
 * Copyright 2017 Hortonworks.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/
package com.geedgenetworks.core.udf.udaf;


import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.UDFContext;

/**
 * Aggregate function NUMBER_MIN: tracks the numeric minimum of a lookup field
 * within a group. Values are compared as doubles (via their string form), but
 * the original value object is stored unchanged, so the result keeps the
 * input's runtime type (Integer, Long, Float, Double, ...). Events that do not
 * carry the lookup field are ignored.
 */
public class NumberMin implements AggregateFunction {

    // Field read from each incoming event.
    private String lookupField;
    // Field written into the accumulator; defaults to lookupField when no
    // output field is configured.
    private String outputField;


    @Override
    public void open(UDFContext udfContext) {
        if (udfContext.getLookup_fields() == null) {
            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
        }
        this.lookupField = udfContext.getLookup_fields().get(0);
        if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
            this.outputField = udfContext.getOutput_fields().get(0);
        } else {
            this.outputField = lookupField;
        }
    }

    @Override
    public Accumulator initAccumulator(Accumulator acc) {
        // The accumulator's metrics map is managed by the framework; nothing to seed.
        return acc;
    }

    @Override
    public Accumulator add(Event event, Accumulator acc) {
        // BUG FIX: the original second branch tested `event.getExtractedFields() != null`
        // instead of checking the lookup value itself, so an event lacking the lookup
        // field hit `null.toString()` and threw a NullPointerException. Check the value,
        // mirroring NumberMax.
        Object incoming = event.getExtractedFields().get(lookupField);
        if (incoming == null) {
            return acc; // event carries no value for this field — nothing to aggregate
        }
        Object current = acc.getMetricsFields().get(outputField);
        // First value wins outright; afterwards keep whichever compares smaller.
        if (current == null || Double.parseDouble(current.toString()) > Double.parseDouble(incoming.toString())) {
            acc.getMetricsFields().put(outputField, incoming);
        }
        return acc;
    }

    @Override
    public String functionName() {
        return "NUMBER_MIN";
    }

    @Override
    public Accumulator getResult(Accumulator acc) {
        return acc;
    }

    @Override
    public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
        // Fold the second partial minimum into the first. The original code had a
        // dead branch that re-stored firstAcc's own value; only the case where
        // secondAcc contributes a value needs any work.
        Object second = secondAcc.getMetricsFields().get(outputField);
        if (second == null) {
            return firstAcc; // second side saw no values for this field
        }
        Object first = firstAcc.getMetricsFields().get(outputField);
        if (first == null || Double.parseDouble(first.toString()) > Double.parseDouble(second.toString())) {
            firstAcc.getMetricsFields().put(outputField, second);
        }
        return firstAcc;
    }
}
diff --git
a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java new file mode 100644 index 0000000..d7715c0 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java @@ -0,0 +1,113 @@
/**
 * Copyright 2017 Hortonworks.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/
package com.geedgenetworks.core.udf.test.aggregate;


import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.core.udf.udaf.NumberMax;
import com.ibm.icu.text.NumberFormat;
import org.junit.jupiter.api.Test;

import java.text.ParseException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Unit tests for {@link NumberMax}: verifies that the maximum is computed for
 * each supported numeric type, that the input's runtime type is preserved in
 * the result, and that two partial accumulators merge to the global maximum.
 */
public class NumberMaxTest {

    @Test
    public void test() throws ParseException {
        Integer[] intArr = {1, 2, 3, 4};
        Long[] longArr = {1L, 2L, 3L, 4L};
        Float[] floatArr = {1.0f, 2.0f, 3.0f, 4.0f};
        Double[] doubleArr = {1.0, 2.0, 3.0, 4.0};

        execute(floatArr, Float.class);
        execute(doubleArr, Double.class);
        execute(intArr, Integer.class);
        execute(longArr, Long.class);
        testMerge(intArr, floatArr);
    }

    /** Merging two partial accumulators must yield the global maximum (4). */
    private void testMerge(Number[] arr, Number[] arr2) {
        UDFContext udfContext = newContext();
        NumberMax numberMax = new NumberMax();
        numberMax.open(udfContext);

        Accumulator result1 = accumulate(udfContext, arr);
        Accumulator result2 = accumulate(udfContext, arr2);
        Accumulator result = numberMax.getResult(numberMax.merge(result1, result2));

        assertEquals(4f, Float.parseFloat(result.getMetricsFields().get("field_max").toString()));
    }

    /** Builds the UDF context shared by all cases: lookup "field", output "field_max". */
    private static UDFContext newContext() {
        UDFContext udfContext = new UDFContext();
        udfContext.setLookup_fields(List.of("field"));
        udfContext.setOutput_fields(Collections.singletonList("field_max"));
        return udfContext;
    }

    /** Feeds every value of arr into a fresh NumberMax and returns the partial accumulator. */
    private static Accumulator accumulate(UDFContext udfContext, Number[] arr) {
        NumberMax numberMax = new NumberMax();
        numberMax.open(udfContext);
        Accumulator accumulator = new Accumulator();
        accumulator.setMetricsFields(new HashMap<>());
        Accumulator agg = numberMax.initAccumulator(accumulator);
        for (Number o : arr) {
            Event event = new Event();
            Map<String, Object> extractedFields = new HashMap<>();
            extractedFields.put("field", o);
            event.setExtractedFields(extractedFields);
            agg = numberMax.add(event, agg);
        }
        return agg;
    }

    /** Runs a full aggregation over arr and checks both the value (4) and the preserved type. */
    private static void execute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
        UDFContext udfContext = newContext();
        NumberMax numberMax = new NumberMax();
        numberMax.open(udfContext);

        Accumulator result = numberMax.getResult(accumulate(udfContext, arr));

        assertEquals(clazz, result.getMetricsFields().get("field_max").getClass());
        assertEquals(NumberFormat.getInstance().parse(result.getMetricsFields().get("field_max").toString()),
                NumberFormat.getInstance().parse("4"));
    }
}
\ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java new file mode 100644 index 0000000..ccbb994 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java @@ -0,0 +1,113 @@
/**
 * Copyright 2017 Hortonworks.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/
package com.geedgenetworks.core.udf.test.aggregate;


import com.geedgenetworks.common.Accumulator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.core.udf.udaf.NumberMin;
import com.ibm.icu.text.NumberFormat;
import org.junit.jupiter.api.Test;

import java.text.ParseException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Unit tests for {@link NumberMin}: verifies that the minimum is computed for
 * each supported numeric type, that the input's runtime type is preserved in
 * the result, and that two partial accumulators merge to the global minimum.
 */
public class NumberMinTest {

    @Test
    public void test() throws ParseException {
        Integer[] intArr = {1, 2, 3, 4};
        Long[] longArr = {1L, 2L, 3L, 4L};
        Float[] floatArr = {1.0f, 2.0f, 3.0f, 4.0f};
        Double[] doubleArr = {1.0, 2.0, 3.0, 4.0};

        execute(floatArr, Float.class);
        execute(doubleArr, Double.class);
        execute(intArr, Integer.class);
        execute(longArr, Long.class);
        testMerge(intArr, floatArr);
    }

    /** Merging two partial accumulators must yield the global minimum (1). */
    private void testMerge(Number[] arr, Number[] arr2) {
        UDFContext udfContext = newContext();
        NumberMin numberMin = new NumberMin();
        numberMin.open(udfContext);

        Accumulator result1 = accumulate(udfContext, arr);
        Accumulator result2 = accumulate(udfContext, arr2);
        Accumulator result = numberMin.getResult(numberMin.merge(result1, result2));

        assertEquals(1f, Float.parseFloat(result.getMetricsFields().get("field_min").toString()));
    }

    /** Builds the UDF context shared by all cases: lookup "field", output "field_min". */
    private static UDFContext newContext() {
        UDFContext udfContext = new UDFContext();
        udfContext.setLookup_fields(List.of("field"));
        udfContext.setOutput_fields(Collections.singletonList("field_min"));
        return udfContext;
    }

    /** Feeds every value of arr into a fresh NumberMin and returns the partial accumulator. */
    private static Accumulator accumulate(UDFContext udfContext, Number[] arr) {
        NumberMin numberMin = new NumberMin();
        numberMin.open(udfContext);
        Accumulator accumulator = new Accumulator();
        accumulator.setMetricsFields(new HashMap<>());
        Accumulator agg = numberMin.initAccumulator(accumulator);
        for (Number o : arr) {
            Event event = new Event();
            Map<String, Object> extractedFields = new HashMap<>();
            extractedFields.put("field", o);
            event.setExtractedFields(extractedFields);
            agg = numberMin.add(event, agg);
        }
        return agg;
    }

    /** Runs a full aggregation over arr and checks both the value (1) and the preserved type. */
    private static void execute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
        UDFContext udfContext = newContext();
        NumberMin numberMin = new NumberMin();
        numberMin.open(udfContext);

        Accumulator result = numberMin.getResult(accumulate(udfContext, arr));

        assertEquals(clazz, result.getMetricsFields().get("field_min").getClass());
        assertEquals(NumberFormat.getInstance().parse(result.getMetricsFields().get("field_min").toString()),
                NumberFormat.getInstance().parse("1"));
    }
}
\ No newline at end of file |
