From 3b4034993c5812ca239c4824d8101b1cca567b5c Mon Sep 17 00:00:00 2001 From: wangkuan Date: Mon, 21 Oct 2024 16:10:04 +0800 Subject: [feature][core]新增聚合函数NumberMax,NumberMin,增加agg-dos-protection_rule_metric任务以及单元测试 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bootstrap/main/simple/JobDosTest.java | 93 +++++++++++++++ .../test/resources/grootstream_job_dos_test.yaml | 130 +++++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java create mode 100644 groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml (limited to 'groot-bootstrap') diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java new file mode 100644 index 0000000..ea3793e --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java @@ -0,0 +1,93 @@ +package com.geedgenetworks.bootstrap.main.simple; + +import cn.hutool.setting.yaml.YamlUtil; +import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.EngineType; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.execution.JobExecution; +import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; +import com.geedgenetworks.bootstrap.utils.CommandLineUtils; +import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.ConfigProvider; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigUtil; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import java.nio.file.Path; +import java.util.Map; + + +public class JobDosTest { + + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + @Test + public void testSplit() { + + CollectSink.values.clear(); + String[] args ={"--target", "test", "-c", ".\\grootstream_job_dos_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = CommandLineUtils + .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); + // Assert.assertEquals(7, CollectSink.values.size()); + + + Assert.assertEquals(3, CollectSink.values.size()); + Assert.assertEquals("200", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); + Assert.assertEquals("2000", CollectSink.values.get(1).getExtractedFields().get("packets").toString()); + Assert.assertEquals("20000", CollectSink.values.get(1).getExtractedFields().get("bytes").toString()); + Assert.assertEquals("66.67", CollectSink.values.get(1).getExtractedFields().get("session_rate").toString()); + Assert.assertEquals("666.67", CollectSink.values.get(1).getExtractedFields().get("packet_rate").toString()); + Assert.assertEquals("53333.33", CollectSink.values.get(1).getExtractedFields().get("bit_rate").toString()); + + Assert.assertTrue( CollectSink.values.get(1).getExtractedFields().containsKey("log_id")); + Assert.assertTrue(CollectSink.values.get(1).getExtractedFields().containsKey("recv_time")); + Assert.assertEquals("1729476003", CollectSink.values.get(1).getExtractedFields().get("end_time").toString()); + Assert.assertEquals("1729476000", CollectSink.values.get(1).getExtractedFields().get("start_time").toString()); + Assert.assertEquals("1729476003", CollectSink.values.get(1).getExtractedFields().get("end_time").toString()); + Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("duration").toString()); + + + + Assert.assertEquals("1.2.2.2", CollectSink.values.get(1).getExtractedFields().get("destination_ip").toString()); + Assert.assertEquals("1.1.1.1", CollectSink.values.get(1).getExtractedFields().get("source_ip").toString()); + + Assert.assertEquals("CN", CollectSink.values.get(1).getExtractedFields().get("source_country").toString()); + Assert.assertEquals("1.1.1.1", CollectSink.values.get(1).getExtractedFields().get("source_ip").toString()); + Assert.assertEquals("1.2.2.2", CollectSink.values.get(1).getExtractedFields().get("destination_ip").toString()); + Assert.assertEquals("US", CollectSink.values.get(1).getExtractedFields().get("destination_country").toString()); + Assert.assertEquals("123", CollectSink.values.get(1).getExtractedFields().get("rule_uuid").toString()); + + } + +} diff --git a/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml new file mode 100644 index 0000000..7473939 --- /dev/null +++ b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml @@ -0,0 +1,130 @@ +sources: + + inline_source: + type : inline + watermark_timestamp: timestamp_ms + watermark_timestamp_unit: ms + watermark_lag: 10 + fields: # [array of object] Field List, if not set, all fields(Map) will be output. + properties: + data: '[{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.2","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"timestamp_ms":1729476003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000},{"timestamp_ms":1729477003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000}]' + interval.per.row: 2s # 可选 + repeat.count: 1 # 可选 + format: json + json.ignore.parse.errors: false + + + +postprocessing_pipelines: + + pre_etl_processor: # [object] Processing Pipeline + type: projection + functions: # [array of object] Function List + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ recv_time ] + parameters: + precision: seconds + aggregate_processor: + type: aggregate + group_by_fields: [vsys_id,rule_uuid,server_ip,client_ip] + window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 600 + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + - function: NUMBER_SUM + lookup_fields: [ bytes ] + - function: NUMBER_SUM + lookup_fields: [ pkts ] + output_fields: [ packets ] + - function: FIRST_VALUE + lookup_fields: [ client_country ] + - function: FIRST_VALUE + lookup_fields: [ server_country ] + - function: NUMBER_MIN + lookup_fields: [ timestamp_ms ] + output_fields: [ start_timestamp_ms ] + - function: NUMBER_MIN + lookup_fields: [ recv_time ] + - function: NUMBER_MAX + lookup_fields: [ timestamp_ms ] + output_fields: [ end_timestamp_ms ] + - function: FIRST_VALUE + lookup_fields: [ duration ] + post_etl_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ end_timestamp_ms ] + output_fields: [ end_time ] + parameters: + precision: seconds + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ start_timestamp_ms ] + output_fields: [ start_time ] + parameters: + precision: seconds + - function: EVAL + output_fields: [ duration ] + parameters: + value_expression: "((end_time-start_time) > 0)? (end_time-start_time) : (duration/1000)" + - function: EVAL + output_fields: [ end_time ] + parameters: + value_expression: start_time + duration + - function: EVAL + output_fields: [ session_rate ] + parameters: + value_expression: math.round((double(sessions) / duration )*100)/100.0 + - function: EVAL + output_fields: [ packet_rate ] + parameters: + value_expression: math.round((double(packets) / duration ) *100)/100.0 + - function: EVAL + output_fields: [ bit_rate ] + parameters: + value_expression: math.round((double((bytes*8)) / duration) *100)/100.0 + - function: RENAME + parameters: + rename_fields: + client_ip: source_ip + client_country: source_country + server_ip: destination_ip + server_country: destination_country + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + +sinks: + + collect_sink: + type: collect + properties: + format: json + +application: # [object] Application Configuration + + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + parallelism: 1 + properties: + k: v + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + + + topology: + - name: inline_source + downstream: [pre_etl_processor] + - name: pre_etl_processor + downstream: [aggregate_processor] + - name: aggregate_processor + downstream: [ post_etl_processor ] + - name: post_etl_processor + downstream: [ collect_sink] + - name: collect_sink + -- cgit v1.2.3