sources: inline_source: type: inline properties: data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]' format: json json.ignore.parse.errors: false filters: filter_operator: type: aviator properties: expression: event.server_ip != '12.12.12.12' splits: decoded_as_split: type: split rules: - tag: http_tag expression: event.decoded_as == 'HTTP' - tag: dns_tag expression: event.decoded_as == 'DNS' processing_pipelines: projection_processor: type: projection remove_fields: [http_request_line, http_response_line, http_response_content_type] functions: - function: DROP filter: event.server_ip == '4.4.4.4' aggregate_processor: type: aggregate output_fields: group_by_fields: [server_ip,server_port] window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time window_timestamp_field: recv_time window_size: 6 window_slide: 10 #滑动窗口步长 mini_batch: true functions: - function: NUMBER_SUM lookup_fields: [ sent_pkts ] output_fields: [ sent_pkts_sum ] - function: NUMBER_SUM lookup_fields: [ sent_bytes ] output_fields: [ sent_bytes_sum ] - function: COLLECT_LIST lookup_fields: [ client_port ] output_fields: [ client_port_list ] - function: COLLECT_SET lookup_fields: [ client_ip ] output_fields: [ client_ip_set ] - function: LONG_COUNT output_fields: [ sessions ] - function: MEAN lookup_fields: [ received_pkts ] output_fields: [ received_pkts_mean ] parameters: precision: 1 sinks: print_sink: type: print properties: format: json mode: log_warn application: env: name: example-inline-to-print parallelism: 3 shade.identifier: sm4 kms.type: vault pipeline: object-reuse: true execution: restart: strategy: none properties: hos.bucket.name.rtp_file: traffic_rtp_file_bucket hos.bucket.name.http_file: traffic_http_file_bucket hos.bucket.name.eml_file: traffic_eml_file_bucket hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/schema/session_record?option=encrypt_fields topology: - name: inline_source downstream: [decoded_as_split] - name: decoded_as_split tags: [http_tag, dns_tag] downstream: [ projection_processor, aggregate_processor] - name: projection_processor downstream: [ print_sink ] - name: aggregate_processor downstream: [ print_sink ] - name: print_sink downstream: []