#############################################
# Groot Stream Job Template #
#############################################
#
# This section is used to define the [object] Source List, e.g. kafka source, inline source, etc.
#
sources: # [object] Define connector source
  kafka_source: # [object] Kafka source connector name, must be unique. It is used to define the source node of the job topology.
type: kafka # [string] Source Type
    schema: # [object] Source Schema, configured through fields, local_file, or url. If no schema is set, all fields (Map<String, Object>) will be output.
#fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>"
local_file: /../schema/kafka_source_schema.json # [string] Local File Path for Schema
#url: http:// # [string] URL for Schema
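      # Hypothetical sketch of what the referenced schema file could contain, mirroring the
      # name/type pairs used by inline_source below; the actual file format may differ:
      #   [ {"name": "log_id", "type": "bigint"}, {"name": "recv_time", "type": "bigint"}, {"name": "client_ip", "type": "string"} ]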
properties: # [object] Kafka source properties
      topic: SESSION-RECORD # [string] Topic Name; the consumer will subscribe to this topic.
      kafka.bootstrap.servers: 127.0.0.1:9092 # [string] Kafka Bootstrap Servers; if you have multiple servers, separate them with commas.
# The minimum amount of data the server should return for a fetch request.
# If insufficient data is available the request will wait for that much data to accumulate before answering the request.
# The default setting of 1 byte means that fetch requests are answered as soon as that many byte(s) of data is available or the fetch request times out waiting for data to arrive.
      # Setting this to a larger value will cause the server to wait for larger amounts of data to accumulate, which can improve server throughput a bit at the cost of some additional latency.
kafka.fetch.min.bytes: 1
# The timeout used to detect client failures when using Kafka’s group management facility.
# The client sends periodic heartbeats to indicate its liveness to the broker.
# If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.
# Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.
      kafka.session.timeout.ms: 60000 # [number] Kafka Session Timeout in milliseconds (Kafka's default is 45 seconds; here it is raised to 60 seconds)
# Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group.
# The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value.
# It can be adjusted even lower to control the expected time for normal rebalances.
kafka.heartbeat.interval.ms: 10000 # [number] The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
# The maximum number of records returned in a single call to poll().
# Note, that max.poll.records does not impact the underlying fetching behavior.
      # The consumer will cache the records from each fetch request and return them incrementally from each poll.
kafka.max.poll.records: 3000
# The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer.
# If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.
# The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size.
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SSL
kafka.ssl.endpoint.identification.algorithm: ""
kafka.ssl.keystore.location: /data/tsg/olap/flink/topology/data/keystore.jks
kafka.ssl.keystore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
kafka.ssl.truststore.location: /data/tsg/olap/flink/topology/data/truststore.jks
kafka.ssl.truststore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
kafka.ssl.key.password: 86cf0e2ffba3f541a6c6761313e5cc7e
#kafka.security.protocol: SASL_PLAINTEXT
#kafka.sasl.mechanism: PLAIN
#kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001 # [string] Kafka Group ID for Consumer
kafka.auto.offset.reset: latest # [string] Kafka Auto Offset Reset, default is latest
    format: json # [string] Data Format for Source, e.g. json, protobuf, etc.
    json.ignore.parse.errors: false # [boolean] Whether to ignore parse errors. Default is false, which records parse errors; if set to true, parse errors are ignored.
  inline_source: # [object] Inline source connector name, must be unique. It is used to define the source node of the job topology.
type: inline
schema:
      fields: # [array of object] Schema field projection; supports reading data only from the specified fields.
- name: log_id
type: bigint
- name: recv_time
type: bigint
- name: server_fqdn
type: string
- name: server_domain
type: string
- name: client_ip
type: string
- name: server_ip
type: string
- name: server_asn
type: string
- name: decoded_as
type: string
- name: device_group
type: string
- name: device_tag
type: string
properties:
#
      # [string] Event Data; it will be parsed into a Map<String, Object> using the specified format.
#
data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
format: json
json.ignore.parse.errors: false
interval.per.row: 5s
repeat.count: 10
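    # Assumed reading of the two settings above: the inline rows are emitted one row every 5 seconds,
    # and the whole data set is replayed 10 times.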
inline_source_protobuf:
    type: inline
properties:
data: CIin2awGEICAoLC/hYzKAhoEQkFTRSCch8z3wtqEhAQo6o/Xmc0xMMCy15nNMTjWIkDRCEiIp9msBlCIp9msBloIMjE0MjYwMDNg//8DaP//A3JqeyJ0YWdzIjpbeyJ0YWciOiJkYXRhX2NlbnRlciIsInZhbHVlIjoiY2VudGVyLXh4Zy05MTQwIn0seyJ0YWciOiJkZXZpY2VfZ3JvdXAiLCJ2YWx1ZSI6Imdyb3VwLXh4Zy05MTQwIn1dfXoPY2VudGVyLXh4Zy05MTQwggEOZ3JvdXAteHhnLTkxNDCKAQ0xOTIuMTY4LjQwLjgxkAEEmAEBoAEBqAGQwAGyAQdbMSwxLDJd4gEDt+gY4gINMTkyLjU2LjE1MS44MOgCoeYD8gIHV2luZG93c/oCGOe+juWbvS5Vbmtub3duLlVua25vd24uLrIDDTE5Mi41Ni4yMjIuOTO4A/ZwwgMFTGludXjKAxjnvo7lm70uVW5rbm93bi5Vbmtub3duLi6SBAN0Y3CaBBFFVEhFUk5FVC5JUHY0LlRDULAMBLgMBcAM9gHIDJEOoA2AAagN8cr+jgKwDezksIAPwg0RYTI6ZmE6ZGM6NTY6Yzc6YjPKDRE0ODo3Mzo5Nzo5NjozODoyMNINETQ4OjczOjk3Ojk2OjM4OjIw2g0RYTI6ZmE6ZGM6NTY6Yzc6YjM=
type: base64
format: protobuf
protobuf.descriptor.file.path: ..\session_record_test.desc
protobuf.message.name: SessionRecord
  ipfix_source: # [object] IPFIX source connector name, must be unique. It is used to define the source node of the job topology.
type: ipfix
properties:
port.range: 12345-12347
max.packet.size: 65535
max.receive.buffer.size: 104857600
      service.discovery.registry.mode: 0 # 0 = Nacos, 1 = Consul, any other value disables service discovery; default is 0
service.discovery.service.name: udp_ipfix
service.discovery.health.check.interval: 5 # The time interval for reporting health status to the service registry center, in seconds.
service.discovery.nacos.server.addr: 127.0.0.1:8848
service.discovery.nacos.username: nacos
service.discovery.nacos.password: nacos
service.discovery.nacos.namespace: test
service.discovery.nacos.group: IPFIX
# service.discovery.consul.server.ip: 192.168.41.30
# service.discovery.consul.server.port: 8500
# service.discovery.consul.token:
#
# This section is used to define the [object] Filter List. It is used for row-level filtering.
# Based on the filter expression, the event is passed downstream if the expression evaluates to true; otherwise it is dropped.
#
filters: # [object] Define filter operator
filter_operator: # [object] AviatorFilter operator name, must be unique.
type: aviator # [string] Filter Type
properties:
expression: event.server_ip != '12.12.12.12' # [string] Aviator expression, it return true or false.
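      # Illustrative alternatives (not part of this job), assuming standard Aviator syntax:
      #expression: event.duration_ms > 1000 && event.decoded_as == 'HTTP'
      #expression: string.startsWith(event.server_domain, 'ct')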
#
# This section is used to define the [object] Preprocessing Pipeline List. It is used to preprocess the event before the processing pipelines.
# A pipeline includes multiple functions; the functions are executed in order.
#
preprocessing_pipelines: # [object] Define Processors for preprocessing pipelines.
preprocessor: # [object] Define projection processor name, must be unique.
type: projection # [string] Processor Type
functions: # [array of object] Define UDFs
      - function: DROP # [string] DROP function, used to drop events that match the filter expression below
lookup_fields: []
output_fields: []
filter: event.duration_ms == 0
#
# This section is used to define the [object] Processing Pipeline List.
# It performs the common processing of the event through user-defined functions.
#
processing_pipelines: # [object] Define Processors for processing pipelines.
  processor: # [object] Define projection processor name, must be unique. It is referenced by the topology below.
type: projection # [string] Processor Type
remove_fields:
output_fields:
functions: # [array of object] Function List
- function: GEOIP_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_geolocation ]
parameters:
kb_name: tsg_ip_location
option: IP_TO_OBJECT
geolocation_field_mapping:
COUNTRY: client_country_region
PROVINCE: client_super_admin_area
- function: ASN_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_asn ]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
- function: ASN_LOOKUP
lookup_fields: [ client_ip ]
output_fields: [ client_asn ]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
output_fields: [ log_id ]
parameters:
          data_center_id_num: 1 # [number] Data Center ID. Default is 0, range is 0-31. In a multi-data-center deployment, each data center must have a unique ID.
- function: JSON_EXTRACT
lookup_fields: [ device_tag ]
output_fields: [ data_center ]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
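          # Illustrative: with device_tag = '{"tags":[{"tag":"data_center","value":"center-xxg-9140"}]}'
          # the JSONPath above would yield data_center = 'center-xxg-9140'.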
- function: JSON_EXTRACT
lookup_fields: [ device_tag ]
output_fields: [ device_group ]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [ processing_time ]
parameters:
precision: seconds
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ __timestamp ]
output_fields: [ recv_time ]
parameters:
precision: seconds
- function: EVAL
output_fields: [ ingestion_time ]
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
output_fields: [ server_domain ]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
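          # Illustrative assumption: with http_host = 'www.ct.cn' this would yield server_domain = 'ct'
          # (first-significant-subdomain semantics; the exact behavior depends on the DOMAIN function implementation).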
- function: BASE64_DECODE_TO_STRING
output_fields: [ mail_subject ]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [ mail_attachment_name ]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [ rtp_pcap_path ]
output_fields: [ rtp_pcap_path ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path ]
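          # Assumed behavior: joins the configured HOS base path, the traffic-file bucket name, and the
          # rtp_pcap_path field value into a single path string (the same pattern applies to the PATH_COMBINE calls below).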
- function: PATH_COMBINE
lookup_fields: [ http_request_body ]
output_fields: [ http_request_body ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.traffic_file, http_request_body ]
- function: PATH_COMBINE
lookup_fields: [ http_response_body ]
output_fields: [ http_response_body ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.traffic_file, http_response_body ]
- function: PATH_COMBINE
lookup_fields: [ mail_eml_file ]
output_fields: [ mail_eml_file ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file ]
- function: PATH_COMBINE
lookup_fields: [ packet_capture_file ]
output_fields: [ packet_capture_file ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file ]
- function: STRING_JOINER
lookup_fields: [ server_ip,client_ip ]
output_fields: [ ip_string ]
parameters:
separator: ','
prefix: '['
suffix: ']'
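          # Illustrative result: server_ip = '8.8.8.8', client_ip = '192.11.22.22' -> ip_string = '[8.8.8.8,192.11.22.22]'
          # (assumes the values are joined in lookup-field order).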
- function: GENERATE_STRING_ARRAY
lookup_fields: [ client_asn,server_asn ]
output_fields: [ asn_list ]
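        # Illustrative: builds a string array from the lookup fields, e.g. asn_list = ['AS100', 'AS200'] (hypothetical values).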
  projection_metrics_processor: # [object] Metrics processing pipeline
type: projection
output_fields:
properties:
key: value
functions:
- function: FLATTEN
lookup_fields: [ fields,tags ]
output_fields: [ ]
parameters:
#prefix: ""
depth: 3
# delimiter: "."
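          # Illustrative assumption: nested input such as {"fields": {"cpu": {"usage_idle": 97}}} is flattened
          # (up to depth 3) into a top-level key 'fields.cpu.usage_idle' = 97, using '.' as the delimiter.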
- function: RENAME
lookup_fields: [ '' ]
output_fields: [ '' ]
filter:
parameters:
# parent_fields: [tags]
#rename_fields:
# tags: tags
rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
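          # The rename expression strips 'tags.' and 'fields.' prefixes from the flattened keys,
          # e.g. 'fields.cpu.usage_idle' -> 'cpu.usage_idle' and 'tags.host' -> 'host' (hypothetical field names).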
- function: EVAL
output_fields: [ internal_ip ]
parameters:
          value_expression: 'direction == "Outbound" ? client_ip : server_ip'
- function: EVAL
output_fields: [ external_ip ]
parameters:
          value_expression: 'direction == "Outbound" ? server_ip : client_ip'
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ timestamp_ms ]
output_fields: [ recv_time ]
parameters:
precision: seconds
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
output_fields: [ log_id ]
filter:
parameters:
data_center_id_num: 1
aggregate_processor: # [object] Define aggregate processor name, must be unique.
type: aggregate
group_by_fields: [ recv_time, sled_ip ] # [array of string] Group By Fields
    window_type: tumbling_processing_time # [string] Window Type. Supported values: tumbling_processing_time, tumbling_event_time, sliding_processing_time, sliding_event_time
window_size: 60
functions:
- function: NUMBER_SUM
lookup_fields: [ received_bytes, sent_bytes ]
output_fields: [ received_bytes_sum ]
- function: LONG_COUNT
lookup_fields: [ received_bytes ]
output_fields: [ sessions ]
- function: MEAN
lookup_fields: [ received_bytes ]
output_fields: [ received_bytes_mean ]
parameters:
precision: 2
- function: FIRST_VALUE
lookup_fields: [ received_bytes ]
output_fields: [ received_bytes_first ]
- function: LAST_VALUE
lookup_fields: [ received_bytes ]
output_fields: [ received_bytes_last ]
- function: COLLECT_LIST
lookup_fields: [ received_bytes ]
output_fields: [ received_bytes_set ]
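      # Assumed output: one aggregated event per (recv_time, sled_ip) group for each tumbling
      # processing-time window (window_size: 60), carrying the sum/count/mean/first/last/list fields above.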
postprocessing_pipelines: # [object] Define Processors for postprocessing pipelines.
postprocessor: # [object] Define projection processor name, must be unique.
type: projection
remove_fields: [log_id, device_tag, dup_traffic_flag]
#
# This section is used to define the [object] Sink List, e.g. print sink, kafka sink, clickhouse sink, etc.
#
sinks: # [object] Define connector sink
  kafka_sink_a: # [object] Kafka sink connector name, must be unique. It is used to define the sink node of the job topology.
type: kafka
    # Sink table schema, configured through fields, local_file, or url. If no schema is set, all fields (Map<String, Object>) will be output.
# schema:
# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>"
# local_file: "schema/test_schema.json"
# url: "http://127.0.0.1/schema.json"
properties:
topic: SESSION-RECORD-A
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.retries: 0
# This setting defaults to 0 (i.e. no delay).
# Setting linger.ms=10, for example, would have the effect of reducing the number of requests sent but would add up to 10ms of latency to records sent in the absence of load.
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
# The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.
# This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
kafka.batch.size: 262144
# The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
# If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.
# This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering.
# Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
# The compression type for all data generated by the producer.
# The default is none (i.e. no compression). Valid values are none, gzip, snappy, lz4, or zstd.
# Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
kafka.compression.type: snappy
format: json
json.ignore.parse.errors: false
log.failures.only: true
kafka_sink_b:
type: kafka
properties:
topic: SESSION-RECORD-B
kafka.bootstrap.servers: 127.0.0.1:9094
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
format: json
json.ignore.parse.errors: false
log.failures.only: true
  clickhouse_sink: # [object] ClickHouse sink connector name, must be unique. It is used to define the sink node of the job topology.
type: clickhouse
schema:
local_file: /../schema/clickhouse_sink_schema.json
properties:
host: 127.0.0.1:9001
table: inline_source_test_local
batch.size: 10
batch.interval: 1s
connection.database: tsg_galaxy_v3
connection.user: default
connection.password: 123456
  print_sink: # [object] Print sink connector name, must be unique. It is used to define the sink node of the job topology.
type: print
properties:
mode: log_info
format: json
#
# This section is used to define [object] Job Configuration. Includes environment variables, topology, etc.
# The [object] environment variables will be used to build the job environment.
# The [array of object] topology will be used to build the data stream for the job DAG graph.
#
application: # [object] Application Configuration
env: # [object] Define job runtime environment variables
name: inline-to-print-job # [string] Job Name
parallelism: 3 # [number] Job-Level Parallelism
    shade.identifier: default # [string] Shade Identifier, used to encrypt and decrypt sensitive configuration. Supported values: default, aes, base64. If set to default, sensitive configuration is not encrypted or decrypted.
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
  topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph.
    - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source is a node with processor type SOURCE.
#parallelism: 1 # [number] Operator-Level Parallelism.
downstream: [filter_operator] # [array of string] Downstream Node Name List.
- name: filter_operator
parallelism: 1
downstream: [preprocessor]
- name: preprocessor
downstream: [processor]
- name: processor
downstream: [ postprocessor ]
- name: postprocessor
downstream: [ print_sink ]
- name: print_sink
downstream: []
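    # Illustrative alternative (not enabled): a node's downstream list may fan out to several sinks
    # defined above, for example:
    #- name: postprocessor
    #  downstream: [ print_sink, kafka_sink_a, clickhouse_sink ]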