summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-10-21 08:34:00 +0000
committer王宽 <[email protected]>2024-10-21 08:34:00 +0000
commit505b04ea10f1e3e37410f5ef1b0721e6f23caebb (patch)
tree028aafc12bb47737fe2995e79b78722ffe078d8a
parentd6a715c0d65e36665536b8ff03e0cf5ef9ff3e4b (diff)
parentac085998d64b91f59d4f1c590540781be2c7c94c (diff)
Merge branch 'feature/dos' into 'develop'v1.7.0.1-SNAPSHOT
[feature][core]新增聚合函数NumberMax,NumberMin,增加agg-dos-protection_rule_metric任务以及单元测试 See merge request galaxy/platform/groot-stream!116
-rw-r--r--config/udf.plugins7
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java93
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml130
-rw-r--r--groot-common/src/main/resources/udf.plugins4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java94
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java94
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java113
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java113
8 files changed, 646 insertions, 2 deletions
diff --git a/config/udf.plugins b/config/udf.plugins
index 45abeea..7cd3b0e 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -32,4 +32,9 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
com.geedgenetworks.core.udf.udtf.Unroll
-com.geedgenetworks.core.udf.udtf.PathUnroll \ No newline at end of file
+com.geedgenetworks.core.udf.udtf.PathUnroll
+com.geedgenetworks.core.udf.uuid.Uuid
+com.geedgenetworks.core.udf.uuid.UuidV5
+com.geedgenetworks.core.udf.uuid.UuidV7
+com.geedgenetworks.core.udf.udaf.NumberMax
+com.geedgenetworks.core.udf.udaf.NumberMin \ No newline at end of file
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java
new file mode 100644
index 0000000..ea3793e
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java
@@ -0,0 +1,93 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+
+/**
+ * End-to-end test for the DoS protection aggregation job.
+ *
+ * <p>Runs the pipeline defined in {@code grootstream_job_dos_test.yaml} on a
+ * Flink mini cluster and verifies the aggregated records collected by
+ * {@link CollectSink}.
+ */
+public class JobDosTest {
+
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(1)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Test
+    public void testDosAggregation() {
+
+        CollectSink.values.clear();
+        String[] args = {"--target", "test", "-c", ".\\grootstream_job_dos_test.yaml"};
+        ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+                .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+        executeCommandArgs.buildCommand();
+
+        GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+        Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+        // Load the YAML job definition and convert it into a typesafe Config.
+        Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+        ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+        Config config = configObject.toConfig();
+
+        // Inject the "test" target type so the job executes on the mini cluster.
+        config = config.withValue(
+                ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+                ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+        JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+        jobExecution.execute();
+
+        // Three distinct (window, group-key) combinations are expected from the inline data.
+        Assert.assertEquals(3, CollectSink.values.size());
+
+        Map<String, Object> fields = CollectSink.values.get(1).getExtractedFields();
+
+        // Aggregated counters: two input events of 100 sessions / 1000 pkts / 10000 bytes each.
+        Assert.assertEquals("200", fields.get("sessions").toString());
+        Assert.assertEquals("2000", fields.get("packets").toString());
+        Assert.assertEquals("20000", fields.get("bytes").toString());
+        // Rates are rounded to two decimals over the 3-second window span.
+        Assert.assertEquals("66.67", fields.get("session_rate").toString());
+        Assert.assertEquals("666.67", fields.get("packet_rate").toString());
+        Assert.assertEquals("53333.33", fields.get("bit_rate").toString());
+
+        // Generated/derived fields.
+        Assert.assertTrue(fields.containsKey("log_id"));
+        Assert.assertTrue(fields.containsKey("recv_time"));
+        Assert.assertEquals("1729476000", fields.get("start_time").toString());
+        Assert.assertEquals("1729476003", fields.get("end_time").toString());
+        Assert.assertEquals("3", fields.get("duration").toString());
+
+        // Client/server fields renamed to source/destination by the RENAME function.
+        Assert.assertEquals("1.1.1.1", fields.get("source_ip").toString());
+        Assert.assertEquals("CN", fields.get("source_country").toString());
+        Assert.assertEquals("1.2.2.2", fields.get("destination_ip").toString());
+        Assert.assertEquals("US", fields.get("destination_country").toString());
+        Assert.assertEquals("123", fields.get("rule_uuid").toString());
+    }
+
+}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml
new file mode 100644
index 0000000..7473939
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml
@@ -0,0 +1,130 @@
+sources:
+
+ inline_source:
+ type : inline
+ watermark_timestamp: timestamp_ms
+ watermark_timestamp_unit: ms
+ watermark_lag: 10
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.2","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"timestamp_ms":1729476003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000},{"timestamp_ms":1729477003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000}]'
+ interval.per.row: 2s # optional: delay between emitted rows
+ repeat.count: 1 # optional: number of times the data set is replayed
+ format: json
+ json.ignore.parse.errors: false
+
+
+
+postprocessing_pipelines:
+
+ pre_etl_processor: # [object] Processing Pipeline
+ type: projection
+ functions: # [array of object] Function List
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [vsys_id,rule_uuid,server_ip,client_ip]
+ window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 600
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+ - function: NUMBER_SUM
+ lookup_fields: [ bytes ]
+ - function: NUMBER_SUM
+ lookup_fields: [ pkts ]
+ output_fields: [ packets ]
+ - function: FIRST_VALUE
+ lookup_fields: [ client_country ]
+ - function: FIRST_VALUE
+ lookup_fields: [ server_country ]
+ - function: NUMBER_MIN
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ start_timestamp_ms ]
+ - function: NUMBER_MIN
+ lookup_fields: [ recv_time ]
+ - function: NUMBER_MAX
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ end_timestamp_ms ]
+ - function: FIRST_VALUE
+ lookup_fields: [ duration ]
+ post_etl_processor: # [object] Processing Pipeline
+ type: projection
+ remove_fields:
+ output_fields:
+ functions: # [array of object] Function List
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ end_timestamp_ms ]
+ output_fields: [ end_time ]
+ parameters:
+ precision: seconds
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ start_timestamp_ms ]
+ output_fields: [ start_time ]
+ parameters:
+ precision: seconds
+ - function: EVAL
+ output_fields: [ duration ]
+ parameters:
+ value_expression: "((end_time-start_time) > 0)? (end_time-start_time) : (duration/1000)"
+ - function: EVAL
+ output_fields: [ end_time ]
+ parameters:
+ value_expression: start_time + duration
+ - function: EVAL
+ output_fields: [ session_rate ]
+ parameters:
+ value_expression: math.round((double(sessions) / duration )*100)/100.0
+ - function: EVAL
+ output_fields: [ packet_rate ]
+ parameters:
+ value_expression: math.round((double(packets) / duration ) *100)/100.0
+ - function: EVAL
+ output_fields: [ bit_rate ]
+ parameters:
+ value_expression: math.round((double((bytes*8)) / duration) *100)/100.0
+ - function: RENAME
+ parameters:
+ rename_fields:
+ client_ip: source_ip
+ client_country: source_country
+ server_ip: destination_ip
+ server_country: destination_country
+ - function: SNOWFLAKE_ID
+ lookup_fields: ['']
+ output_fields: [log_id]
+ parameters:
+ data_center_id_num: 1
+
+sinks:
+
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+
+application: # [object] Application Configuration
+
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ parallelism: 1
+ properties:
+ k: v
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+
+
+ topology:
+ - name: inline_source
+ downstream: [pre_etl_processor]
+ - name: pre_etl_processor
+ downstream: [aggregate_processor]
+ - name: aggregate_processor
+ downstream: [ post_etl_processor ]
+ - name: post_etl_processor
+ downstream: [ collect_sink]
+ - name: collect_sink
+
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 45abeea..9950a64 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -32,4 +32,6 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
com.geedgenetworks.core.udf.udtf.Unroll
-com.geedgenetworks.core.udf.udtf.PathUnroll \ No newline at end of file
+com.geedgenetworks.core.udf.udtf.PathUnroll
+com.geedgenetworks.core.udf.udaf.NumberMax
+com.geedgenetworks.core.udf.udaf.NumberMin \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java
new file mode 100644
index 0000000..865ce64
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMax.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.udaf;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+/**
+ * Collects elements within a group and returns the list of aggregated objects
+ */
+public class NumberMax implements AggregateFunction {
+
+ private String lookupField;
+ private String outputField;
+
+
+ @Override
+ public void open(UDFContext udfContext) {
+ if (udfContext.getLookup_fields() == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
+ this.outputField = udfContext.getOutput_fields().get(0);
+ } else {
+ outputField = lookupField;
+ }
+ }
+
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ if (acc.getMetricsFields().get(outputField)==null && event.getExtractedFields().get(lookupField)!=null) {
+ acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
+ } else if (acc.getMetricsFields().get(outputField)!=null && event.getExtractedFields().get(lookupField)!=null) {
+ double value1 = Double.parseDouble(acc.getMetricsFields().get(outputField).toString());
+ double value2 = Double.parseDouble(event.getExtractedFields().get(lookupField).toString());
+ if (value1 < value2) {
+ acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
+ }
+ }
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "NUMBER_MAX";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
+ }
+ else if(firstAcc.getMetricsFields().containsKey(outputField) && !secondAcc.getMetricsFields().containsKey(outputField)){
+ firstAcc.getMetricsFields().put(outputField, firstAcc.getMetricsFields().get(outputField));
+ }
+ else if(firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)){
+ double value1 = Double.parseDouble(firstAcc.getMetricsFields().get(outputField).toString());
+ double value2 = Double.parseDouble(secondAcc.getMetricsFields().get(outputField).toString());
+ if (value1 < value2) {
+ firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
+ }
+ }
+ return firstAcc;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java
new file mode 100644
index 0000000..3af21ec
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberMin.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2017 Hortonworks.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.udaf;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+
+/**
+ * Collects elements within a group and returns the list of aggregated objects
+ */
+public class NumberMin implements AggregateFunction {
+
+ private String lookupField;
+ private String outputField;
+
+
+ @Override
+ public void open(UDFContext udfContext) {
+ if (udfContext.getLookup_fields() == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ this.lookupField = udfContext.getLookup_fields().get(0);
+ if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
+ this.outputField = udfContext.getOutput_fields().get(0);
+ } else {
+ outputField = lookupField;
+ }
+ }
+
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ if (acc.getMetricsFields().get(outputField)==null && event.getExtractedFields().get(lookupField)!=null) {
+ acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
+ } else if (acc.getMetricsFields().get(outputField)!=null && event.getExtractedFields()!=null) {
+ double value1 = Double.parseDouble(acc.getMetricsFields().get(outputField).toString());
+ double value2 = Double.parseDouble(event.getExtractedFields().get(lookupField).toString());
+ if (value1 > value2) {
+ acc.getMetricsFields().put(outputField, event.getExtractedFields().get(lookupField));
+ }
+ }
+ return acc;
+ }
+
+ @Override
+ public String functionName() {
+ return "NUMBER_MIN";
+ }
+
+ @Override
+ public Accumulator getResult(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator merge(Accumulator firstAcc, Accumulator secondAcc) {
+ if (!firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)) {
+ firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
+ }
+ else if(firstAcc.getMetricsFields().containsKey(outputField) && !secondAcc.getMetricsFields().containsKey(outputField)){
+ firstAcc.getMetricsFields().put(outputField, firstAcc.getMetricsFields().get(outputField));
+ }
+ else if(firstAcc.getMetricsFields().containsKey(outputField) && secondAcc.getMetricsFields().containsKey(outputField)){
+ double value1 = Double.parseDouble(firstAcc.getMetricsFields().get(outputField).toString());
+ double value2 = Double.parseDouble(secondAcc.getMetricsFields().get(outputField).toString());
+ if (value1 > value2) {
+ firstAcc.getMetricsFields().put(outputField, secondAcc.getMetricsFields().get(outputField));
+ }
+ }
+ return firstAcc;
+ }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java
new file mode 100644
index 0000000..d7715c0
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMaxTest.java
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.NumberMax;
+import com.geedgenetworks.core.udf.udaf.NumberSum;
+import com.ibm.icu.text.NumberFormat;
+import org.junit.jupiter.api.Test;
+
+import java.text.ParseException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link NumberMax}: verifies the maximum is computed for all
+ * supported numeric types, that the stored value keeps its original type, and
+ * that partial accumulators merge correctly.
+ */
+public class NumberMaxTest {
+
+    @Test
+    public void test() throws ParseException {
+        Integer[] intArr = new Integer[]{1, 2, 3, 4};
+        Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
+        Float[] floatArr = new Float[]{1.0f, 2.0f, 3.0f, 4.0f};
+        Double[] doubleArr = new Double[]{1.0, 2.0, 3.0, 4.0};
+        execute(floatArr, Float.class);
+        execute(doubleArr, Double.class);
+        execute(intArr, Integer.class);
+        execute(longArr, Long.class);
+        testMerge(intArr, floatArr);
+    }
+
+    /** Verifies that merging two partial accumulators yields the global maximum. */
+    private void testMerge(Number[] arr, Number[] arr2) {
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of("field"));
+        udfContext.setOutput_fields(Collections.singletonList("field_max"));
+        NumberMax numberMax = new NumberMax();
+        numberMax.open(udfContext);
+        Accumulator result1 = getMiddleResult(udfContext, arr);
+        Accumulator result2 = getMiddleResult(udfContext, arr2);
+        Accumulator result = numberMax.getResult(numberMax.merge(result1, result2));
+        assertEquals(Float.parseFloat(result.getMetricsFields().get("field_max").toString()), 4);
+    }
+
+    /** Builds a partial accumulator by feeding every value of {@code arr} into NUMBER_MAX. */
+    private Accumulator getMiddleResult(UDFContext udfContext, Number[] arr) {
+        NumberMax numberMax = new NumberMax();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(new HashMap<>());
+        numberMax.open(udfContext);
+        Accumulator agg = numberMax.initAccumulator(accumulator);
+        for (Number o : arr) {
+            Event event = new Event();
+            Map<String, Object> extractedFields = new HashMap<>();
+            extractedFields.put("field", o);
+            event.setExtractedFields(extractedFields);
+            agg = numberMax.add(event, agg);
+        }
+        return agg;
+    }
+
+    /** Runs NUMBER_MAX over {@code arr} and checks both the result value and its runtime type. */
+    private static void execute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of("field"));
+        udfContext.setOutput_fields(Collections.singletonList("field_max"));
+        NumberMax numberMax = new NumberMax();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(new HashMap<>());
+        numberMax.open(udfContext);
+        Accumulator agg = numberMax.initAccumulator(accumulator);
+        for (Number o : arr) {
+            Event event = new Event();
+            Map<String, Object> extractedFields = new HashMap<>();
+            extractedFields.put("field", o);
+            event.setExtractedFields(extractedFields);
+            agg = numberMax.add(event, agg);
+        }
+        Accumulator result = numberMax.getResult(agg);
+        // The stored maximum must keep the input's numeric type, not be widened to double.
+        assertEquals(clazz, result.getMetricsFields().get("field_max").getClass());
+        assertEquals(NumberFormat.getInstance().parse(result.getMetricsFields().get("field_max").toString()),
+                NumberFormat.getInstance().parse("4"));
+    }
+
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java
new file mode 100644
index 0000000..ccbb994
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberMinTest.java
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2017 Hortonworks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.geedgenetworks.core.udf.test.aggregate;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udaf.NumberMax;
+import com.geedgenetworks.core.udf.udaf.NumberMin;
+import com.ibm.icu.text.NumberFormat;
+import org.junit.jupiter.api.Test;
+
+import java.text.ParseException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link NumberMin}: verifies the minimum is computed for all
+ * supported numeric types, that the stored value keeps its original type, and
+ * that partial accumulators merge correctly.
+ */
+public class NumberMinTest {
+
+    @Test
+    public void test() throws ParseException {
+        Integer[] intArr = new Integer[]{1, 2, 3, 4};
+        Long[] longArr = new Long[]{1L, 2L, 3L, 4L};
+        Float[] floatArr = new Float[]{1.0f, 2.0f, 3.0f, 4.0f};
+        Double[] doubleArr = new Double[]{1.0, 2.0, 3.0, 4.0};
+        execute(floatArr, Float.class);
+        execute(doubleArr, Double.class);
+        execute(intArr, Integer.class);
+        execute(longArr, Long.class);
+        testMerge(intArr, floatArr);
+    }
+
+    /** Verifies that merging two partial accumulators yields the global minimum. */
+    private void testMerge(Number[] arr, Number[] arr2) {
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of("field"));
+        udfContext.setOutput_fields(Collections.singletonList("field_min"));
+        NumberMin numberMin = new NumberMin();
+        numberMin.open(udfContext);
+        Accumulator result1 = getMiddleResult(udfContext, arr);
+        Accumulator result2 = getMiddleResult(udfContext, arr2);
+        Accumulator result = numberMin.getResult(numberMin.merge(result1, result2));
+        assertEquals(Float.parseFloat(result.getMetricsFields().get("field_min").toString()), 1);
+    }
+
+    /** Builds a partial accumulator by feeding every value of {@code arr} into NUMBER_MIN. */
+    private Accumulator getMiddleResult(UDFContext udfContext, Number[] arr) {
+        NumberMin numberMin = new NumberMin();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(new HashMap<>());
+        numberMin.open(udfContext);
+        Accumulator agg = numberMin.initAccumulator(accumulator);
+        for (Number o : arr) {
+            Event event = new Event();
+            Map<String, Object> extractedFields = new HashMap<>();
+            extractedFields.put("field", o);
+            event.setExtractedFields(extractedFields);
+            agg = numberMin.add(event, agg);
+        }
+        return agg;
+    }
+
+    /** Runs NUMBER_MIN over {@code arr} and checks both the result value and its runtime type. */
+    private static void execute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
+        UDFContext udfContext = new UDFContext();
+        udfContext.setLookup_fields(List.of("field"));
+        udfContext.setOutput_fields(Collections.singletonList("field_min"));
+        NumberMin numberMin = new NumberMin();
+        Accumulator accumulator = new Accumulator();
+        accumulator.setMetricsFields(new HashMap<>());
+        numberMin.open(udfContext);
+        Accumulator agg = numberMin.initAccumulator(accumulator);
+        for (Number o : arr) {
+            Event event = new Event();
+            Map<String, Object> extractedFields = new HashMap<>();
+            extractedFields.put("field", o);
+            event.setExtractedFields(extractedFields);
+            agg = numberMin.add(event, agg);
+        }
+        Accumulator result = numberMin.getResult(agg);
+        // The stored minimum must keep the input's numeric type, not be widened to double.
+        assertEquals(clazz, result.getMetricsFields().get("field_min").getClass());
+        assertEquals(NumberFormat.getInstance().parse(result.getMetricsFields().get("field_min").toString()),
+                NumberFormat.getInstance().parse("1"));
+    }
+
+} \ No newline at end of file