summaryrefslogtreecommitdiff
path: root/groot-examples
diff options
context:
space:
mode:
author窦凤虎 <[email protected]>2024-11-01 10:14:03 +0000
committer窦凤虎 <[email protected]>2024-11-01 10:14:03 +0000
commitf7cec560def3981d52f25fc038aab3d4308d4bd1 (patch)
tree1bebf6ee0210b7d5fa50b43e75a5f54a37639177 /groot-examples
parentc0b9acfc3adc85abbd06207259b2515edc5c4eae (diff)
parent7868728ddbe3dc08263b1d21b5ffce5dcd9b8052 (diff)
Merge branch 'release/1.7.0' into 'master'v1.7.0master
[feature][bootstrap][common]node新增tags属性用于分流,需要与downstream相对应。rules中name标签修改为t... See merge request galaxy/platform/groot-stream!128
Diffstat (limited to 'groot-examples')
-rw-r--r--groot-examples/cn-udf-example/pom.xml2
-rw-r--r--groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java2
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml97
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml47
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/grootstream.yaml15
-rw-r--r--groot-examples/pom.xml9
6 files changed, 161 insertions, 11 deletions
diff --git a/groot-examples/cn-udf-example/pom.xml b/groot-examples/cn-udf-example/pom.xml
index 38ae4ea..4ec1f18 100644
--- a/groot-examples/cn-udf-example/pom.xml
+++ b/groot-examples/cn-udf-example/pom.xml
@@ -9,7 +9,7 @@
<version>${revision}</version>
</parent>
- <artifactId>cn-udf-example</artifactId>
+ <artifactId>cn-scalarFunction-example</artifactId>
<name>Groot : Examples : CN-UDF</name>
<properties>
<maven.install.skip>true</maven.install.skip>
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 9b58289..5e64962 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -14,7 +14,7 @@ import java.util.List;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
new file mode 100644
index 0000000..9bb2900
--- /dev/null
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
@@ -0,0 +1,97 @@
+sources:
+ inline_source:
+ type : inline
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ interval.per.row: 1s # 可选
+ repeat.count: 1 # 可选
+ format: json
+ json.ignore.parse.errors: false
+sinks:
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+splits:
+ test_split:
+ type: split
+ rules:
+ - name: table_processor
+ expression: event.decoded_as == 'HTTP'
+ - name: pre_etl_processor
+ expression: event.decoded_as == 'DNS'
+
+postprocessing_pipelines:
+ pre_etl_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [fields,tags]
+ output_fields:
+ functions: # [array of object] Function List
+
+ - function: FLATTEN
+ lookup_fields: [ fields,tags ]
+ output_fields: [ ]
+ parameters:
+ #prefix: ""
+ depth: 3
+ # delimiter: "."
+
+ - function: RENAME
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter:
+ parameters:
+ # parent_fields: [tags]
+ # rename_fields:
+ # tags: tags
+ rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ interval: 300
+ #
+
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [decoded_as]
+ window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 5
+ window_timestamp_field: test_time
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+
+ table_processor:
+ type: table
+ functions:
+ - function: JSON_UNROLL
+ lookup_fields: [ encapsulation ]
+ output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. It will be used build data flow for job dag graph.
+ - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [test_split,collect_sink]
+ - name: test_split
+ parallelism: 1
+ downstream: [ table_processor,pre_etl_processor ]
+ - name: pre_etl_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: table_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: collect_sink
+ parallelism: 1
+
+
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index fb51a0e..e0cbb17 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -23,10 +23,8 @@ sources:
type: string
- name: device_tag
type: string
- - name: sent_bytes
- type: bigint
- - name: received_bytes
- type: bigint
+ - name: http_host
+ type: string
properties:
data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
format: json
@@ -47,12 +45,19 @@ processing_pipelines:
session_record_processor:
type: projection
remove_fields: [device_tag]
- output_fields: [log_id, renamed_client_ip, c2s_bytes]
+ #output_fields: [log_id, device_tag, client_ip, client_geolocation, client_asn, server_domain, server_ip, server_geolocation, server_asn, log_uuid, log_uuid_v7, ip_uuid]
functions:
- function: DROP
lookup_fields: []
output_fields: []
filter: event.client_ip == '192.168.10.100'
+
+ - function: DOMAIN
+ lookup_fields: [ http_host, ssl_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
+
- function: SNOWFLAKE_ID
lookup_fields: []
output_fields: [log_id]
@@ -87,6 +92,21 @@ processing_pipelines:
kb_name: tsg_ip_location
option: IP_TO_DETAIL
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+ LONGITUDE: server_longitude
+ LATITUDE: server_latitude
+ ISP: server_isp
+ ORGANIZATION: server_organization
+
- function: JSON_EXTRACT
lookup_fields: [ device_tag ]
output_fields: [ device_group ]
@@ -101,14 +121,22 @@ processing_pipelines:
output_fields: [ processing_time_str ]
parameters:
precision: milliseconds
+
- function: RENAME
parameters:
rename_fields:
- client_ip: renamed_client_ip
- - function: EVAL
- output_fields: [ c2s_bytes ]
+ device_tag: renamed_device_tag
+
+ - function: UUIDv5
+ lookup_fields: [ client_ip, server_ip ]
+ output_fields: [ ip_uuid ]
parameters:
- value_expression: sent_bytes
+ namespace: NAMESPACE_IP
+ - function: UUIDv7
+ output_fields: [ log_uuid_v7 ]
+ - function: UUID
+ output_fields: [ log_uuid ]
+
sinks:
print_sink:
@@ -120,6 +148,7 @@ application:
env:
name: example-inline-to-print
parallelism: 3
+ kms.type: local
pipeline:
object-reuse: true
topology:
diff --git a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml
index 67e1dd6..2c352a2 100644
--- a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml
@@ -10,6 +10,21 @@ grootstream:
fs_path: ./config/dat
files:
- ip_builtin.mmdb
+ kms:
+ local:
+ type: local
+ vault:
+ type: vault
+ url: <vault-url>
+ token: <vault-token>
+ key_path: <vault-key-path>
+
+ ssl:
+ enabled: true
+ cert_file: ./config/ssl/cert.pem
+ key_file: ./config/ssl/key.pem
+ require_client_auth: true
+
properties:
hos.path: http://192.168.44.12:9098/hos
hos.bucket.name.traffic_file: traffic_file_bucket
diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml
index 6184bda..46ccaaa 100644
--- a/groot-examples/pom.xml
+++ b/groot-examples/pom.xml
@@ -127,12 +127,21 @@
</dependency>
<dependency>
+ <groupId>com.fasterxml.uuid</groupId>
+ <artifactId>java-uuid-generator</artifactId>
+ <version>${uuid-generator.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+
+
</dependencies>