author    doufenghu <[email protected]>    2024-01-27 16:35:35 +0800
committer doufenghu <[email protected]>    2024-01-27 16:35:35 +0800
commit    185dfe0d12ecf2e7dab3e401a0c79ada58887df6 (patch)
tree      10c299aa80615e1d0658db746ab04109a500057d /config/template/grootstream_job_template.yaml
parent    bc07038100b0ce0c36ed439c535a079085a7077e (diff)
Fix some details
Diffstat (limited to 'config/template/grootstream_job_template.yaml')
-rw-r--r--  config/template/grootstream_job_template.yaml | 323
1 file changed, 323 insertions(+), 0 deletions(-)
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
new file mode 100644
index 0000000..063861e
--- /dev/null
+++ b/config/template/grootstream_job_template.yaml
@@ -0,0 +1,323 @@
+#############################################
+# Groot Stream Job Template #
+#############################################
+
+#
+# This section is used to define the [object] Source List, e.g. kafka source, inline source, etc.
+#
+sources: # [object] Define connector source
+  kafka_source: # [object] Kafka source connector name, must be unique. It is used to define the source node of the job topology.
+ type: kafka # [string] Source Type
+ properties: # [object] Kafka source properties
+      topic: SESSION-RECORD # [string] Topic Name; the consumer will subscribe to this topic.
+      kafka.bootstrap.servers: 127.0.0.1:9092 # [string] Kafka Bootstrap Servers; separate multiple servers with commas.
+ kafka.session.timeout.ms: 60000 # [number] Kafka Session Timeout, default is 60000
+ kafka.max.poll.records: 3000
+ kafka.max.partition.fetch.bytes: 31457280
+ kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001 # [string] Kafka Group ID for Consumer
+ kafka.auto.offset.reset: latest # [string] Kafka Auto Offset Reset, default is latest
+ format: json # [string] Data Format for Source. eg. json, protobuf, etc.
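+      # A minimal sketch of a multi-broker bootstrap list for the source above
+      # (broker1..broker3 are hypothetical hosts, any brokers of one cluster work):
+      # kafka.bootstrap.servers: broker1:9092,broker2:9092,broker3:9092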
+
+  inline_source: # [object] Inline source connector name, must be unique. It is used to define the source node of the job topology.
+ type: inline
+    fields: # [array of object] Schema field projection; supports reading data only from the specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
+ properties:
+ #
+      # [string] Event Data; it will be parsed into Map<String, Object> records according to the specified format.
+ #
+ data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ format: json
+ json.ignore.parse.errors: false
+      interval.per.row: 5s # [duration] Pause between emitted rows.
+      repeat.count: 10 # [number] How many times the inline dataset is replayed.
+
+ inline_source_protobuf:
+    type: inline
+ properties:
+ data: CIin2awGEICAoLC/hYzKAhoEQkFTRSCch8z3wtqEhAQo6o/Xmc0xMMCy15nNMTjWIkDRCEiIp9msBlCIp9msBloIMjE0MjYwMDNg//8DaP//A3JqeyJ0YWdzIjpbeyJ0YWciOiJkYXRhX2NlbnRlciIsInZhbHVlIjoiY2VudGVyLXh4Zy05MTQwIn0seyJ0YWciOiJkZXZpY2VfZ3JvdXAiLCJ2YWx1ZSI6Imdyb3VwLXh4Zy05MTQwIn1dfXoPY2VudGVyLXh4Zy05MTQwggEOZ3JvdXAteHhnLTkxNDCKAQ0xOTIuMTY4LjQwLjgxkAEEmAEBoAEBqAGQwAGyAQdbMSwxLDJd4gEDt+gY4gINMTkyLjU2LjE1MS44MOgCoeYD8gIHV2luZG93c/oCGOe+juWbvS5Vbmtub3duLlVua25vd24uLrIDDTE5Mi41Ni4yMjIuOTO4A/ZwwgMFTGludXjKAxjnvo7lm70uVW5rbm93bi5Vbmtub3duLi6SBAN0Y3CaBBFFVEhFUk5FVC5JUHY0LlRDULAMBLgMBcAM9gHIDJEOoA2AAagN8cr+jgKwDezksIAPwg0RYTI6ZmE6ZGM6NTY6Yzc6YjPKDRE0ODo3Mzo5Nzo5NjozODoyMNINETQ4OjczOjk3Ojk2OjM4OjIw2g0RYTI6ZmE6ZGM6NTY6Yzc6YjM=
+      type: base64 # [string] Encoding of the inline data above (base64-wrapped protobuf bytes).
+ format: protobuf
+ protobuf.descriptor.file.path: ..\session_record_test.desc
+ protobuf.message.name: SessionRecord
+
+  ipfix_source: # [object] IPFIX source connector name, must be unique. It is used to define the source node of the job topology.
+ type: ipfix
+# fields: # [array of object] Field List; if not set, all fields (Map<String, Object>) will be output.
+# - name: log_id
+# type: bigint
+ properties:
+ port.range: 12345-12347
+ max.packet.size: 65535
+ max.receive.buffer.size: 104857600
+      service.discovery.registry.mode: 0 # 0 for Nacos, 1 for Consul; any other value disables service discovery. Default is 0.
+ service.discovery.service.name: udp_ipfix
+ service.discovery.health.check.interval: 5 # The time interval for reporting health status to the service registry center, in seconds.
+ service.discovery.nacos.server.addr: 127.0.0.1:8848
+ service.discovery.nacos.username: nacos
+ service.discovery.nacos.password: nacos
+ service.discovery.nacos.namespace: test
+ service.discovery.nacos.group: IPFIX
+# service.discovery.consul.server.ip: 192.168.41.30
+# service.discovery.consul.server.port: 8500
+# service.discovery.consul.token:
+
+
+#
+# This section is used to define the [object] Filter List. It is used for row-level filtering.
+# Based on the filter expression, an event is passed downstream if the expression evaluates to true; otherwise it is dropped.
+#
+filters: # [object] Define filter operator
+ filter_operator: # [object] AviatorFilter operator name, must be unique.
+ type: com.geedgenetworks.core.filter.AviatorFilter
+ properties:
+      expression: event.server_ip != '12.12.12.12' # [string] Aviator expression; it returns true or false.
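+      # A sketch of a compound expression over fields that appear elsewhere in
+      # this template (Aviator supports boolean operators such as && and ||):
+      # expression: event.server_ip != '12.12.12.12' && event.duration_ms > 0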
+
+
+#
+# This section is used to define the [object] Preprocessing Pipeline List. It is used to preprocess events before the processing pipelines.
+# A pipeline includes multiple functions, which are executed in order.
+#
+preprocessing_pipelines: # [object] Define Processors for preprocessing pipelines.
+ preprocessor: # [object] Define projection processor name, must be unique.
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ functions: # [array of object] Define UDFs
+      - function: DROP # [string] The DROP function discards events that match the filter expression below.
+ lookup_fields: []
+ output_fields: []
+ filter: event.duration_ms == 0
+
+#
+# This section is used to define the [object] Processing Pipeline List.
+# It performs the common processing of events via user-defined functions.
+#
+processing_pipelines: # [object] Define Processors for processing pipelines.
+ processor: # [object] Define projection processor name, must be unique.
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields:
+ output_fields:
+ functions: # [array of object] Function List
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+            COUNTRY: server_country_region
+            PROVINCE: server_super_admin_area
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ option: IP_TO_ASN
+ kb_name: tsg_ip_asn
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_asn ]
+ parameters:
+ option: IP_TO_ASN
+ kb_name: tsg_ip_asn
+
+ - function: SNOWFLAKE_ID
+ lookup_fields: [ '' ]
+ output_fields: [ log_id ]
+ filter:
+ parameters:
+ data_center_id_num: 1
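+        # SNOWFLAKE_ID generates a unique 64-bit id from a timestamp, the
+        # configured data-center id, and a per-worker sequence (standard
+        # Snowflake layout; the exact bit split here is an assumption).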
+
+ - function: JSON_EXTRACT
+ lookup_fields: [ device_tag ]
+ output_fields: [ data_center ]
+ filter:
+ parameters:
+ value_expression: $.tags[?(@.tag=='data_center')][0].value
+
+ - function: JSON_EXTRACT
+ lookup_fields: [ device_tag ]
+ output_fields: [ device_group ]
+ filter:
+ parameters:
+ value_expression: $.tags[?(@.tag=='device_group')][0].value
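+        # For reference, the device_tag JSON in the sample payload above is
+        # shaped like {"tags":[{"tag":"data_center","value":"center-xxg-9140"},
+        # {"tag":"device_group","value":"group-xxg-9140"}]}, so the two
+        # JSON_EXTRACT calls yield the data_center and device_group values.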
+
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ processing_time ]
+ parameters:
+ precision: seconds
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+
+ - function: EVAL
+ output_fields: [ ingestion_time ]
+ parameters:
+ value_expression: recv_time
+
+ - function: DOMAIN
+ lookup_fields: [ http_host,ssl_sni,dtls_sni,quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
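+        # E.g. an http_host of www.ct.cn (see the inline data above) would
+        # yield ct as the first significant subdomain, assuming
+        # ClickHouse-style firstSignificantSubdomain semantics.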
+
+ - function: BASE64_DECODE_TO_STRING
+ lookup_fields: [ mail_subject,mail_subject_charset ]
+ output_fields: [ mail_subject ]
+
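+      # The next two entries show an alternative parameter style for the same
+      # function: explicit value_field/charset_field parameters instead of
+      # positional lookup_fields.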
+ - function: BASE64_DECODE_TO_STRING
+ output_fields: [ mail_subject ]
+ parameters:
+ value_field: mail_subject
+ charset_field: mail_subject_charset
+
+ - function: BASE64_DECODE_TO_STRING
+ output_fields: [ mail_attachment_name ]
+ parameters:
+ value_field: mail_attachment_name
+ charset_field: mail_attachment_name_charset
+
+ - function: PATH_COMBINE
+ lookup_fields: [ packet_capture_file ]
+ output_fields: [ packet_capture_file ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
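+        # PATH_COMBINE joins the listed segments into one path; the props.*
+        # entries presumably resolve to values from the job properties, while
+        # packet_capture_file is read from the event itself.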
+
+ - function: PATH_COMBINE
+ lookup_fields: [ rtp_pcap_path ]
+ output_fields: [ rtp_pcap_path ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.troubleshooting_file, rtp_pcap_path ]
+
+ - function: PATH_COMBINE
+ lookup_fields: [ http_request_body ]
+ output_fields: [ http_request_body ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.traffic_file, http_request_body ]
+
+ - function: PATH_COMBINE
+ lookup_fields: [ http_response_body ]
+ output_fields: [ http_response_body ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.traffic_file, http_response_body ]
+
+ - function: PATH_COMBINE
+ lookup_fields: [ mail_eml_file ]
+ output_fields: [ mail_eml_file ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file ]
+
+ - function: STRING_JOINER
+ lookup_fields: [ server_ip,client_ip ]
+ output_fields: [ ip_string ]
+ parameters:
+ separator: ','
+ prefix: '['
+ suffix: ']'
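+        # E.g. server_ip=8.8.8.8 and client_ip=192.11.22.22 (from the inline
+        # data above) would produce ip_string=[8.8.8.8,192.11.22.22].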
+
+ - function: GENERATE_STRING_ARRAY
+ lookup_fields: [ client_asn,server_asn ]
+ output_fields: [ asn_list ]
+
+postprocessing_pipelines: # [object] Define Processors for postprocessing pipelines.
+ postprocessor: # [object] Define projection processor name, must be unique.
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [log_id, device_tag, dup_traffic_flag]
+
+#
+# This section is used to define the [object] Sink List, e.g. print sink, kafka sink, clickhouse sink, etc.
+#
+sinks: # [object] Define connector sink
+  kafka_sink_a: # [object] Kafka sink connector name, must be unique. It is used to define a sink node of the job topology.
+ type: kafka
+ properties:
+ topic: SESSION-RECORD-A
+ kafka.bootstrap.servers: 127.0.0.1:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ format: json
+
+ kafka_sink_b:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD-B
+ kafka.bootstrap.servers: 127.0.0.1:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ format: json
+
+  clickhouse_sink: # [object] ClickHouse sink connector name, must be unique. It is used to define a sink node of the job topology.
+ type: clickhouse
+ properties:
+ host: 127.0.0.1:9001
+ table: inline_source_test_local
+ batch.size: 10
+ batch.interval: 1s
+ connection.database: tsg_galaxy_v3
+ connection.user: default
+ connection.password: ''
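+      # Rows are buffered and flushed every batch.size rows or every
+      # batch.interval, presumably whichever threshold is reached first.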
+
+  print_sink: # [object] Print sink connector name, must be unique. It is used to define a sink node of the job topology.
+ type: print
+ properties:
+ mode: log_info
+ format: json
+
+#
+# This section is used to define [object] Job Configuration. Includes environment variables, topology, etc.
+# The [object] environment variables will be used to build the job environment.
+# The [array of object] topology will be used to build the data stream for the job DAG graph.
+#
+application: # [object] Application Configuration
+ env: # [object] Define job runtime environment variables
+ name: inline-to-print-job # [string] Job Name
+ parallelism: 3 # [number] Job-Level Parallelism
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
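+    # Enabling object reuse lets Flink pass the same record instance between
+    # chained operators instead of copying it; user functions must then not
+    # cache or mutate input objects (standard Flink caveat).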
+  topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph.
+    - name: inline_source # [string] Node Name, must be unique. It is used as the name of the corresponding Flink operator; e.g. kafka_source is a node whose processor type is SOURCE.
+ #parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [filter_operator] # [array of string] Downstream Node Name List.
+ - name: filter_operator
+ parallelism: 1
+ downstream: [preprocessor]
+ - name: preprocessor
+ downstream: [processor]
+ - name: processor
+ downstream: [ postprocessor ]
+ - name: postprocessor
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: []
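+  # Fan-out sketch: since downstream is an array, a node may list several
+  # downstream nodes. To double-write results to both Kafka sinks defined
+  # above, the tail of the topology could instead read:
+  # - name: postprocessor
+  #   downstream: [ kafka_sink_a, kafka_sink_b ]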