summary refs log tree commit diff
path: root/groot-stream/templates/proxy_event.yaml.j2
blob: e793d381a7a950d665d7975e72252013e0bb75a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Input side of the job: a single Kafka consumer that reads JSON records from
# the PROXY-EVENT topic. Bootstrap servers and the consumer group id are
# injected when the template is rendered.
sources:
  kafka_source:
    type: kafka
    properties:
      topic: PROXY-EVENT
      # Templated strings are quoted so that an empty, boolean-looking or
      # all-digit expansion still renders as a YAML string rather than
      # null / bool / number.
      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
      kafka.client.id: PROXY-EVENT
      kafka.session.timeout.ms: 60000
      kafka.max.poll.records: 3000
      # 31457280 bytes = 30 MiB
      kafka.max.partition.fetch.bytes: 31457280
      kafka.security.protocol: SASL_PLAINTEXT
      kafka.sasl.mechanism: PLAIN
      # Opaque hex string, not a readable JAAS line.
      # NOTE(review): presumably ciphertext decoded via the "aes" value of
      # application.env.shade.identifier - confirm. Quoted so a hex-looking
      # value can never be implicitly typed as a number.
      kafka.sasl.jaas.config: '454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252'
      kafka.group.id: "{{ kafka_source_group_id }}"
      kafka.auto.offset.reset: latest
      format: json
      json.ignore.parse.errors: false

# Record transformation stage. "etl_processor" is a projection made of the
# functions listed below. They are presumably applied top to bottom (the
# ingestion_time step reads the recv_time written by the step before it) -
# confirm against the engine documentation.
processing_pipelines:
  etl_processor:
    type: projection
    functions:
      # Writes log_id. lookup_fields is a one-element list holding an empty
      # string, kept exactly as authored.
      - function: SNOWFLAKE_ID
        lookup_fields: [""]
        output_fields: [log_id]
        parameters:
          # Left unquoted, as authored, so a numeric expansion stays a YAML
          # number rather than becoming a string.
          data_center_id_num: {{ data_center_id_num }}

      # recv_time is derived from __timestamp at second precision.
      - function: UNIX_TIMESTAMP_CONVERTER
        lookup_fields: [__timestamp]
        output_fields: [recv_time]
        parameters:
          precision: seconds

      # ingestion_time is set from the expression "recv_time".
      - function: EVAL
        output_fields: [ingestion_time]
        parameters:
          value_expression: recv_time

      # The two BASE64 steps overwrite their input field with the decoded
      # text; the charset is taken from the sibling *_charset field.
      - function: BASE64_DECODE_TO_STRING
        output_fields: [mail_subject]
        parameters:
          value_field: mail_subject
          charset_field: mail_subject_charset

      - function: BASE64_DECODE_TO_STRING
        output_fields: [mail_attachment_name]
        parameters:
          value_field: mail_attachment_name
          charset_field: mail_attachment_name_charset

      # Each PATH_COMBINE step rewrites a file-reference field in place from
      # three parts: props.hos.path, a bucket-name property and the field's
      # own value.
      # NOTE(review): props.hos.path is not among the properties declared under
      # application.env.properties in this template; presumably it comes from
      # engine-level configuration - confirm.
      - function: PATH_COMBINE
        lookup_fields: [rtp_pcap_path]
        output_fields: [rtp_pcap_path]
        parameters:
          path:
            - props.hos.path
            - props.hos.bucket.name.rtp_file
            - rtp_pcap_path

      - function: PATH_COMBINE
        lookup_fields: [http_request_body]
        output_fields: [http_request_body]
        parameters:
          path:
            - props.hos.path
            - props.hos.bucket.name.http_file
            - http_request_body

      - function: PATH_COMBINE
        lookup_fields: [http_response_body]
        output_fields: [http_response_body]
        parameters:
          path:
            - props.hos.path
            - props.hos.bucket.name.http_file
            - http_response_body

      - function: PATH_COMBINE
        lookup_fields: [mail_eml_file]
        output_fields: [mail_eml_file]
        parameters:
          path:
            - props.hos.path
            - props.hos.bucket.name.eml_file
            - mail_eml_file

      - function: PATH_COMBINE
        lookup_fields: [packet_capture_file]
        output_fields: [packet_capture_file]
        parameters:
          path:
            - props.hos.path
            - props.hos.bucket.name.policy_capture_file
            - packet_capture_file

      # processing_time is stamped last, at second precision.
      - function: CURRENT_UNIX_TIMESTAMP
        output_fields: [processing_time]
        parameters:
          precision: seconds

