summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorchaochaoc <[email protected]>2024-06-27 15:31:14 +0800
committerchaochaoc <[email protected]>2024-06-27 15:31:14 +0800
commitc077c16a3a55ed6d2be42ef1d43eefaf4e41fd29 (patch)
tree202f84d3585eafe1086c261c8874a500bbab56da /src
parent1ff8c985c7b7991eff68df30197cbb3e7c896509 (diff)
[GAL-602] refactor: impl sip doube-ways correlate
Diffstat (limited to 'src')
-rw-r--r--src/main/resources/job.yml407
1 files changed, 388 insertions, 19 deletions
diff --git a/src/main/resources/job.yml b/src/main/resources/job.yml
index ea0f869..6d838d0 100644
--- a/src/main/resources/job.yml
+++ b/src/main/resources/job.yml
@@ -3,16 +3,24 @@ job:
parallelism: 1
active-pipeline:
- console
+ - console1
source:
- name: session-records
- type: kafka
+ # type: kafka
+ # option:
+ # topic: SESSION-RECORD
+ # properties:
+ # bootstrap.servers: 192.168.44.12:9092
+ # group.id: easy-stream-tester9
+ # client.id: easy-stream-tester9
+# type: file
+# option:
+# path: E:\java-workspace\sip-rtp-correlation\feature\easy-refactor\src\main\resources\session-records.txt
+ type: socket
option:
- topic: SESSION-RECORD
- properties:
- bootstrap.servers: 192.168.44.12:9092
- group.id: easy-stream-tester9
- client.id: easy-stream-tester9
+ hostname: localhost
+ port: 9999
format: json
schema:
- name: session_id
@@ -49,6 +57,8 @@ source:
data-type: INT
- name: direction
data-type: INT
+ - name: vsys_id
+ data-type: BIGINT
- name: t_vsys_id
data-type: BIGINT
- name: flags
@@ -212,21 +222,380 @@ source:
pipeline:
- name: console
category: PRINT
- on: split-for-error
- - name: split-for-error
+# on: sip-records
+ on: sip-double-way-records.ok
+ use-err: true
+ - name: console1
+ category: PRINT
+ on: sip-records
+ use-err: true
+ - name: split-for-valid
category: SPLIT
on: session-records
splits:
- # Invalid stream dir
- - name: error1-records
- where: STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3
# Invalid ip or port
- - name: error2-records
+ - name: error1-records
where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port <= 0 || server_port <= 0
-# - name: split-by-protocol
-# category: SPLIT
-# splits:
-# - name: rtp-records
-# where: "decoded_as == 'RTP'"
-# - name: sip-records
-# where: "decoded_as == 'SIP'"
+ # Invalid stream dir
+ - name: error2-records
+ where: decoded_as == 'SIP' &&STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3
+ # Invalid: SIP one-way stream and has invalid network address
+ - name: error3-records
+ where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port <= 0 )
+ - name: error4-records
+ where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_IP_ADDRESS(sip_responder_sdp_connect_ip) ) )
+
+ ### Notes: If internal IP address correlate is needed, please uncomment the following two items
+ # # Invalid: SIP one-way stream and internal network address
+ # - name: internal-error1-records
+ # where: decoded_as == 'SIP' && NOT(HAS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip))
+ # # Invalid: SIP double-way stream and internal network address
+ # - name: internal-error2-records
+ # where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_EXTERNAL_IP_ADDRESS(sip_responder_sdp_connect_ip) ) )
+ - name: split-by-protocol
+ category: SPLIT
+ on: split-for-valid
+ splits:
+ - name: rtp-records
+ where: decoded_as == 'RTP'
+ - name: sip-records
+ where: decoded_as == 'SIP'
+ - name: sip-double-way-records
+ category: CORRELATE
+ on: sip-records
+ cache:
+ - name: v1
+ type: VALUE
+ ttl: 1 minute
+ schema:
+ - name: session_id
+ data-type: BIGINT NOT NULL
+ - name: start_timestamp_ms
+ data-type: BIGINT NOT NULL
+ - name: start_timestamp
+ data-type: TIMESTAMP_LTZ(3)
+ - name: end_timestamp_ms
+ data-type: BIGINT NOT NULL
+ - name: decoded_as
+ data-type: STRING NOT NULL
+ - name: duration_ms
+ data-type: BIGINT NOT NULL
+ - name: tcp_handshake_latency_ms
+ data-type: BIGINT
+ - name: device_id
+ data-type: STRING NOT NULL
+ - name: out_link_id
+ data-type: BIGINT
+ - name: in_link_id
+ data-type: BIGINT
+ - name: device_tag
+ data-type: STRING
+ - name: data_center
+ data-type: STRING
+ - name: device_group
+ data-type: STRING
+ - name: sled_ip
+ data-type: STRING
+ - name: address_type
+ data-type: INT
+ - name: direction
+ data-type: INT
+ - name: vsys_id
+ data-type: BIGINT
+ - name: t_vsys_id
+ data-type: BIGINT
+ - name: flags
+ data-type: BIGINT
+ - name: flags_identify_info
+ data-type: STRING
+ ## Treatment
+ - name: security_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: security_action
+ data-type: STRING
+ - name: monitor_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: shaping_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: sc_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: statistics_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: sc_rsp_raw
+ data-type: ARRAY<BIGINT>
+ - name: sc_rsp_decrypted
+ data-type: ARRAY<BIGINT>
+ - name: proxy_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: proxy_action
+ data-type: STRING
+ - name: proxy_pinning_status
+ data-type: INT
+ - name: proxy_intercept_status
+ data-type: INT
+ - name: proxy_passthrough_reason
+ data-type: STRING
+ - name: proxy_client_side_latency_ms
+ data-type: BIGINT
+ - name: proxy_server_side_latency_ms
+ data-type: BIGINT
+ - name: proxy_client_side_version
+ data-type: STRING
+ - name: proxy_server_side_version
+ data-type: STRING
+ - name: proxy_cert_verify
+ data-type: INT
+ - name: proxy_intercept_error
+ data-type: STRING
+ - name: monitor_mirrored_pkts
+ data-type: INT
+ - name: monitor_mirrored_bytes
+ data-type: INT
+ ## Source
+ - name: client_ip
+ data-type: STRING
+ - name: client_port
+ data-type: INT
+ - name: client_os_desc
+ data-type: STRING
+ - name: client_geolocation
+ data-type: STRING
+ - name: client_country
+ data-type: STRING
+ - name: client_super_administrative_area
+ data-type: STRING
+ - name: client_administrative_area
+ data-type: STRING
+ - name: client_sub_administrative_area
+ data-type: STRING
+ - name: client_asn
+ data-type: BIGINT
+ - name: subscriber_id
+ data-type: STRING
+ - name: imei
+ data-type: STRING
+ - name: imsi
+ data-type: STRING
+ - name: phone_number
+ data-type: STRING
+ - name: apn
+ data-type: STRING
+ ## Destination
+ - name: server_ip
+ data-type: STRING
+ - name: server_port
+ data-type: INT
+ - name: server_os_desc
+ data-type: STRING
+ - name: server_geolocation
+ data-type: STRING
+ - name: server_country
+ data-type: STRING
+ - name: server_super_administrative_area
+ data-type: STRING
+ - name: server_administrative_area
+ data-type: STRING
+ - name: server_sub_administrative_area
+ data-type: STRING
+ - name: server_asn
+ data-type: BIGINT
+ - name: server_fqdn
+ data-type: STRING
+ - name: server_domain
+ data-type: STRING
+ - name: fqdn_category_list
+ data-type: ARRAY<BIGINT>
+ ## Application
+ - name: app_transition
+ data-type: STRING
+ - name: app
+ data-type: STRING
+ - name: app_debug_info
+ data-type: STRING
+ - name: app_content
+ data-type: STRING
+ - name: app_extra_info
+ data-type: STRING
+ ## Protocol
+ - name: ip_protocol
+ data-type: STRING
+ - name: decoded_path
+ data-type: STRING
+ ## SIP
+ - name: sip_call_id
+ data-type: STRING
+ - name: sip_originator_description
+ data-type: STRING
+ - name: sip_responder_description
+ data-type: STRING
+ - name: sip_user_agent
+ data-type: STRING
+ - name: sip_server
+ data-type: STRING
+ - name: sip_originator_sdp_connect_ip
+ data-type: STRING
+ - name: sip_originator_sdp_media_port
+ data-type: INT
+ - name: sip_originator_sdp_media_type
+ data-type: STRING
+ - name: sip_originator_sdp_content
+ data-type: STRING
+ - name: sip_responder_sdp_connect_ip
+ data-type: STRING
+ - name: sip_responder_sdp_media_port
+ data-type: INT
+ - name: sip_responder_sdp_media_type
+ data-type: STRING
+ - name: sip_responder_sdp_content
+ data-type: STRING
+ - name: sip_duration_s
+ data-type: INT
+ - name: sip_bye
+ data-type: STRING
+ ## RTP
+ - name: rtp_payload_type_c2s
+ data-type: INT
+ - name: rtp_payload_type_s2c
+ data-type: INT
+ - name: rtp_pcap_path
+ data-type: STRING
+ - name: rtp_originator_dir
+ data-type: INT
+ where:
+ - on: sip-records
+ key-by: vsys_id, sip_call_id, SORT_ADDRESS( client_ip, client_port, server_ip, server_port )
+ process:
+ - if: STREAM_DIR(flags) != 3 && @v1.isNotNull && STREAM_DIR(@v1.$flags) != STREAM_DIR(flags)
+ then:
+ - |-
+ OUTPUT ok FROM session_id,
+ start_timestamp_ms,
+ withColumns(end_timestamp_ms to sip_call_id),
+ FIND_NOT_BLANK(@v1.$sip_originator_description, sip_originator_description) AS sip_originator_description,
+ FIND_NOT_BLANK(@v1.$sip_responder_description, sip_responder_description) AS sip_responder_description,
+ FIND_NOT_BLANK(@v1.$sip_user_agent, sip_user_agent) AS sip_user_agent,
+ FIND_NOT_BLANK(@v1.$sip_server, sip_server) AS sip_server,
+ FIND_NOT_BLANK(@v1.$sip_originator_sdp_connect_ip, sip_originator_sdp_connect_ip) AS sip_originator_sdp_connect_ip,
+ (@v1.$sip_originator_sdp_media_port > 0).?(@v1.$sip_originator_sdp_media_port, sip_originator_sdp_media_port) AS sip_originator_sdp_media_port,
+ FIND_NOT_BLANK(@v1.$sip_originator_sdp_media_type, sip_originator_sdp_media_type) AS sip_originator_sdp_media_type,
+ FIND_NOT_BLANK(@v1.$sip_originator_sdp_content, sip_originator_sdp_content) AS sip_originator_sdp_content,
+ FIND_NOT_BLANK(@v1.$sip_responder_sdp_connect_ip, sip_responder_sdp_connect_ip) AS sip_responder_sdp_connect_ip,
+ (@v1.$sip_responder_sdp_media_port > 0).?(@v1.$sip_responder_sdp_media_port, sip_responder_sdp_media_port) AS sip_responder_sdp_media_port,
+ FIND_NOT_BLANK(@v1.$sip_responder_sdp_media_type, sip_responder_sdp_media_type) AS sip_responder_sdp_media_type,
+ FIND_NOT_BLANK(@v1.$sip_responder_sdp_content, sip_responder_sdp_content) AS sip_responder_sdp_content,
+ @v1.$sip_duration_s + sip_duration_s AS sip_duration_s,
+ FIND_NOT_BLANK(@v1.$sip_bye, sip_bye) AS sip_bye,
+ rtp_payload_type_c2s,
+ rtp_payload_type_s2c,
+ rtp_pcap_path,
+ rtp_originator_dir
+ - TRUNCATE v1
+ - if: STREAM_DIR(flags) != 3 && @v1.isNull
+ then:
+ - |-
+ SET v1 FROM withColumns(session_id to rtp_originator_dir)
+ - if: STREAM_DIR(flags) == 3
+ then:
+ - |-
+ OUTPUT ok FROM session_id,
+ start_timestamp_ms, withColumns(end_timestamp_ms to rtp_originator_dir)
+ - SCHEDULING USING EVENT TIME FOR NOW + 60 * 1000
+ schedule:
+ - if: '@v1.isNotNull'
+ then:
+ - |-
+ OUTPUT fail FROM @v1.$session_id AS session_id,
+ @v1.$start_timestamp_ms AS start_timestamp_ms,
+ @v1.$end_timestamp_ms AS end_timestamp_ms,
+ @v1.$decoded_as AS decoded_as,
+ @v1.$duration_ms AS duration_ms,
+ @v1.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms,
+ @v1.$device_id AS device_id,
+ @v1.$out_link_id AS out_link_id,
+ @v1.$in_link_id AS in_link_id,
+ @v1.$device_tag AS device_tag,
+ @v1.$data_center AS data_center,
+ @v1.$device_group AS device_group,
+ @v1.$sled_ip AS sled_ip,
+ @v1.$address_type AS address_type,
+ @v1.$direction AS direction,
+ @v1.$vsys_id AS vsys_id,
+ @v1.$t_vsys_id AS t_vsys_id,
+ @v1.$flags AS flags,
+ @v1.$flags_identify_info AS flags_identify_info,
+ @v1.$security_rule_list AS security_rule_list,
+ @v1.$security_action AS security_action,
+ @v1.$monitor_rule_list AS monitor_rule_list,
+ @v1.$shaping_rule_list AS shaping_rule_list,
+ @v1.$sc_rule_list AS sc_rule_list,
+ @v1.$statistics_rule_list AS statistics_rule_list,
+ @v1.$sc_rsp_raw AS sc_rsp_raw,
+ @v1.$sc_rsp_decrypted AS sc_rsp_decrypted,
+ @v1.$proxy_rule_list AS proxy_rule_list,
+ @v1.$proxy_action AS proxy_action,
+ @v1.$proxy_pinning_status AS proxy_pinning_status,
+ @v1.$proxy_intercept_status AS proxy_intercept_status,
+ @v1.$proxy_passthrough_reason AS proxy_passthrough_reason,
+ @v1.$proxy_client_side_latency_ms AS proxy_client_side_latency_ms,
+ @v1.$proxy_server_side_latency_ms AS proxy_server_side_latency_ms,
+ @v1.$proxy_client_side_version AS proxy_client_side_version,
+ @v1.$proxy_server_side_version AS proxy_server_side_version,
+ @v1.$proxy_cert_verify AS proxy_cert_verify,
+ @v1.$proxy_intercept_error AS proxy_intercept_error,
+ @v1.$monitor_mirrored_pkts AS monitor_mirrored_pkts,
+ @v1.$monitor_mirrored_bytes AS monitor_mirrored_bytes,
+ @v1.$client_ip AS client_ip,
+ @v1.$client_port AS client_port,
+ @v1.$client_os_desc AS client_os_desc,
+ @v1.$client_geolocation AS client_geolocation,
+ @v1.$client_country AS client_country,
+ @v1.$client_super_administrative_area AS client_super_administrative_area,
+ @v1.$client_administrative_area AS client_administrative_area,
+ @v1.$client_sub_administrative_area AS client_sub_administrative_area,
+ @v1.$client_asn AS client_asn,
+ @v1.$subscriber_id AS subscriber_id,
+ @v1.$imei AS imei,
+ @v1.$imsi AS imsi,
+ @v1.$phone_number AS phone_number,
+ @v1.$apn AS apn,
+ @v1.$server_ip AS server_ip,
+ @v1.$server_port AS server_port,
+ @v1.$server_os_desc AS server_os_desc,
+ @v1.$server_geolocation AS server_geolocation,
+ @v1.$server_country AS server_country,
+ @v1.$server_super_administrative_area AS server_super_administrative_area,
+ @v1.$server_administrative_area AS server_administrative_area,
+ @v1.$server_sub_administrative_area AS server_sub_administrative_area,
+ @v1.$server_asn AS server_asn,
+ @v1.$server_fqdn AS server_fqdn,
+ @v1.$server_domain AS server_domain,
+ @v1.$fqdn_category_list AS fqdn_category_list,
+ @v1.$app_transition AS app_transition,
+ @v1.$app AS app,
+ @v1.$app_debug_info AS app_debug_info,
+ @v1.$app_content AS app_content,
+ @v1.$app_extra_info AS app_extra_info,
+ @v1.$ip_protocol AS ip_protocol,
+ @v1.$decoded_path AS decoded_path,
+ @v1.$sip_call_id AS sip_call_id,
+ @v1.$sip_originator_description AS sip_originator_description,
+ @v1.$sip_responder_description AS sip_responder_description,
+ @v1.$sip_user_agent AS sip_user_agent,
+ @v1.$sip_server AS sip_server,
+ @v1.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip,
+ @v1.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port,
+ @v1.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type,
+ @v1.$sip_originator_sdp_content AS sip_originator_sdp_content,
+ @v1.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip,
+ @v1.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port,
+ @v1.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type,
+ @v1.$sip_responder_sdp_content AS sip_responder_sdp_content,
+ @v1.$sip_duration_s AS sip_duration_s,
+ @v1.$sip_bye AS sip_bye,
+ @v1.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
+ @v1.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
+ @v1.$rtp_pcap_path AS rtp_pcap_path,
+ @v1.$rtp_originator_dir AS rtp_originator_dir
+ - TRUNCATE v1
+
+