diff options
| author | 窦凤虎 <[email protected]> | 2024-11-01 10:14:03 +0000 |
|---|---|---|
| committer | 窦凤虎 <[email protected]> | 2024-11-01 10:14:03 +0000 |
| commit | f7cec560def3981d52f25fc038aab3d4308d4bd1 (patch) | |
| tree | 1bebf6ee0210b7d5fa50b43e75a5f54a37639177 /groot-examples | |
| parent | c0b9acfc3adc85abbd06207259b2515edc5c4eae (diff) | |
| parent | 7868728ddbe3dc08263b1d21b5ffce5dcd9b8052 (diff) | |
[feature][bootstrap][common]node新增tags属性用于分流,需要与downstream相对应。rules中name标签修改为t...
See merge request galaxy/platform/groot-stream!128
Diffstat (limited to 'groot-examples')
6 files changed, 161 insertions, 11 deletions
diff --git a/groot-examples/cn-udf-example/pom.xml b/groot-examples/cn-udf-example/pom.xml index 38ae4ea..4ec1f18 100644 --- a/groot-examples/cn-udf-example/pom.xml +++ b/groot-examples/cn-udf-example/pom.xml @@ -9,7 +9,7 @@ <version>${revision}</version> </parent> - <artifactId>cn-udf-example</artifactId> + <artifactId>cn-scalarFunction-example</artifactId> <name>Groot : Examples : CN-UDF</name> <properties> <maven.install.skip>true</maven.install.skip> diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java index 9b58289..5e64962 100644 --- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java +++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java @@ -14,7 +14,7 @@ import java.util.List; public class GrootStreamExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { - String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml"; + String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml new file mode 100644 index 0000000..9bb2900 --- /dev/null +++ b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml @@ -0,0 +1,97 @@ +sources: + inline_source: + type : inline + fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. + properties: + data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + interval.per.row: 1s # 可选 + repeat.count: 1 # 可选 + format: json + json.ignore.parse.errors: false +sinks: + collect_sink: + type: collect + properties: + format: json +splits: + test_split: + type: split + rules: + - name: table_processor + expression: event.decoded_as == 'HTTP' + - name: pre_etl_processor + expression: event.decoded_as == 'DNS' + +postprocessing_pipelines: + pre_etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: [fields,tags] + output_fields: + functions: # [array of object] Function List + + - function: FLATTEN + lookup_fields: [ fields,tags ] + output_fields: [ ] + parameters: + #prefix: "" + depth: 3 + # delimiter: "." + + - function: RENAME + lookup_fields: [ '' ] + output_fields: [ '' ] + filter: + parameters: + # parent_fields: [tags] + # rename_fields: + # tags: tags + rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key; + + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] + parameters: + precision: seconds + interval: 300 + # + + aggregate_processor: + type: aggregate + group_by_fields: [decoded_as] + window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 5 + window_timestamp_field: test_time + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + + table_processor: + type: table + functions: + - function: JSON_UNROLL + lookup_fields: [ encapsulation ] + output_fields: [ new_name ] + +application: # [object] Application Configuration + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: # [array of object] Node List. It will be used build data flow for job dag graph. + - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. + parallelism: 1 # [number] Operator-Level Parallelism. + downstream: [test_split,collect_sink] + - name: test_split + parallelism: 1 + downstream: [ table_processor,pre_etl_processor ] + - name: pre_etl_processor + parallelism: 1 + downstream: [ collect_sink ] + - name: table_processor + parallelism: 1 + downstream: [ collect_sink ] + - name: collect_sink + parallelism: 1 + + diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml index fb51a0e..e0cbb17 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml @@ -23,10 +23,8 @@ sources: type: string - name: device_tag type: string - - name: sent_bytes - type: bigint - - name: received_bytes - type: bigint + - name: http_host + type: string properties: data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' format: json @@ -47,12 +45,19 @@ processing_pipelines: session_record_processor: type: projection remove_fields: [device_tag] - output_fields: [log_id, renamed_client_ip, c2s_bytes] + #output_fields: [log_id, device_tag, client_ip, client_geolocation, client_asn, server_domain, server_ip, server_geolocation, server_asn, log_uuid, log_uuid_v7, ip_uuid] functions: - function: DROP lookup_fields: [] output_fields: [] filter: event.client_ip == '192.168.10.100' + + - function: DOMAIN + lookup_fields: [ http_host, ssl_sni, quic_sni ] + output_fields: [ server_domain ] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + - function: SNOWFLAKE_ID lookup_fields: [] output_fields: [log_id] @@ -87,6 +92,21 @@ processing_pipelines: kb_name: tsg_ip_location option: IP_TO_DETAIL + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ ] + parameters: + kb_name: tsg_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: server_country + PROVINCE: server_super_administrative_area + CITY: server_administrative_area + LONGITUDE: server_longitude + LATITUDE: server_latitude + ISP: server_isp + ORGANIZATION: server_organization + - function: JSON_EXTRACT lookup_fields: [ device_tag ] output_fields: [ device_group ] @@ -101,14 +121,22 @@ processing_pipelines: output_fields: [ processing_time_str ] parameters: precision: milliseconds + - function: RENAME parameters: rename_fields: - client_ip: renamed_client_ip - - function: EVAL - output_fields: [ c2s_bytes ] + device_tag: renamed_device_tag + + - function: UUIDv5 + lookup_fields: [ client_ip, server_ip ] + output_fields: [ ip_uuid ] parameters: - value_expression: sent_bytes + namespace: NAMESPACE_IP + - function: UUIDv7 + output_fields: [ log_uuid_v7 ] + - function: UUID + output_fields: [ log_uuid ] + sinks: print_sink: @@ -120,6 +148,7 @@ application: env: name: example-inline-to-print parallelism: 3 + kms.type: local pipeline: object-reuse: true topology: diff --git a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml index 67e1dd6..2c352a2 100644 --- a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml @@ -10,6 +10,21 @@ grootstream: fs_path: ./config/dat files: - ip_builtin.mmdb + kms: + local: + type: local + vault: + type: vault + url: <vault-url> + token: <vault-token> + key_path: <vault-key-path> + + ssl: + enabled: true + cert_file: ./config/ssl/cert.pem + key_file: ./config/ssl/key.pem + require_client_auth: true + properties: hos.path: http://192.168.44.12:9098/hos hos.bucket.name.traffic_file: traffic_file_bucket diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml index 6184bda..46ccaaa 100644 --- a/groot-examples/pom.xml +++ b/groot-examples/pom.xml @@ -127,12 +127,21 @@ </dependency> <dependency> + <groupId>com.fasterxml.uuid</groupId> + <artifactId>java-uuid-generator</artifactId> + <version>${uuid-generator.version}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_${scala.version}</artifactId> <version>${flink.version}</version> <scope>test</scope> </dependency> + + </dependencies> |
