summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-10-21 16:10:04 +0800
committerwangkuan <[email protected]>2024-10-21 16:10:04 +0800
commit3b4034993c5812ca239c4824d8101b1cca567b5c (patch)
treefd41b8516a31ea6c714a71fa56597f3649a89001 /groot-bootstrap
parent72ba1827fb4a5ccf05e450a83dc930766c9f95e3 (diff)
[feature][core]新增聚合函数NumberMax,NumberMin,增加agg-dos-protection_rule_metric任务以及单元测试
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java93
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml130
2 files changed, 223 insertions, 0 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java
new file mode 100644
index 0000000..ea3793e
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java
@@ -0,0 +1,93 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+
+public class JobDosTest {
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @Test
+ public void testSplit() {
+
+ CollectSink.values.clear();
+ String[] args ={"--target", "test", "-c", ".\\grootstream_job_dos_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check config file exist
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ jobExecution.execute();
+ // Assert.assertEquals(7, CollectSink.values.size());
+
+
+ Assert.assertEquals(3, CollectSink.values.size());
+ Assert.assertEquals("200", CollectSink.values.get(1).getExtractedFields().get("sessions").toString());
+ Assert.assertEquals("2000", CollectSink.values.get(1).getExtractedFields().get("packets").toString());
+ Assert.assertEquals("20000", CollectSink.values.get(1).getExtractedFields().get("bytes").toString());
+ Assert.assertEquals("66.67", CollectSink.values.get(1).getExtractedFields().get("session_rate").toString());
+ Assert.assertEquals("666.67", CollectSink.values.get(1).getExtractedFields().get("packet_rate").toString());
+ Assert.assertEquals("53333.33", CollectSink.values.get(1).getExtractedFields().get("bit_rate").toString());
+
+ Assert.assertTrue( CollectSink.values.get(1).getExtractedFields().containsKey("log_id"));
+ Assert.assertTrue(CollectSink.values.get(1).getExtractedFields().containsKey("recv_time"));
+ Assert.assertEquals("1729476003", CollectSink.values.get(1).getExtractedFields().get("end_time").toString());
+ Assert.assertEquals("1729476000", CollectSink.values.get(1).getExtractedFields().get("start_time").toString());
+ Assert.assertEquals("1729476003", CollectSink.values.get(1).getExtractedFields().get("end_time").toString());
+ Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("duration").toString());
+
+
+
+ Assert.assertEquals("1.2.2.2", CollectSink.values.get(1).getExtractedFields().get("destination_ip").toString());
+ Assert.assertEquals("1.1.1.1", CollectSink.values.get(1).getExtractedFields().get("source_ip").toString());
+
+ Assert.assertEquals("CN", CollectSink.values.get(1).getExtractedFields().get("source_country").toString());
+ Assert.assertEquals("1.1.1.1", CollectSink.values.get(1).getExtractedFields().get("source_ip").toString());
+ Assert.assertEquals("1.2.2.2", CollectSink.values.get(1).getExtractedFields().get("destination_ip").toString());
+ Assert.assertEquals("US", CollectSink.values.get(1).getExtractedFields().get("destination_country").toString());
+ Assert.assertEquals("123", CollectSink.values.get(1).getExtractedFields().get("rule_uuid").toString());
+
+ }
+
+}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml
new file mode 100644
index 0000000..7473939
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml
@@ -0,0 +1,130 @@
+sources:
+
+ inline_source:
+ type : inline
+ watermark_timestamp: timestamp_ms
+ watermark_timestamp_unit: ms
+ watermark_lag: 10
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.2","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"timestamp_ms":1729476003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000},{"timestamp_ms":1729477003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000}]'
+ interval.per.row: 2s # 可选
+ repeat.count: 1 # 可选
+ format: json
+ json.ignore.parse.errors: false
+
+
+
+postprocessing_pipelines:
+
+ pre_etl_processor: # [object] Processing Pipeline
+ type: projection
+ functions: # [array of object] Function List
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [vsys_id,rule_uuid,server_ip,client_ip]
+ window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 600
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+ - function: NUMBER_SUM
+ lookup_fields: [ bytes ]
+ - function: NUMBER_SUM
+ lookup_fields: [ pkts ]
+ output_fields: [ packets ]
+ - function: FIRST_VALUE
+ lookup_fields: [ client_country ]
+ - function: FIRST_VALUE
+ lookup_fields: [ server_country ]
+ - function: NUMBER_MIN
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ start_timestamp_ms ]
+ - function: NUMBER_MIN
+ lookup_fields: [ recv_time ]
+ - function: NUMBER_MAX
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ end_timestamp_ms ]
+ - function: FIRST_VALUE
+ lookup_fields: [ duration ]
+ post_etl_processor: # [object] Processing Pipeline
+ type: projection
+ remove_fields:
+ output_fields:
+ functions: # [array of object] Function List
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ end_timestamp_ms ]
+ output_fields: [ end_time ]
+ parameters:
+ precision: seconds
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ start_timestamp_ms ]
+ output_fields: [ start_time ]
+ parameters:
+ precision: seconds
+ - function: EVAL
+ output_fields: [ duration ]
+ parameters:
+ value_expression: "((end_time-start_time) > 0)? (end_time-start_time) : (duration/1000)"
+ - function: EVAL
+ output_fields: [ end_time ]
+ parameters:
+ value_expression: start_time + duration
+ - function: EVAL
+ output_fields: [ session_rate ]
+ parameters:
+ value_expression: math.round((double(sessions) / duration )*100)/100.0
+ - function: EVAL
+ output_fields: [ packet_rate ]
+ parameters:
+ value_expression: math.round((double(packets) / duration ) *100)/100.0
+ - function: EVAL
+ output_fields: [ bit_rate ]
+ parameters:
+ value_expression: math.round((double((bytes*8)) / duration) *100)/100.0
+ - function: RENAME
+ parameters:
+ rename_fields:
+ client_ip: source_ip
+ client_country: source_country
+ server_ip: destination_ip
+ server_country: destination_country
+ - function: SNOWFLAKE_ID
+ lookup_fields: ['']
+ output_fields: [log_id]
+ parameters:
+ data_center_id_num: 1
+
+sinks:
+
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+
+application: # [object] Application Configuration
+
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ parallelism: 1
+ properties:
+ k: v
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+
+
+ topology:
+ - name: inline_source
+ downstream: [pre_etl_processor]
+ - name: pre_etl_processor
+ downstream: [aggregate_processor]
+ - name: aggregate_processor
+ downstream: [ post_etl_processor ]
+ - name: post_etl_processor
+ downstream: [ collect_sink]
+ - name: collect_sink
+