diff options
| author | chaochaoc <[email protected]> | 2024-06-27 15:31:14 +0800 |
|---|---|---|
| committer | chaochaoc <[email protected]> | 2024-06-27 15:31:14 +0800 |
| commit | c077c16a3a55ed6d2be42ef1d43eefaf4e41fd29 (patch) | |
| tree | 202f84d3585eafe1086c261c8874a500bbab56da /src | |
| parent | 1ff8c985c7b7991eff68df30197cbb3e7c896509 (diff) | |
[GAL-602] refactor: impl sip doube-ways correlate
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/resources/job.yml | 407 |
1 files changed, 388 insertions, 19 deletions
diff --git a/src/main/resources/job.yml b/src/main/resources/job.yml index ea0f869..6d838d0 100644 --- a/src/main/resources/job.yml +++ b/src/main/resources/job.yml @@ -3,16 +3,24 @@ job: parallelism: 1 active-pipeline: - console + - console1 source: - name: session-records - type: kafka + # type: kafka + # option: + # topic: SESSION-RECORD + # properties: + # bootstrap.servers: 192.168.44.12:9092 + # group.id: easy-stream-tester9 + # client.id: easy-stream-tester9 +# type: file +# option: +# path: E:\java-workspace\sip-rtp-correlation\feature\easy-refactor\src\main\resources\session-records.txt + type: socket option: - topic: SESSION-RECORD - properties: - bootstrap.servers: 192.168.44.12:9092 - group.id: easy-stream-tester9 - client.id: easy-stream-tester9 + hostname: localhost + port: 9999 format: json schema: - name: session_id @@ -49,6 +57,8 @@ source: data-type: INT - name: direction data-type: INT + - name: vsys_id + data-type: BIGINT - name: t_vsys_id data-type: BIGINT - name: flags @@ -212,21 +222,380 @@ source: pipeline: - name: console category: PRINT - on: split-for-error - - name: split-for-error +# on: sip-records + on: sip-double-way-records.ok + use-err: true + - name: console1 + category: PRINT + on: sip-records + use-err: true + - name: split-for-valid category: SPLIT on: session-records splits: - # Invalid stream dir - - name: error1-records - where: STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3 # Invalid ip or port - - name: error2-records + - name: error1-records where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port <= 0 || server_port <= 0 -# - name: split-by-protocol -# category: SPLIT -# splits: -# - name: rtp-records -# where: "decoded_as == 'RTP'" -# - name: sip-records -# where: "decoded_as == 'SIP'" + # Invalid stream dir + - name: error2-records + where: decoded_as == 'SIP' &&STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3 + # Invalid: SIP one-way stream and has invalid network address + - name: error3-records + where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port <= 0 ) + - name: error4-records + where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_IP_ADDRESS(sip_responder_sdp_connect_ip) ) ) + + ### Notes: If internal IP address correlate is needed, please uncomment the following two items + # # Invalid: SIP one-way stream and internal network address + # - name: internal-error1-records + # where: decoded_as == 'SIP' && NOT(HAS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) + # # Invalid: SIP double-way stream and internal network address + # - name: internal-error2-records + # where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_EXTERNAL_IP_ADDRESS(sip_responder_sdp_connect_ip) ) ) + - name: split-by-protocol + category: SPLIT + on: split-for-valid + splits: + - name: rtp-records + where: decoded_as == 'RTP' + - name: sip-records + where: decoded_as == 'SIP' + - name: sip-double-way-records + category: CORRELATE + on: sip-records + cache: + - name: v1 + type: VALUE + ttl: 1 minute + schema: + - name: session_id + data-type: BIGINT NOT NULL + - name: start_timestamp_ms + data-type: BIGINT NOT NULL + - name: start_timestamp + data-type: TIMESTAMP_LTZ(3) + - name: end_timestamp_ms + data-type: BIGINT NOT NULL + - name: decoded_as + data-type: STRING NOT NULL + - name: duration_ms + data-type: BIGINT NOT NULL + - name: tcp_handshake_latency_ms + data-type: BIGINT + - name: device_id + data-type: STRING NOT NULL + - name: out_link_id + data-type: BIGINT + - name: in_link_id + data-type: BIGINT + - name: device_tag + data-type: STRING + - name: data_center + data-type: STRING + - name: device_group + data-type: STRING + - name: sled_ip + data-type: STRING + - name: address_type + data-type: INT + - name: direction + data-type: INT + - name: vsys_id + data-type: BIGINT + - name: t_vsys_id + data-type: BIGINT + - name: flags + data-type: BIGINT + - name: flags_identify_info + data-type: STRING + ## Treatment + - name: security_rule_list + data-type: ARRAY<BIGINT> + - name: security_action + data-type: STRING + - name: monitor_rule_list + data-type: ARRAY<BIGINT> + - name: shaping_rule_list + data-type: ARRAY<BIGINT> + - name: sc_rule_list + data-type: ARRAY<BIGINT> + - name: statistics_rule_list + data-type: ARRAY<BIGINT> + - name: sc_rsp_raw + data-type: ARRAY<BIGINT> + - name: sc_rsp_decrypted + data-type: ARRAY<BIGINT> + - name: proxy_rule_list + data-type: ARRAY<BIGINT> + - name: proxy_action + data-type: STRING + - name: proxy_pinning_status + data-type: INT + - name: proxy_intercept_status + data-type: INT + - name: proxy_passthrough_reason + data-type: STRING + - name: proxy_client_side_latency_ms + data-type: BIGINT + - name: proxy_server_side_latency_ms + data-type: BIGINT + - name: proxy_client_side_version + data-type: STRING + - name: proxy_server_side_version + data-type: STRING + - name: proxy_cert_verify + data-type: INT + - name: proxy_intercept_error + data-type: STRING + - name: monitor_mirrored_pkts + data-type: INT + - name: monitor_mirrored_bytes + data-type: INT + ## Source + - name: client_ip + data-type: STRING + - name: client_port + data-type: INT + - name: client_os_desc + data-type: STRING + - name: client_geolocation + data-type: STRING + - name: client_country + data-type: STRING + - name: client_super_administrative_area + data-type: STRING + - name: client_administrative_area + data-type: STRING + - name: client_sub_administrative_area + data-type: STRING + - name: client_asn + data-type: BIGINT + - name: subscriber_id + data-type: STRING + - name: imei + data-type: STRING + - name: imsi + data-type: STRING + - name: phone_number + data-type: STRING + - name: apn + data-type: STRING + ## Destination + - name: server_ip + data-type: STRING + - name: server_port + data-type: INT + - name: server_os_desc + data-type: STRING + - name: server_geolocation + data-type: STRING + - name: server_country + data-type: STRING + - name: server_super_administrative_area + data-type: STRING + - name: server_administrative_area + data-type: STRING + - name: server_sub_administrative_area + data-type: STRING + - name: server_asn + data-type: BIGINT + - name: server_fqdn + data-type: STRING + - name: server_domain + data-type: STRING + - name: fqdn_category_list + data-type: ARRAY<BIGINT> + ## Application + - name: app_transition + data-type: STRING + - name: app + data-type: STRING + - name: app_debug_info + data-type: STRING + - name: app_content + data-type: STRING + - name: app_extra_info + data-type: STRING + ## Protocol + - name: ip_protocol + data-type: STRING + - name: decoded_path + data-type: STRING + ## SIP + - name: sip_call_id + data-type: STRING + - name: sip_originator_description + data-type: STRING + - name: sip_responder_description + data-type: STRING + - name: sip_user_agent + data-type: STRING + - name: sip_server + data-type: STRING + - name: sip_originator_sdp_connect_ip + data-type: STRING + - name: sip_originator_sdp_media_port + data-type: INT + - name: sip_originator_sdp_media_type + data-type: STRING + - name: sip_originator_sdp_content + data-type: STRING + - name: sip_responder_sdp_connect_ip + data-type: STRING + - name: sip_responder_sdp_media_port + data-type: INT + - name: sip_responder_sdp_media_type + data-type: STRING + - name: sip_responder_sdp_content + data-type: STRING + - name: sip_duration_s + data-type: INT + - name: sip_bye + data-type: STRING + ## RTP + - name: rtp_payload_type_c2s + data-type: INT + - name: rtp_payload_type_s2c + data-type: INT + - name: rtp_pcap_path + data-type: STRING + - name: rtp_originator_dir + data-type: INT + where: + - on: sip-records + key-by: vsys_id, sip_call_id, SORT_ADDRESS( client_ip, client_port, server_ip, server_port ) + process: + - if: STREAM_DIR(flags) != 3 && @v1.isNotNull && STREAM_DIR(@v1.$flags) != STREAM_DIR(flags) + then: + - |- + OUTPUT ok FROM session_id, + start_timestamp_ms, + withColumns(end_timestamp_ms to sip_call_id), + FIND_NOT_BLANK(@v1.$sip_originator_description, sip_originator_description) AS sip_originator_description, + FIND_NOT_BLANK(@v1.$sip_responder_description, sip_responder_description) AS sip_responder_description, + FIND_NOT_BLANK(@v1.$sip_user_agent, sip_user_agent) AS sip_user_agent, + FIND_NOT_BLANK(@v1.$sip_server, sip_server) AS sip_server, + FIND_NOT_BLANK(@v1.$sip_originator_sdp_connect_ip, sip_originator_sdp_connect_ip) AS sip_originator_sdp_connect_ip, + (@v1.$sip_originator_sdp_media_port > 0).?(@v1.$sip_originator_sdp_media_port, sip_originator_sdp_media_port) AS sip_originator_sdp_media_port, + FIND_NOT_BLANK(@v1.$sip_originator_sdp_media_type, sip_originator_sdp_media_type) AS sip_originator_sdp_media_type, + FIND_NOT_BLANK(@v1.$sip_originator_sdp_content, sip_originator_sdp_content) AS sip_originator_sdp_content, + FIND_NOT_BLANK(@v1.$sip_responder_sdp_connect_ip, sip_responder_sdp_connect_ip) AS sip_responder_sdp_connect_ip, + (@v1.$sip_responder_sdp_media_port > 0).?(@v1.$sip_responder_sdp_media_port, sip_responder_sdp_media_port) AS sip_responder_sdp_media_port, + FIND_NOT_BLANK(@v1.$sip_responder_sdp_media_type, sip_responder_sdp_media_type) AS sip_responder_sdp_media_type, + FIND_NOT_BLANK(@v1.$sip_responder_sdp_content, sip_responder_sdp_content) AS sip_responder_sdp_content, + @v1.$sip_duration_s + sip_duration_s AS sip_duration_s, + FIND_NOT_BLANK(@v1.$sip_bye, sip_bye) AS sip_bye, + rtp_payload_type_c2s, + rtp_payload_type_s2c, + rtp_pcap_path, + rtp_originator_dir + - TRUNCATE v1 + - if: STREAM_DIR(flags) != 3 && @v1.isNull + then: + - |- + SET v1 FROM withColumns(session_id to rtp_originator_dir) + - if: STREAM_DIR(flags) == 3 + then: + - |- + OUTPUT ok FROM session_id, + start_timestamp_ms, withColumns(end_timestamp_ms to rtp_originator_dir) + - SCHEDULING USING EVENT TIME FOR NOW + 60 * 1000 + schedule: + - if: '@v1.isNotNull' + then: + - |- + OUTPUT fail FROM @v1.$session_id AS session_id, + @v1.$start_timestamp_ms AS start_timestamp_ms, + @v1.$end_timestamp_ms AS end_timestamp_ms, + @v1.$decoded_as AS decoded_as, + @v1.$duration_ms AS duration_ms, + @v1.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms, + @v1.$device_id AS device_id, + @v1.$out_link_id AS out_link_id, + @v1.$in_link_id AS in_link_id, + @v1.$device_tag AS device_tag, + @v1.$data_center AS data_center, + @v1.$device_group AS device_group, + @v1.$sled_ip AS sled_ip, + @v1.$address_type AS address_type, + @v1.$direction AS direction, + @v1.$vsys_id AS vsys_id, + @v1.$t_vsys_id AS t_vsys_id, + @v1.$flags AS flags, + @v1.$flags_identify_info AS flags_identify_info, + @v1.$security_rule_list AS security_rule_list, + @v1.$security_action AS security_action, + @v1.$monitor_rule_list AS monitor_rule_list, + @v1.$shaping_rule_list AS shaping_rule_list, + @v1.$sc_rule_list AS sc_rule_list, + @v1.$statistics_rule_list AS statistics_rule_list, + @v1.$sc_rsp_raw AS sc_rsp_raw, + @v1.$sc_rsp_decrypted AS sc_rsp_decrypted, + @v1.$proxy_rule_list AS proxy_rule_list, + @v1.$proxy_action AS proxy_action, + @v1.$proxy_pinning_status AS proxy_pinning_status, + @v1.$proxy_intercept_status AS proxy_intercept_status, + @v1.$proxy_passthrough_reason AS proxy_passthrough_reason, + @v1.$proxy_client_side_latency_ms AS proxy_client_side_latency_ms, + @v1.$proxy_server_side_latency_ms AS proxy_server_side_latency_ms, + @v1.$proxy_client_side_version AS proxy_client_side_version, + @v1.$proxy_server_side_version AS proxy_server_side_version, + @v1.$proxy_cert_verify AS proxy_cert_verify, + @v1.$proxy_intercept_error AS proxy_intercept_error, + @v1.$monitor_mirrored_pkts AS monitor_mirrored_pkts, + @v1.$monitor_mirrored_bytes AS monitor_mirrored_bytes, + @v1.$client_ip AS client_ip, + @v1.$client_port AS client_port, + @v1.$client_os_desc AS client_os_desc, + @v1.$client_geolocation AS client_geolocation, + @v1.$client_country AS client_country, + @v1.$client_super_administrative_area AS client_super_administrative_area, + @v1.$client_administrative_area AS client_administrative_area, + @v1.$client_sub_administrative_area AS client_sub_administrative_area, + @v1.$client_asn AS client_asn, + @v1.$subscriber_id AS subscriber_id, + @v1.$imei AS imei, + @v1.$imsi AS imsi, + @v1.$phone_number AS phone_number, + @v1.$apn AS apn, + @v1.$server_ip AS server_ip, + @v1.$server_port AS server_port, + @v1.$server_os_desc AS server_os_desc, + @v1.$server_geolocation AS server_geolocation, + @v1.$server_country AS server_country, + @v1.$server_super_administrative_area AS server_super_administrative_area, + @v1.$server_administrative_area AS server_administrative_area, + @v1.$server_sub_administrative_area AS server_sub_administrative_area, + @v1.$server_asn AS server_asn, + @v1.$server_fqdn AS server_fqdn, + @v1.$server_domain AS server_domain, + @v1.$fqdn_category_list AS fqdn_category_list, + @v1.$app_transition AS app_transition, + @v1.$app AS app, + @v1.$app_debug_info AS app_debug_info, + @v1.$app_content AS app_content, + @v1.$app_extra_info AS app_extra_info, + @v1.$ip_protocol AS ip_protocol, + @v1.$decoded_path AS decoded_path, + @v1.$sip_call_id AS sip_call_id, + @v1.$sip_originator_description AS sip_originator_description, + @v1.$sip_responder_description AS sip_responder_description, + @v1.$sip_user_agent AS sip_user_agent, + @v1.$sip_server AS sip_server, + @v1.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip, + @v1.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port, + @v1.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type, + @v1.$sip_originator_sdp_content AS sip_originator_sdp_content, + @v1.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip, + @v1.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port, + @v1.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type, + @v1.$sip_responder_sdp_content AS sip_responder_sdp_content, + @v1.$sip_duration_s AS sip_duration_s, + @v1.$sip_bye AS sip_bye, + @v1.$rtp_payload_type_c2s AS rtp_payload_type_c2s, + @v1.$rtp_payload_type_s2c AS rtp_payload_type_s2c, + @v1.$rtp_pcap_path AS rtp_pcap_path, + @v1.$rtp_originator_dir AS rtp_originator_dir + - TRUNCATE v1 + + |