# Output side of the job. Two sinks are declared here: a Kafka producer writing
# JSON to the PROXY-EVENT topic and a ClickHouse writer targeting
# tsg_galaxy_v3.proxy_event_local. Which of them is actually wired in depends
# on the topology rendered into the application section.
sinks:
  kafka_sink:
    type: kafka
    properties:
      topic: PROXY-EVENT
      # Templated strings are quoted so that an empty, boolean-looking or
      # all-digit expansion still renders as a YAML string.
      kafka.bootstrap.servers: "{{ kafka_sink_servers }}"
      kafka.client.id: PROXY-EVENT
      # NOTE(review): retries is 0 and log.failures.only is true; presumably a
      # failed send is logged and the record dropped - confirm this
      # best-effort delivery is intended.
      kafka.retries: 0
      kafka.linger.ms: 10
      kafka.request.timeout.ms: 30000
      # 262144 bytes = 256 KiB
      kafka.batch.size: 262144
      # 134217728 bytes = 128 MiB
      kafka.buffer.memory: 134217728
      # 10485760 bytes = 10 MiB
      kafka.max.request.size: 10485760
      kafka.compression.type: snappy
      kafka.security.protocol: SASL_PLAINTEXT
      kafka.sasl.mechanism: PLAIN
      # Opaque hex string, identical to the value used by the Kafka source.
      # NOTE(review): presumably ciphertext decoded via the "aes" value of
      # application.env.shade.identifier - confirm. Quoted so a hex-looking
      # value can never be implicitly typed as a number.
      kafka.sasl.jaas.config: '454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252'
      format: json
      json.ignore.parse.errors: false
      log.failures.only: true

  clickhouse_sink:
    type: clickhouse
    properties:
      host: "{{ clickhouse_sink_host }}"
      table: tsg_galaxy_v3.proxy_event_local
      batch.size: 100000
      batch.interval: 30s
      # Credentials are opaque hex strings (presumably encrypted - confirm).
      # Quoted because strings made only of digits and the letter "e" would
      # otherwise be read as numbers by YAML.
      connection.user: 'e54c9568586180eede1506eecf3574e9'
      connection.password: '86cf0e2ffba3f541a6c6761313e5cc7e'
      # NOTE(review): time units are not stated in this file; presumably
      # seconds - confirm.
      connection.connect_timeout: 30
      connection.query_timeout: 300

# Job-level settings: the job name, the shade identifier, pipeline options and
# the bucket-name properties that the PATH_COMBINE functions reference through
# the "props." prefix. The job topology is appended by the template at the end.
application:
  env:
    # Quoted so that an empty, boolean-looking or all-digit job name still
    # renders as a YAML string.
    name: "{{ job_name }}"
    shade.identifier: aes
    pipeline:
      object-reuse: true
    properties:
      hos.bucket.name.rtp_file: traffic_rtp_file_bucket
      hos.bucket.name.http_file: traffic_http_file_bucket
      hos.bucket.name.eml_file: traffic_eml_file_bucket
      hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
  # NOTE(review): the topology variable is pasted in verbatim; only its first
  # line receives the two-space indent written here. A multi-line value must
  # therefore carry its own indentation for every following line - confirm
  # against the caller that supplies it.
  {{ topology }}