diff options
| author | 窦凤虎 <[email protected]> | 2024-11-01 10:14:03 +0000 |
|---|---|---|
| committer | 窦凤虎 <[email protected]> | 2024-11-01 10:14:03 +0000 |
| commit | f7cec560def3981d52f25fc038aab3d4308d4bd1 (patch) | |
| tree | 1bebf6ee0210b7d5fa50b43e75a5f54a37639177 /groot-tests | |
| parent | c0b9acfc3adc85abbd06207259b2515edc5c4eae (diff) | |
| parent | 7868728ddbe3dc08263b1d21b5ffce5dcd9b8052 (diff) | |
[feature][bootstrap][common]node新增tags属性用于分流,需要与downstream相对应。rules中name标签修改为t...
See merge request galaxy/platform/groot-stream!128
Diffstat (limited to 'groot-tests')
4 files changed, 243 insertions, 26 deletions
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java index b833115..4ac3d03 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java @@ -81,6 +81,10 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer { ".*Successful registration at resource manager.*") .withStartupTimeout(Duration.ofMinutes(2))); + // Copy groot-stream bootstrap and some other files to the container + copyGrootStreamStarterToContainer(taskManager); + copyGrootStreamStarterLoggingToContainer(taskManager); + Startables.deepStart(Stream.of(jobManager)).join(); Startables.deepStart(Stream.of(taskManager)).join(); // execute extra commands diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml index 2eb105b..0def444 100644 --- a/groot-tests/test-common/src/test/resources/grootstream.yaml +++ b/groot-tests/test-common/src/test/resources/grootstream.yaml @@ -11,11 +11,4 @@ grootstream: files: - ip_builtin.mmdb properties: - hos.path: http://192.168.44.12:9098/hos - hos.bucket.name.traffic_file: traffic_file_bucket - hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket scheduler.knowledge_base.update.interval.minutes: 5 - hos.bucket.name.rtp_file: traffic_rtp_file_bucket - hos.bucket.name.http_file: traffic_http_file_bucket - hos.bucket.name.eml_file: traffic_eml_file_bucket - hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java index 1c1e777..fdba36f 100644 --- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java @@ -27,30 +27,31 @@ import static org.awaitility.Awaitility.await; disabledReason = "Only flink adjusts the parameter configuration rules") public class InlineToPrintIT extends TestSuiteBase { + @TestTemplate - public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException { + public void testJobExecution(AbstractTestFlinkContainer container) throws IOException, InterruptedException { CompletableFuture.supplyAsync( () -> { try { - List<String> variables = List.of( - "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket", - "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket"); - return container.executeJob("/inline_to_print.yaml", variables); + return container.executeJob("/inline_to_print.yaml"); } catch (Exception e) { - log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); } }); AtomicReference<String> taskMangerID = new AtomicReference<>(); + await().atMost(300000, TimeUnit.MILLISECONDS) .untilAsserted( () -> { Map<String, Object> taskMangerInfo = JSON.parseObject(container.executeJobManagerInnerCommand( "curl http://localhost:8081/taskmanagers"), new TypeReference<Map<String, Object>>() { }); + + @SuppressWarnings("unchecked") List<Map<String, Object>> taskManagers = (List<Map<String, Object>>) taskMangerInfo.get("taskmanagers"); + if (!CollectionUtils.isEmpty(taskManagers)) { taskMangerID.set(taskManagers.get(0).get("id").toString()); } @@ -64,6 +65,7 @@ public class InlineToPrintIT extends TestSuiteBase { Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand( "curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() { }); + @SuppressWarnings("unchecked") List<Map<String, Object>> jobs = (List<Map<String, Object>>) jobInfo.get("jobs"); if (!CollectionUtils.isEmpty(jobs)) { @@ -71,6 +73,7 @@ public class InlineToPrintIT extends TestSuiteBase { } Assertions.assertNotNull(jobId.get()); }); + //Obtain job metrics AtomicReference<List<Map<String, Object>>> jobNumRestartsReference = new AtomicReference<>(); await().atMost(60000, TimeUnit.MILLISECONDS) @@ -78,8 +81,8 @@ public class InlineToPrintIT extends TestSuiteBase { () -> { Thread.sleep(5000); String result = container.executeJobManagerInnerCommand( - String.format( - "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get())); + String.format( + "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get())); List<Map<String, Object>> jobNumRestartsInfo = JSON.parseObject(result, new TypeReference<List<Map<String, Object>>>() { }); if (!CollectionUtils.isEmpty(jobNumRestartsInfo)) { @@ -90,12 +93,57 @@ public class InlineToPrintIT extends TestSuiteBase { }); + + } + + @TestTemplate + public void testUserDefinedJobVariables(AbstractTestFlinkContainer container) throws IOException, InterruptedException { + + CompletableFuture.supplyAsync( + () -> { + try { + List<String> variables = List.of( + "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket", + "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket"); + return container.executeJob("/inline_to_print.yaml", variables); + } catch (Exception e) { + log.error("Commit task exception : {} ", e.getMessage()); + throw new RuntimeException(e); + } + }); + + await().atMost(300000, TimeUnit.MILLISECONDS) .untilAsserted( () -> { String logs = container.getServerLogs(); Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_rtp_file_bucket/test_pcap_file") > 10); Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_http_file_bucket/test_http_req_file") > 10); + // Test server_ip filter -> output logs not contains 4.4.4.4 of server_ip + Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && !StringUtils.contains(logs, "\"server_ip\":\"4.4.4.4\"")); + // Test Drop function -> output logs not contains 5.5.5.5 of server_ip + Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && !StringUtils.contains(logs, "\"server_ip\":\"5.5.5.5\"")); + + // Output logs contains server_asn + Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_asn\"")); + // Output logs contains server_domain + Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_domain\"")); + + // Output logs contains server_country + Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_country\"")); + // Output logs contains mail_attachment_name equals 中文测试 + Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"mail_attachment_name\":\"中文测试\"")); + // Test EVAL function -> output logs contains direction equals c2s + Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"direction\":\"c2s\"")); + // Test JSON Extract function -> output logs contains device_group equals XXG-TSG-BJ + Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"device_group\":\"XXG-TSG-BJ\"")); + + Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "client_ip_list")); + + + + + }); diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml index b4773a1..2908ffb 100644 --- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml +++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml @@ -2,48 +2,205 @@ sources: inline_source: type: inline properties: - data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","rtp_pcap_path":"test_pcap_file","http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]' + data: [{"tcp_rtt_ms":128,"decoded_as":"DNS","rtp_pcap_path":"test_pcap_file", "security_rule_id_list": [1,10,100,300], "http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","ssl_sni":"www.ct.cn", "http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"flags":8192, "address_type":4,"mail_subject":"中文标题测试","mail_attachment_name":"5Lit5paH5rWL6K+V","mail_attachment_name_charset": "utf8","device_tag": "{\"tags\":[{\"tag\":\"data_center\",\"value\":\"XXG-TSG-BJ\"},{\"tag\":\"device_group\",\"value\":\"XXG-TSG-BJ\"}]}", "client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","mail_subject":"中文标题测试","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","mail_subject":"english subject test","http_request_line":"GET / HTTP/1.1","http_host":"www.5555.com","http_url":"www.5555.com/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.1","server_ip":"5.5.5.5","client_port":42751,"server_port":53,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.6666.cn","http_url":"www.6666.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","mail_subject":"中文标题测试","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.100.1","server_ip":"6.6.6.6","client_port":42751,"server_port":53,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}] format: json json.ignore.parse.errors: false filters: - filter_operator: - type: com.geedgenetworks.core.filter.AviatorFilter + server_ip_filter: + type: aviator properties: - expression: event.server_ip != '12.12.12.12' + expression: event.server_ip != '4.4.4.4' + +splits: + decoded_as_split: + type: split + rules: + - tag: http_tag + expression: event.decoded_as == 'HTTP' + - tag: dns_tag + expression: event.decoded_as == 'DNS' + processing_pipelines: projection_processor: type: projection remove_fields: [http_request_line, http_response_line, http_response_content_type] functions: + - function: DROP - filter: event.server_ip == '4.4.4.4' + filter: event.server_ip == '5.5.5.5' + + - function: SNOWFLAKE_ID + output_fields: [ log_id ] + parameters: + data_center_id_num: 1 + + - function: UUID + output_fields: [ log_uuid ] + + - function: UUIDv5 + lookup_fields: [ client_ip, server_ip ] + output_fields: [ ip_uuid ] + parameters: + namespace: NAMESPACE_IP + - function: UUIDv7 + output_fields: [ log_uuid_v7 ] + + - function: ASN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + kb_name: tsg_ip_asn + option: IP_TO_ASN + + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ ] + parameters: + kb_name: tsg_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: server_country + PROVINCE: server_super_administrative_area + CITY: server_administrative_area + LONGITUDE: server_longitude + LATITUDE: server_latitude + ISP: server_isp + ORGANIZATION: server_organization + + - function : BASE64_ENCODE_TO_STRING + lookup_fields: [ mail_subject ] + output_fields: [ mail_subject_base64 ] + parameters: + input_type: string + + - function: BASE64_DECODE_TO_STRING + output_fields: [ mail_attachment_name ] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ current_unix_timestamp_ms ] + parameters: + precision: milliseconds + + - function: DOMAIN + lookup_fields: [ http_host, ssl_sni, quic_sni ] + output_fields: [ server_domain ] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + - function: EVAL + output_fields: [ recv_time ] + parameters: + value_expression: current_unix_timestamp_ms + + - function: EVAL + output_fields: [ direction ] + parameters: + value_expression: "(flags & 24576) == 24576 ? 'double' : ((flags & 8192) == 8192 ? 'c2s' : ((flags & 16384) == 16384 ? 's2c' : 'unknown'))" + + - function: EVAL + output_fields: [ constant_value ] + parameters: + value_expression: "'abc'" + + - function: JSON_EXTRACT + lookup_fields: [ device_tag ] + output_fields: [ device_group ] + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: FLATTEN + lookup_fields: [ device_tag ] + parameters: + prefix: olap + json_string_keys: [device_tag] + + - function: FROM_UNIX_TIMESTAMP + lookup_fields: [ current_unix_timestamp_ms ] + output_fields: [ current_time_str ] + parameters: + precision: milliseconds + + - function: GENERATE_STRING_ARRAY + lookup_fields: [server_ip, server_port] + output_fields: [server_ip_port] + - function: PATH_COMBINE lookup_fields: [ rtp_pcap_path ] output_fields: [ rtp_pcap_path ] parameters: path: [ props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path ] + - function: PATH_COMBINE lookup_fields: [ http_request_body ] output_fields: [ http_request_body ] parameters: path: [ props.hos.path, props.hos.bucket.name.http_file, http_request_body ] + - function: RENAME + parameters: + rename_fields: + current_unix_timestamp_ms: processing_time_ms + rename_expression: key = string.replace_all(key,'olap.device_tag.tags','device_tags'); return key; + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [stat_time_minute] + parameters: + precision: minutes + + dns_table_processor: + type: table + functions: + - function: UNROLL + lookup_fields: [ security_rule_id_list ] + output_fields: [ security_rule_id ] + + dns_aggregate_processor: + type: aggregate + group_by_fields: [ decoded_as ] + window_type: tumbling_processing_time + window_size: 5 + functions: + - function: LONG_COUNT + output_fields: [ count ] + - function: COLLECT_LIST + lookup_fields: [ client_ip ] + output_fields: [ client_ip_list ] + + + sinks: - print_sink: + global_print_sink: + type: print + properties: + format: json + mode: log_warn + dns_print_sink: type: print properties: format: json mode: log_warn + http_print_sink: + type: print + properties: + format: json + mode: log_warn + application: env: name: example-inline-to-print - parallelism: 3 + parallelism: 1 pipeline: object-reuse: true + properties: + hos.path: http://192.168.44.12:9098/hos + hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket hos.bucket.name.http_file: job_level_traffic_http_file_bucket hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket @@ -51,10 +208,25 @@ application: topology: - name: inline_source - downstream: [filter_operator] - - name: filter_operator + downstream: [server_ip_filter] + - name: server_ip_filter downstream: [ projection_processor ] - name: projection_processor - downstream: [ print_sink ] - - name: print_sink + downstream: [ global_print_sink, decoded_as_split ] + parallelism: 2 + - name: decoded_as_split + tags: [http_tag, dns_tag] + downstream: [ http_print_sink, dns_table_processor ] + parallelism: 2 + - name: dns_table_processor + downstream: [ dns_aggregate_processor ] + parallelism: 2 + - name: dns_aggregate_processor + downstream: [ dns_print_sink ] + parallelism: 2 + - name: global_print_sink + downstream: [] + - name: http_print_sink + downstream: [] + - name: dns_print_sink downstream: []
\ No newline at end of file |
