author    wangkuan <[email protected]>  2024-02-22 15:30:15 +0800
committer wangkuan <[email protected]>  2024-02-22 15:30:15 +0800
commit    9000b73e460721569d32f207eafaa6104b7f6829 (patch)
tree      6b6b7d4b3e5073e69c76d4646ca1bd07362cd418
parent    e526a0ff54682a1063c40270cf8fa73ee793318b (diff)
[feature][core] Temporarily added to resolve conflicts
-rw-r--r--  groot-bootstrap/src/main/resources/grootstream_job_test.yaml  190
1 file changed, 190 insertions, 0 deletions
diff --git a/groot-bootstrap/src/main/resources/grootstream_job_test.yaml b/groot-bootstrap/src/main/resources/grootstream_job_test.yaml
new file mode 100644
index 0000000..45c8f56
--- /dev/null
+++ b/groot-bootstrap/src/main/resources/grootstream_job_test.yaml
@@ -0,0 +1,190 @@
+sources:
+
+
+ inline_source:
+    type: inline
+    fields: # [array of object] Field list; if not set, all fields (Map<String, Object>) are output.
+ properties:
+ data: '[{"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+      interval.per.row: 1s # optional
+      repeat.count: 1 # optional
+ format: json
+ json.ignore.parse.errors: false
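+  # The inline source above presumably replays the four JSON rows in `data` as
+  # individual events, one row per interval.per.row and repeat.count times over,
+  # so the job can be exercised without an external source.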
+filters:
+ schema_type_filter:
+ type: com.geedgenetworks.core.filter.AviatorFilter
+ output_fields:
+ properties:
+ expression: event.decoded_as == 'SSL' || event.decoded_as == 'BASE'
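+      # This Aviator expression presumably acts as a keep-predicate: only events
+      # whose decoded_as is 'SSL' or 'BASE' (two of the four inline rows) pass
+      # downstream; the HTTP and DNS rows are filtered out.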
+
+
+preprocessing_pipelines:
+ preprocessing_processor: # [object] Preprocessing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ output_fields:
+ properties:
+ key: value
+ functions:
+ - function: SNOWFLAKE_ID
+ lookup_fields: ['']
+ output_fields: ['common_log_id']
+ filter:
+ - function: DROP
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter: event.common_schema_type == 'BASE'
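+      # As far as this config shows: SNOWFLAKE_ID writes a generated unique id
+      # into common_log_id, and DROP (guarded by its filter expression)
+      # presumably discards events whose common_schema_type is 'BASE'.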
+
+processing_pipelines:
+ session_record_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields:
+ output_fields:
+ properties:
+ key: value
+ functions: # [array of object] Function List
+ - function: DROP
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter: event.decoded_as == 'SSL'
+ - function: BASE64_DECODE_TO_STRING
+ output_fields: [mail_attachment_name]
+ parameters:
+ value_field: mail_attachment_name
+ charset_field: mail_attachment_name_charset
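+        # Worked example, assuming standard Base64 semantics: the inline row
+        # carries mail_attachment_name "aGVsbG8=" with charset "GBK", which
+        # decodes to the string "hello".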
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: client_country_region
+ PROVINCE: client_super_admin_area
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country_region
+ PROVINCE: server_super_admin_area
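+      # The two GEOIP_LOOKUP calls above enrich client_ip and server_ip from the
+      # tsg_ip_location knowledge base, presumably writing the COUNTRY and
+      # PROVINCE attributes of each lookup result into the fields named in
+      # geolocation_field_mapping.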
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ option: IP_TO_ASN
+ kb_name: tsg_ip_asn
+ - function: SNOWFLAKE_ID
+ lookup_fields: [ '' ]
+ output_fields: [ log_id ]
+ filter:
+ parameters:
+ data_center_id_num: 1
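+          # data_center_id_num presumably seeds the datacenter bits of the
+          # Snowflake id so that ids stay unique across deployments.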
+ - function: JSON_EXTRACT
+ lookup_fields: [ device_tag ]
+ output_fields: [ data_center ]
+ filter:
+ parameters:
+ value_expression: $.tags[?(@.tag=='data_center')][0].value
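+          # Assuming device_tag holds JSON of the (hypothetical) shape
+          # {"tags":[{"tag":"data_center","value":"dc-01"}]}, this JSONPath
+          # selects the first tag named data_center and extracts "dc-01".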
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ processing_time ]
+ parameters:
+ precision: seconds
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ - function: EVAL
+ output_fields: [ ingestion_time ]
+ parameters:
+ value_expression: recv_time
+ - function: DOMAIN
+        lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
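+        # DOMAIN scans the listed host/SNI fields and, under
+        # FIRST_SIGNIFICANT_SUBDOMAIN, presumably reduces the inline row's
+        # ssl_sni "www.google.com" to "google".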
+
+ - function: PATH_COMBINE
+ lookup_fields: [ packet_capture_file ]
+ output_fields: [ packet_capture_file ]
+ parameters:
+          path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file ]
+ - function: STRING_JOINER
+        lookup_fields: [ server_ip, client_ip ]
+ output_fields: [ ip_string ]
+ parameters:
+ separator: ','
+ prefix: '['
+ suffix: ']'
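+          # For the inline SSL row this presumably yields
+          # ip_string = "[2600:1015:b002::,192.168.0.2]".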
+ - function: GENERATE_STRING_ARRAY
+        lookup_fields: [ client_asn, server_asn ]
+ output_fields: [ asn_list ]
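+        # GENERATE_STRING_ARRAY presumably collects client_asn and server_asn
+        # (the latter filled by ASN_LOOKUP above) into the asn_list array field.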
+
+
+sinks:
+ kafka_sink_a:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD-JSON
+ kafka.bootstrap.servers: 192.168.44.12:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
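+      # The SSL/SASL settings below are left blank; presumably the producer
+      # falls back to PLAINTEXT when kafka.security.protocol is unset.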
+ kafka.security.protocol:
+ kafka.ssl.keystore.location:
+ kafka.ssl.keystore.password:
+ kafka.ssl.truststore.location:
+ kafka.ssl.truststore.password:
+ kafka.ssl.key.password:
+ kafka.sasl.mechanism:
+ kafka.sasl.jaas.config:
+ format: json
+
+ kafka_sink_b:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD-COMPLETED-TEST
+ kafka.bootstrap.servers: 192.168.44.12:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ format: json
+
+ print_sink:
+ type: print
+ properties:
+      format: json
+
+  collect_sink:
+ type: collect
+ properties:
+ format: json
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+  topology: # [array of object] Node list; used to build the data flow of the job DAG.
+    - name: inline_source # [string] Node name, must be unique; used as the name of the corresponding Flink operator, e.g. kafka_source when the processor type is SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [schema_type_filter] # [array of string] Downstream Node Name List.
+ - name: schema_type_filter
+ parallelism: 1
+ downstream: [session_record_processor]
+ - name: session_record_processor
+ parallelism: 1
+ downstream: [collect_sink]
+ - name: collect_sink
+ parallelism: 1
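+  # Resulting data flow: inline_source -> schema_type_filter ->
+  # session_record_processor -> collect_sink. Note that preprocessing_processor,
+  # kafka_sink_a, kafka_sink_b, and print_sink are defined above but not
+  # referenced in this topology, so they are inert in this test job.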
+