# path: root/config/grootstream_job_example.yaml
# blob: 8c7a1b1ca4723107fa91f91fd141ae3fd548d830
# Source definitions, keyed by source name. The name `inline_source` is the
# entry that application.topology starts from.
sources:
  inline_source:
    # Source of type `inline`: the records are embedded in `data` below.
    type: inline
    properties:
      # A single-quoted JSON array holding two session records, both with
      # decoded_as "HTTP" (server_ip 8.8.8.8 and 4.4.4.4 respectively).
      # NOTE(review): in the second record http_host is "www.abc.cn" while
      # http_url is "www.cabc.cn/" — confirm whether the mismatch is intended.
      # NOTE(review): in the second record duration_ms is 2575, but
      # end_timestamp_ms - start_timestamp_ms is 5575 — confirm.
      data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
      # Declares that `data` is JSON.
      format: json
      # NOTE(review): presumably `false` makes a malformed record raise an
      # error instead of being skipped — confirm against the engine docs.
      json.ignore.parse.errors: false

# Filter definitions, keyed by filter name.
filters:
  # NOTE(review): no application.topology entry names filter_operator, so it
  # appears to be defined but unused in this job — confirm.
  filter_operator:
    type: aviator
    properties:
      # Double-quoted so the embedded single quotes stay literal.
      expression: "event.server_ip != '12.12.12.12'"
# Split definitions, keyed by split name.
splits:
  decoded_as_split:
    type: split
    # Each rule pairs a tag with an expression over event.decoded_as.
    rules:
      - tag: http_tag
        expression: "event.decoded_as == 'HTTP'"
      - tag: dns_tag
        expression: "event.decoded_as == 'DNS'"
# Processing stages, keyed by stage name.
processing_pipelines:
  # Projection stage: lists three HTTP fields under remove_fields and declares
  # one DROP function guarded by a server_ip filter.
  projection_processor:
    type: projection
    remove_fields:
      - http_request_line
      - http_response_line
      - http_response_content_type
    functions:
      - function: DROP
        filter: "event.server_ip == '4.4.4.4'"

  # Aggregate stage: groups by (server_ip, server_port) and declares six
  # aggregate functions.
  aggregate_processor:
    type: aggregate
    # Explicit null; parses to the same value as an empty `output_fields:`.
    output_fields: null
    group_by_fields: [server_ip, server_port]
    # Other values: tumbling_event_time, sliding_processing_time,
    # sliding_event_time
    window_type: tumbling_processing_time
    # NOTE(review): the inline sample records contain no recv_time field;
    # presumably this is only read for *_event_time window types — confirm.
    window_timestamp_field: recv_time
    window_size: 6
    # Sliding-window step.
    # NOTE(review): larger than window_size; presumably only read for
    # sliding_* window types — confirm.
    window_slide: 10
    mini_batch: true
    functions:
      - function: NUMBER_SUM
        lookup_fields: [sent_pkts]
        output_fields: [sent_pkts_sum]
      - function: NUMBER_SUM
        lookup_fields: [sent_bytes]
        output_fields: [sent_bytes_sum]
      - function: COLLECT_LIST
        lookup_fields: [client_port]
        output_fields: [client_port_list]
      - function: COLLECT_SET
        lookup_fields: [client_ip]
        output_fields: [client_ip_set]
      # LONG_COUNT declares no lookup_fields, only an output field.
      - function: LONG_COUNT
        output_fields: [sessions]
      - function: MEAN
        lookup_fields: [received_pkts]
        output_fields: [received_pkts_mean]
        parameters:
          precision: 1
# Sink definitions, keyed by sink name.
sinks:
  # Listed as the downstream of both projection_processor and
  # aggregate_processor in application.topology.
  print_sink:
    type: print
    properties:
      format: json
      # NOTE(review): presumably writes records through the logger at WARN
      # level — confirm against the engine docs.
      mode: log_warn

# Job-level settings (env) and the wiring between the named sources, splits,
# processing stages and sinks (topology).
application:
  env:
    name: example-inline-to-print
    parallelism: 3
    shade.identifier: sm4
    kms.type: vault
    pipeline:
      object-reuse: true
    execution:
      restart:
        strategy: none
    properties:
      hos.bucket.name.rtp_file: traffic_rtp_file_bucket
      hos.bucket.name.http_file: traffic_http_file_bucket
      hos.bucket.name.eml_file: traffic_eml_file_bucket
      hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
      # Quoted because the value contains ':' and '?'.
      # NOTE(review): the value has no URI scheme (e.g. http://) — confirm the
      # consumer accepts it in this form.
      projection.encrypt.schema.registry.uri: '192.168.44.12:9999/v1/schema/session_record?option=encrypt_fields'
  # Each entry names a node and the nodes it feeds.
  topology:
    - name: inline_source
      downstream: [decoded_as_split]
    # NOTE(review): tags and downstream have the same length and are
    # presumably paired by position (http_tag -> projection_processor,
    # dns_tag -> aggregate_processor) — confirm. If so, aggregate_processor
    # would receive no records, since both inline sample records have
    # decoded_as "HTTP".
    - name: decoded_as_split
      tags: [http_tag, dns_tag]
      downstream: [projection_processor, aggregate_processor]
    - name: projection_processor
      downstream: [print_sink]
    - name: aggregate_processor
      downstream: [print_sink]
    # Terminal node: empty downstream list.
    - name: print_sink
      downstream: []