| field | value | date |
|---|---|---|
| author | doufenghu <[email protected]> | 2024-01-27 16:35:35 +0800 |
| committer | doufenghu <[email protected]> | 2024-01-27 16:35:35 +0800 |
| commit | 185dfe0d12ecf2e7dab3e401a0c79ada58887df6 (patch) | |
| tree | 10c299aa80615e1d0658db746ab04109a500057d /config/template/grootstream_job_template.yaml | |
| parent | bc07038100b0ce0c36ed439c535a079085a7077e (diff) | |
Fix some details
Diffstat (limited to 'config/template/grootstream_job_template.yaml')
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | config/template/grootstream_job_template.yaml | 323 |

1 file changed, 323 insertions, 0 deletions
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
new file mode 100644
index 0000000..063861e
--- /dev/null
+++ b/config/template/grootstream_job_template.yaml
@@ -0,0 +1,323 @@
+#############################################
+#         Groot Stream Job Template         #
+#############################################
+
+#
+# This section defines the [object] Source List, e.g. kafka source, inline source, etc.
+#
+sources:  # [object] Define connector sources
+  kafka_source:  # [object] Kafka source connector name, must be unique. It is used to define a source node of the job topology.
+    type: kafka  # [string] Source Type
+    properties:  # [object] Kafka source properties
+      topic: SESSION-RECORD  # [string] Topic Name; the consumer subscribes to this topic.
+      kafka.bootstrap.servers: 127.0.0.1:9092  # [string] Kafka Bootstrap Servers; separate multiple servers with commas (see the example below).
+      kafka.session.timeout.ms: 60000  # [number] Kafka Session Timeout, default is 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001  # [string] Kafka Group ID for the consumer
+      kafka.auto.offset.reset: latest  # [string] Kafka Auto Offset Reset, default is latest
+      format: json  # [string] Data Format for Source, e.g. json, protobuf, etc.
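+      # Illustrative only (these broker addresses are hypothetical): multiple
+      # bootstrap servers are listed comma-separated, as noted above.
+      # kafka.bootstrap.servers: 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092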
+
+  inline_source:  # [object] Inline source connector name, must be unique. It is used to define a source node of the job topology.
+    type: inline
+    fields:  # [array of object] Schema field projection; supports reading only the specified fields.
+      - name: log_id
+        type: bigint
+      - name: recv_time
+        type: bigint
+      - name: server_fqdn
+        type: string
+      - name: server_domain
+        type: string
+      - name: client_ip
+        type: string
+      - name: server_ip
+        type: string
+      - name: server_asn
+        type: string
+      - name: decoded_as
+        type: string
+      - name: device_group
+        type: string
+      - name: device_tag
+        type: string
+    properties:
+      #
+      # [string] Event Data; it is parsed into Map<String, Object> according to the specified format.
+      #
+      data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+      format: json
+      json.ignore.parse.errors: false
+      interval.per.row: 5s
+      repeat.count: 10
+
+  inline_source_protobuf:
+    type: inline
+    properties:
+      data: CIin2awGEICAoLC/hYzKAhoEQkFTRSCch8z3wtqEhAQo6o/Xmc0xMMCy15nNMTjWIkDRCEiIp9msBlCIp9msBloIMjE0MjYwMDNg//8DaP//A3JqeyJ0YWdzIjpbeyJ0YWciOiJkYXRhX2NlbnRlciIsInZhbHVlIjoiY2VudGVyLXh4Zy05MTQwIn0seyJ0YWciOiJkZXZpY2VfZ3JvdXAiLCJ2YWx1ZSI6Imdyb3VwLXh4Zy05MTQwIn1dfXoPY2VudGVyLXh4Zy05MTQwggEOZ3JvdXAteHhnLTkxNDCKAQ0xOTIuMTY4LjQwLjgxkAEEmAEBoAEBqAGQwAGyAQdbMSwxLDJd4gEDt+gY4gINMTkyLjU2LjE1MS44MOgCoeYD8gIHV2luZG93c/oCGOe+juWbvS5Vbmtub3duLlVua25vd24uLrIDDTE5Mi41Ni4yMjIuOTO4A/ZwwgMFTGludXjKAxjnvo7lm70uVW5rbm93bi5Vbmtub3duLi6SBAN0Y3CaBBFFVEhFUk5FVC5JUHY0LlRDULAMBLgMBcAM9gHIDJEOoA2AAagN8cr+jgKwDezksIAPwg0RYTI6ZmE6ZGM6NTY6Yzc6YjPKDRE0ODo3Mzo5Nzo5NjozODoyMNINETQ4OjczOjk3Ojk2OjM4OjIw2g0RYTI6ZmE6ZGM6NTY6Yzc6YjM=
+      type: base64  # [string] Encoding of the inline data above (a base64-encoded protobuf record).
+      format: protobuf
+      protobuf.descriptor.file.path: ..\session_record_test.desc  # [string] Path to the compiled protobuf descriptor file (see the example below).
+      protobuf.message.name: SessionRecord
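+      # Illustrative only: a descriptor file such as the one referenced above can be
+      # generated with protoc; session_record.proto is a hypothetical schema file.
+      #   protoc --include_imports --descriptor_set_out=session_record_test.desc session_record.proto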
+
+  ipfix_source:  # [object] IPFIX source connector name, must be unique. It is used to define a source node of the job topology.
+    type: ipfix
+#    fields:  # [array of object] Field List; if not set, all fields (Map<String, Object>) are output.
+#      - name: log_id
+#        type: bigint
+    properties:
+      port.range: 12345-12347
+      max.packet.size: 65535
+      max.receive.buffer.size: 104857600
+      service.discovery.registry.mode: 0  # 0 for Nacos, 1 for Consul; any other value disables service discovery. Default is 0.
+      service.discovery.service.name: udp_ipfix
+      service.discovery.health.check.interval: 5  # Interval, in seconds, for reporting health status to the service registry.
+      service.discovery.nacos.server.addr: 127.0.0.1:8848
+      service.discovery.nacos.username: nacos
+      service.discovery.nacos.password: nacos
+      service.discovery.nacos.namespace: test
+      service.discovery.nacos.group: IPFIX
+#      service.discovery.consul.server.ip: 192.168.41.30
+#      service.discovery.consul.server.port: 8500
+#      service.discovery.consul.token:
+
+
+#
+# This section defines the [object] Filter List, used for row-level filtering.
+# An event is passed downstream if the filter expression evaluates to true; otherwise it is dropped.
+#
+filters:  # [object] Define filter operators
+  filter_operator:  # [object] AviatorFilter operator name, must be unique.
+    type: com.geedgenetworks.core.filter.AviatorFilter
+    properties:
+      expression: event.server_ip != '12.12.12.12'  # [string] Aviator expression; it returns true or false (see the example below).
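+      # Illustrative only: conditions can be combined with && and ||; the field names
+      # here come from the sample session-record events above.
+      # expression: event.decoded_as == 'HTTP' && event.server_port == 80 && event.server_ip != '12.12.12.12'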
+
+
+#
+# This section defines the [object] Preprocessing Pipeline List, used to preprocess events before the processing pipeline.
+# A pipeline contains multiple functions, which are executed in order.
+#
+preprocessing_pipelines:  # [object] Define Processors for preprocessing pipelines.
+  preprocessor:  # [object] Projection processor name, must be unique.
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    functions:  # [array of object] Define UDFs
+      - function: DROP  # [string] DROP function: drops events whose filter expression evaluates to true.
+        lookup_fields: []
+        output_fields: []
+        filter: event.duration_ms == 0
+
+#
+# This section defines the [object] Processing Pipeline List.
+# It performs the common processing of events through user-defined functions.
+#
+processing_pipelines:  # [object] Define Processors for processing pipelines.
+  processor:  # [object] Projection processor name, must be unique.
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields:
+    output_fields:
+    functions:  # [array of object] Function List
+      - function: GEOIP_LOOKUP
+        lookup_fields: [ server_ip ]
+        output_fields: [ server_asn ]
+        parameters:
+          kb_name: tsg_ip_location
+          option: IP_TO_OBJECT
+          geolocation_field_mapping:
+            COUNTRY: client_country_region
+            PROVINCE: client_super_admin_area
+
+      - function: ASN_LOOKUP
+        lookup_fields: [ server_ip ]
+        output_fields: [ server_asn ]
+        parameters:
+          option: IP_TO_ASN
+          kb_name: tsg_ip_asn
+
+      - function: ASN_LOOKUP
+        lookup_fields: [ client_ip ]
+        output_fields: [ client_asn ]
+        parameters:
+          option: IP_TO_ASN
+          kb_name: tsg_ip_asn
+
+      - function: SNOWFLAKE_ID
+        lookup_fields: [ '' ]
+        output_fields: [ log_id ]
+        filter:
+        parameters:
+          data_center_id_num: 1
+
+      - function: JSON_EXTRACT  # see the worked example after this function list
+        lookup_fields: [ device_tag ]
+        output_fields: [ data_center ]
+        filter:
+        parameters:
+          value_expression: $.tags[?(@.tag=='data_center')][0].value
+
+      - function: JSON_EXTRACT
+        lookup_fields: [ device_tag ]
+        output_fields: [ device_group ]
+        filter:
+        parameters:
+          value_expression: $.tags[?(@.tag=='device_group')][0].value
+
+      - function: CURRENT_UNIX_TIMESTAMP
+        output_fields: [ processing_time ]
+        parameters:
+          precision: seconds
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ __timestamp ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+
+      - function: EVAL
+        output_fields: [ ingestion_time ]
+        parameters:
+          value_expression: recv_time
+
+      - function: DOMAIN
+        lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+        output_fields: [ server_domain ]
+        parameters:
+          option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+      - function: BASE64_DECODE_TO_STRING
+        lookup_fields: [ mail_subject, mail_subject_charset ]
+        output_fields: [ mail_subject ]
+
+      - function: BASE64_DECODE_TO_STRING
+        output_fields: [ mail_subject ]
+        parameters:
+          value_field: mail_subject
+          charset_field: mail_subject_charset
+
+      - function: BASE64_DECODE_TO_STRING
+        output_fields: [ mail_attachment_name ]
+        parameters:
+          value_field: mail_attachment_name
+          charset_field: mail_attachment_name_charset
+
+      - function: PATH_COMBINE
+        lookup_fields: [ packet_capture_file ]
+        output_fields: [ packet_capture_file ]
+        parameters:
+          path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file ]
+
+      - function: PATH_COMBINE
+        lookup_fields: [ rtp_pcap_path ]
+        output_fields: [ rtp_pcap_path ]
+        parameters:
+          path: [ props.hos.path, props.hos.bucket.name.troubleshooting_file, rtp_pcap_path ]
+
+      - function: PATH_COMBINE
+        lookup_fields: [ http_request_body ]
+        output_fields: [ http_request_body ]
+        parameters:
+          path: [ props.hos.path, props.hos.bucket.name.traffic_file, http_request_body ]
+
+      - function: PATH_COMBINE
+        lookup_fields: [ http_response_body ]
+        output_fields: [ http_response_body ]
+        parameters:
+          path: [ props.hos.path, props.hos.bucket.name.traffic_file, http_response_body ]
+
+      - function: PATH_COMBINE
+        lookup_fields: [ mail_eml_file ]
+        output_fields: [ mail_eml_file ]
+        parameters:
+          path: [ props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file ]
+
+      - function: STRING_JOINER
+        lookup_fields: [ server_ip, client_ip ]
+        output_fields: [ ip_string ]
+        parameters:
+          separator: ','
+          prefix: '['
+          suffix: ']'
+
+      - function: GENERATE_STRING_ARRAY
+        lookup_fields: [ client_asn, server_asn ]
+        output_fields: [ asn_list ]
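+      # Worked example for the JSON_EXTRACT functions above (tag values are taken from
+      # the sample protobuf record in inline_source_protobuf): given a device_tag of
+      #   {"tags":[{"tag":"data_center","value":"center-xxg-9140"},{"tag":"device_group","value":"group-xxg-9140"}]}
+      # the path $.tags[?(@.tag=='data_center')][0].value yields "center-xxg-9140".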
+
+postprocessing_pipelines:  # [object] Define Processors for postprocessing pipelines.
+  postprocessor:  # [object] Projection processor name, must be unique.
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields: [ log_id, device_tag, dup_traffic_flag ]
+
+#
+# This section defines the [object] Sink List, e.g. print sink, kafka sink, clickhouse sink, etc.
+#
+sinks:  # [object] Define connector sinks
+  kafka_sink_a:  # [object] Kafka sink connector name, must be unique. It is used to define a sink node of the job topology.
+    type: kafka
+    properties:
+      topic: SESSION-RECORD-A
+      kafka.bootstrap.servers: 127.0.0.1:9092
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      format: json
+
+  kafka_sink_b:
+    type: kafka
+    properties:
+      topic: SESSION-RECORD-B
+      kafka.bootstrap.servers: 127.0.0.1:9092
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      format: json
+
+  clickhouse_sink:  # [object] ClickHouse sink connector name, must be unique. It is used to define a sink node of the job topology.
+    type: clickhouse
+    properties:
+      host: 127.0.0.1:9001
+      table: inline_source_test_local
+      batch.size: 10
+      batch.interval: 1s
+      connection.database: tsg_galaxy_v3
+      connection.user: default
+      connection.password: ''
+
+  print_sink:  # [object] Print sink connector name, must be unique. It is used to define a sink node of the job topology.
+    type: print
+    properties:
+      mode: log_info
+      format: json
+
+#
+# This section defines the [object] Job Configuration, including environment variables, topology, etc.
+# The [object] environment variables are used to build the job environment.
+# The [array of object] topology is used to build the data stream of the job DAG (sketched below).
+#
+application:  # [object] Application Configuration
+  env:  # [object] Define job runtime environment variables
+    name: inline-to-print-job  # [string] Job Name
+    parallelism: 3  # [number] Job-Level Parallelism
+    pipeline:
+      object-reuse: true  # [boolean] Object Reuse, default is false
+  topology:  # [array of object] Node List; it is used to build the data flow of the job DAG.
+    - name: inline_source  # [string] Node Name, must be unique. It is used as the name of the corresponding Flink operator, e.g. kafka_source for a processor of type SOURCE.
+      #parallelism: 1  # [number] Operator-Level Parallelism.
+      downstream: [ filter_operator ]  # [array of string] Downstream Node Name List.
+    - name: filter_operator
+      parallelism: 1
+      downstream: [ preprocessor ]
+    - name: preprocessor
+      downstream: [ processor ]
+    - name: processor
+      downstream: [ postprocessor ]
+    - name: postprocessor
+      downstream: [ print_sink ]
+    - name: print_sink
+      downstream: []
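+
+# For orientation, the topology above wires the operators into the following DAG:
+#
+#   inline_source -> filter_operator -> preprocessor -> processor -> postprocessor -> print_sink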
