diff options
| author | wangkuan <[email protected]> | 2024-02-22 15:30:15 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-02-22 15:30:15 +0800 |
| commit | 9000b73e460721569d32f207eafaa6104b7f6829 (patch) | |
| tree | 6b6b7d4b3e5073e69c76d4646ca1bd07362cd418 | |
| parent | e526a0ff54682a1063c40270cf8fa73ee793318b (diff) | |
[feature][core]解决冲突临时添加
| -rw-r--r-- | groot-bootstrap/src/main/resources/grootstream_job_test.yaml | 190 |
1 file changed, 190 insertions, 0 deletions
---
# Groot Stream test job: inline source -> Aviator filter -> projection
# pipelines -> sinks, wired together by application.topology.
#
# NOTE(review): recovered from a collapsed diff rendering; indentation was
# reconstructed to a conventional 2-space layout. Verify any nesting the diff
# left ambiguous (notably whether `format` sits inside `properties`) against
# the original grootstream_job_test.yaml.

sources:
  inline_source:  # [object] Inline test source emitting four sample events (BASE, HTTP, DNS, SSL).
    type: inline  # fixed: was `type : inline` (no space before `:`)
    fields:  # [array of object] Field list; if not set, all fields (Map<String, Object>) will be output.
    properties:
      data: '[{"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
      interval.per.row: 1s  # optional
      repeat.count: 1  # optional
      format: json
      json.ignore.parse.errors: false

filters:
  schema_type_filter:
    type: com.geedgenetworks.core.filter.AviatorFilter
    output_fields:
    properties:
      # Keep only SSL and BASE events.
      expression: event.decoded_as == 'SSL' || event.decoded_as == 'BASE'

preprocessing_pipelines:
  preprocessing_processor:  # [object] Preprocessing pipeline
    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
    output_fields:
    properties:
      key: value
    functions:
      - function: SNOWFLAKE_ID
        lookup_fields: [ '' ]
        output_fields: [ 'common_log_id' ]
        filter:
      - function: DROP
        lookup_fields: [ '' ]
        output_fields: [ '' ]
        filter: event.common_schema_type == 'BASE'

processing_pipelines:
  session_record_processor:  # [object] Processing pipeline
    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
    remove_fields:
    output_fields:
    properties:
      key: value
    functions:  # [array of object] Function list
      - function: DROP
        lookup_fields: [ '' ]
        output_fields: [ '' ]
        filter: event.decoded_as == 'SSL'
      - function: BASE64_DECODE_TO_STRING
        output_fields: [ mail_attachment_name ]
        parameters:
          value_field: mail_attachment_name
          charset_field: mail_attachment_name_charset
      - function: GEOIP_LOOKUP
        lookup_fields: [ client_ip ]
        output_fields: []
        parameters:
          kb_name: tsg_ip_location
          option: IP_TO_OBJECT
          geolocation_field_mapping:
            COUNTRY: client_country_region
            PROVINCE: client_super_admin_area
      - function: GEOIP_LOOKUP
        lookup_fields: [ server_ip ]
        output_fields: []
        parameters:
          kb_name: tsg_ip_location
          option: IP_TO_OBJECT
          geolocation_field_mapping:
            COUNTRY: server_country_region
            PROVINCE: server_super_admin_area
      - function: ASN_LOOKUP
        lookup_fields: [ server_ip ]
        output_fields: [ server_asn ]
        parameters:
          option: IP_TO_ASN
          kb_name: tsg_ip_asn
      - function: SNOWFLAKE_ID
        lookup_fields: [ '' ]
        output_fields: [ log_id ]
        filter:
        parameters:
          data_center_id_num: 1
      - function: JSON_EXTRACT
        lookup_fields: [ device_tag ]
        output_fields: [ data_center ]
        filter:
        parameters:
          # Quoted: the JsonPath contains [, ? and ' characters.
          value_expression: "$.tags[?(@.tag=='data_center')][0].value"
      - function: CURRENT_UNIX_TIMESTAMP
        output_fields: [ processing_time ]
        parameters:
          precision: seconds
      - function: UNIX_TIMESTAMP_CONVERTER
        lookup_fields: [ __timestamp ]
        output_fields: [ recv_time ]
        parameters:
          precision: seconds
      - function: EVAL
        output_fields: [ ingestion_time ]
        parameters:
          value_expression: recv_time
      - function: DOMAIN
        lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
        output_fields: [ server_domain ]
        parameters:
          option: FIRST_SIGNIFICANT_SUBDOMAIN
      - function: PATH_COMBINE
        lookup_fields: [ packet_capture_file ]
        output_fields: [ packet_capture_file ]
        parameters:
          path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file ]
      - function: STRING_JOINER
        lookup_fields: [ server_ip, client_ip ]
        output_fields: [ ip_string ]
        parameters:
          separator: ','
          prefix: '['
          suffix: ']'
      - function: GENERATE_STRING_ARRAY
        lookup_fields: [ client_asn, server_asn ]
        output_fields: [ asn_list ]

sinks:
  kafka_sink_a:
    type: kafka
    properties:
      topic: SESSION-RECORD-JSON
      kafka.bootstrap.servers: '192.168.44.12:9092'  # quoted: host:port scalar
      kafka.retries: 0
      kafka.linger.ms: 10
      kafka.request.timeout.ms: 30000
      kafka.batch.size: 262144
      kafka.buffer.memory: 134217728
      kafka.max.request.size: 10485760
      kafka.compression.type: snappy
      # Security settings left blank in the original (plaintext); presumably to
      # be filled in for SSL/SASL deployments — TODO confirm with the consumer.
      kafka.security.protocol:
      kafka.ssl.keystore.location:
      kafka.ssl.keystore.password:
      kafka.ssl.truststore.location:
      kafka.ssl.truststore.password:
      kafka.ssl.key.password:
      kafka.sasl.mechanism:
      kafka.sasl.jaas.config:
      format: json

  kafka_sink_b:
    type: kafka
    properties:
      topic: SESSION-RECORD-COMPLETED-TEST
      kafka.bootstrap.servers: '192.168.44.12:9092'  # quoted: host:port scalar
      kafka.retries: 0
      kafka.linger.ms: 10
      kafka.request.timeout.ms: 30000
      kafka.batch.size: 262144
      kafka.buffer.memory: 134217728
      kafka.max.request.size: 10485760
      kafka.compression.type: snappy
      format: json

  print_sink:
    type: print
    properties:
      format: json

  collect_sink:
    type: collect
    properties:
      format: json

application:  # [object] Application configuration
  env:  # [object] Environment variables
    name: groot-stream-job  # [string] Job name
    pipeline:
      object-reuse: true  # [boolean] Object reuse, default is false
  topology:  # [array of object] Node list; used to build the data flow for the job DAG graph.
    - name: inline_source  # [string] Node name, must be unique; used as the name of the corresponding Flink operator.
      parallelism: 1  # [number] Operator-level parallelism.
      downstream: [ schema_type_filter ]  # [array of string] Downstream node name list.
    - name: schema_type_filter
      parallelism: 1
      downstream: [ session_record_processor ]
    - name: session_record_processor
      parallelism: 1
      downstream: [ collect_sink ]
    - name: collect_sink
      parallelism: 1
