From 6e2564bef00eb0ca640f38209260b0b0fd172c95 Mon Sep 17 00:00:00 2001
From: doufenghu
Date: Mon, 9 Sep 2024 16:45:45 +0800
Subject: [Improve][bootstrap] Remove hbase-client-shaded package in assembly
 releases.

---
 groot-release/pom.xml                               | 7 -------
 groot-release/src/main/assembly/assembly-bin-ci.xml | 1 -
 2 files changed, 8 deletions(-)

diff --git a/groot-release/pom.xml b/groot-release/pom.xml
index 8a78efa..229b23f 100644
--- a/groot-release/pom.xml
+++ b/groot-release/pom.xml
@@ -121,13 +121,6 @@
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>com.geedgenetworks</groupId>
-            <artifactId>hbase-client-shaded</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>com.geedgenetworks</groupId>
diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml
index 4402809..f2b767f 100644
--- a/groot-release/src/main/assembly/assembly-bin-ci.xml
+++ b/groot-release/src/main/assembly/assembly-bin-ci.xml
@@ -134,7 +134,6 @@
             true
             false
-            com.geedgenetworks:hbase-client-shaded:jar
             com.geedgenetworks:format-json:jar
             com.geedgenetworks:format-protobuf:jar
             com.geedgenetworks:format-msgpack:jar
-- 
cgit v1.2.3


From 202cf0557b3120b3599ff34015166ae8bd6c5a1b Mon Sep 17 00:00:00 2001
From: doufenghu
Date: Mon, 9 Sep 2024 16:47:35 +0800
Subject: [Improve][mock-data] Add array(String) mock data

---
 config/template/grootstream_job_template.yaml |  1 +
 .../mock_schema/session_record_mock_desc.json | 76 ++++++++++++++++++++++
 docs/connector/connector.md                   | 27 +++++---
 3 files changed, 94 insertions(+), 10 deletions(-)

diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index 0ca2d68..3110162 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -448,6 +448,7 @@ application: # [object] Application Configuration
     shade.identifier: default # [string] Shade Identifier, used to encrypt and decrypt sensitive configuration. Supported enum: default, aes, base64. If set to default, sensitive configuration is not encrypted or decrypted.
   pipeline:
     object-reuse: true # [boolean] Object Reuse, default is false
+
   topology: # [array of object] Node List. It is used to build the data flow for the job DAG graph.
     - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source for processor type SOURCE.
       #parallelism: 1 # [number] Operator-Level Parallelism.
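
For orientation, the `topology` list in this template is what assembles the job DAG: each node names its `downstream` consumers, and sinks terminate the graph with an empty list. A minimal sketch of a linear wiring, with illustrative node names that are not part of this patch:

```yaml
topology:
  - name: inline_source      # source node; name must match a key under `sources`
    downstream: [etl_processor]
  - name: etl_processor      # processing node; name must match a pipeline key
    downstream: [print_sink]
  - name: print_sink         # sink node; terminal, so no downstream consumers
    downstream: []
```
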
diff --git a/config/template/mock_schema/session_record_mock_desc.json b/config/template/mock_schema/session_record_mock_desc.json
index c8c4acf..90060a6 100644
--- a/config/template/mock_schema/session_record_mock_desc.json
+++ b/config/template/mock_schema/session_record_mock_desc.json
@@ -114,12 +114,43 @@
       "start": "103.144.108.1",
       "end": "103.144.108.255"
     },
+    {
+      "name": "client_ip_tags",
+      "type": "String",
+      "array": true,
+      "options": [
+        "Country:United States",
+        "ASN:63278",
+        "Cloud Provider:IBM Cloud",
+        "Country Code:US",
+        "CDN Provider:Light CDN",
+        "ASN:6423"
+      ],
+      "arrayLenMin": 1,
+      "arrayLenMax": 5
+    },
     {
       "name": "server_ip",
       "type": "IPv4",
       "start": "1.0.0.0",
       "end": "162.105.10.255"
     },
+    {
+      "name": "server_ip_tags",
+      "type": "String",
+      "array": true,
+      "options": [
+        "Country:China",
+        "ASN:15169",
+        "Cloud Provider:Alibaba Cloud",
+        "Country Code:CN",
+        "CDN Provider:Akamai",
+        "Super Administrative Area:Guangdong"
+      ],
+      "arrayLenMin": 1,
+      "arrayLenMax": 5
+    },
     {
       "name": "c2s_ttl",
       "type": "Number",
@@ -166,12 +197,43 @@
       "start": "1.0.0.0",
       "end": "162.105.10.255"
     },
+    {
+      "name": "client_ip_tags",
+      "type": "String",
+      "array": true,
+      "options": [
+        "Country:China",
+        "ASN:15169",
+        "Cloud Provider:Alibaba Cloud",
+        "Country Code:CN",
+        "CDN Provider:Akamai",
+        "Super Administrative Area:Guangdong"
+      ],
+      "arrayLenMin": 1,
+      "arrayLenMax": 5
+    },
     {
       "name": "server_ip",
       "type": "IPv4",
       "start": "103.144.108.1",
       "end": "103.144.108.255"
     },
+    {
+      "name": "server_ip_tags",
+      "type": "String",
+      "array": true,
+      "options": [
+        "Country:United States",
+        "ASN:63278",
+        "Cloud Provider:IBM Cloud",
+        "Country Code:US",
+        "CDN Provider:Light CDN",
+        "ASN:6423"
+      ],
+      "arrayLenMin": 1,
+      "arrayLenMax": 5
+    },
     {
       "name": "c2s_ttl",
       "type": "Number",
@@ -339,6 +401,20 @@
       "expression": "#{internet.domainName}",
       "nullRate": 0.1
     },
+    {
+      "name": "server_fqdn_tags",
+      "type": "String",
+      "array": true,
+      "options": [
+        "Category Name:Entertainment and Arts",
+        "IoC:Malware",
+        "Category Name:Home and Garden",
+        "Category Name:Translation",
+        "IoC:Spam"
+      ],
+      "arrayLenMin": 1,
+      "arrayLenMax": 5
+    },
     {
       "name": "server_port",
       "type": "Number",
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 766b73e..93d64b0 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -1,3 +1,12 @@
+# Table of Contents
+- [Source Connector](#source-connector)
+  - [Common Source Options](#common-source-options)
+  - [Schema Field Projection](#schema-field-projection)
+  - [Schema Config](#schema-config)
+  - [Mock Data Type](#mock-data-type)
+- [Sink Connector](#sink-connector)
+  - [Common Sink Options](#common-sink-options)
+
 # Source Connector
 
 Source Connector contains some common core features, and each source connector supports them to varying degrees.
@@ -62,13 +71,12 @@ schema:
 
 To retrieve the schema from a local file, use its absolute path.
 > Ensure that the file path is accessible to all nodes in your Flink cluster.
->
-> ```yaml
-> schema:
->   # by array
->   fields:
->     local_file: "/path/to/schema.json"
-> ```
+
+```yaml
+schema:
+  # Note: Only the Avro schema format is supported
+  local_file: "/path/to/schema.json"
+```
 
 ### URL
 
 Some connectors support periodically fetching and updating the schema from a URL.
 
 ```yaml
 schema:
-  # by array
-  fields:
-  url: "https://localhost:8080/schema.json"
+  # Note: Only the Avro schema format is supported
+  url: "https://localhost:8080/schema.json"
 ```
 
 ## Mock Data Type
-- 
cgit v1.2.3


From 4bb87b62cd7d3dd12bd19e643aaffda53e35e57a Mon Sep 17 00:00:00 2001
From: doufenghu
Date: Tue, 10 Sep 2024 20:05:06 +0800
Subject: [Feature][docs] Add split operator description.

---
 docs/images/groot_stream_architecture.jpg     | Bin 5263679 -> 5472871 bytes
 docs/processor/aggregate-processor.md         | 23 ++---
 docs/processor/split-processor.md             | 49 +++++++++++
 .../examples/grootstream_job_split_test.yaml  | 97 +++++++++++++++++++++
 4 files changed, 158 insertions(+), 11 deletions(-)
 create mode 100644 docs/processor/split-processor.md
 create mode 100644 groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml

diff --git a/docs/images/groot_stream_architecture.jpg b/docs/images/groot_stream_architecture.jpg
index d8f1d4b..28b553b 100644
Binary files a/docs/images/groot_stream_architecture.jpg and b/docs/images/groot_stream_architecture.jpg differ
diff --git a/docs/processor/aggregate-processor.md b/docs/processor/aggregate-processor.md
index 5ab0ae0..afc26f6 100644
--- a/docs/processor/aggregate-processor.md
+++ b/docs/processor/aggregate-processor.md
@@ -10,17 +10,18 @@ Within the pipeline, events are processed by each Function in order, top-down
 
 ## Options
 
 Note: By default, the internal fields `__window_start_timestamp` and `__window_end_timestamp` are output if `output_fields` is not set.
 
-| name                     | type   | required | default value |
-|--------------------------|--------|----------|---------------|
-| type                     | String | Yes      | The type of the processor, now only support `com.geedgenetworks.core.processor.aggregate.AggregateProcessor` |
-| output_fields            | Array  | No       | Array of String. The list of fields that need to be kept. Fields not in the list will be removed. |
-| remove_fields            | Array  | No       | Array of String. The list of fields that need to be removed. |
-| group_by_fields          | Array  | yes      | Array of String. The list of fields that need to be grouped. |
-| window_type              | String | yes      | The type of window, now only support `tumbling_processing_time`, `tumbling_event_time`, `sliding_processing_time`, `sliding_event_time`. if window_type is `tumbling/sliding_event_time,` you need to set watermark. |
-| window_size              | Long   | yes      | The duration of the window in seconds. |
-| window_slide             | Long   | yes      | The duration of the window slide in seconds. |
-| window_timestamp_field   | String | No       | Set the output timestamp field name, with the unit in seconds. It is mapped to the internal field __window_start_timestamp. |
-| functions                | Array  | No       | Array of Object. The list of functions that need to be applied to the data. |
+| name                   | type    | required | description |
+|------------------------|---------|----------|-------------|
+| type                   | String  | Yes      | The type of the processor; currently only `com.geedgenetworks.core.processor.aggregate.AggregateProcessor` is supported. |
+| output_fields          | Array   | No       | Array of String. The list of fields to keep. Fields not in the list will be removed. |
+| remove_fields          | Array   | No       | Array of String. The list of fields to remove. |
+| group_by_fields        | Array   | Yes      | Array of String. The list of fields to group by. |
+| window_type            | String  | Yes      | The type of window; currently only `tumbling_processing_time`, `tumbling_event_time`, `sliding_processing_time`, and `sliding_event_time` are supported. If window_type is `tumbling/sliding_event_time`, you need to set a watermark. |
+| window_size            | Long    | Yes      | The duration of the window in seconds. |
+| window_timestamp_field | String  | No       | The output timestamp field name, in seconds. It is mapped to the internal field __window_start_timestamp. |
+| mini_batch             | Boolean | No       | Whether to enable local (mini-batch) aggregate optimization. Defaults to false. Enabling it can significantly reduce state overhead and improve throughput. |
+| functions              | Array   | No       | Array of Object. The list of functions to apply to the data. |
 
 ## Usage Example
 
diff --git a/docs/processor/split-processor.md b/docs/processor/split-processor.md
new file mode 100644
index 0000000..e1a1163
--- /dev/null
+++ b/docs/processor/split-processor.md
@@ -0,0 +1,49 @@
+# Split Processor
+
+> Split the output of a data processing pipeline into multiple streams based on certain conditions.
+
+## Description
+
+Uses Flink side outputs to send data from a stream to multiple downstream consumers. This is useful when you want to separate or filter certain elements of a stream without disrupting the main processing flow. For example, side outputs can be used for error handling, conditional routing, or extracting specific subsets of the data.
+
+## Options
+
+| name              | type   | required | description |
+|-------------------|--------|----------|-------------|
+| type              | String | Yes      | The type of the processor; currently only `com.geedgenetworks.core.split.SplitOperator` is supported. |
+| rules             | Array  | Yes      | Array of Object. Defines the rules for labeling side output tags. |
+| [rule.]tag        | String | Yes      | The tag name of the side output. |
+| [rule.]expression | String | Yes      | The expression used to evaluate each event. |
+
+## Usage Example
+
+This example uses a split processor to split the data into two streams based on the value of the `decoded_as` field.
+ +```yaml +splits: + decoded_as_split: + type: split + rules: + - tag: http_tag + expression: event.decoded_as == 'HTTP' + - tag: dns_tag + expression: event.decoded_as == 'DNS' + + +topology: + - name: inline_source + downstream: [decoded_as_split] + - name: decoded_as_split + tags: [http_tag, dns_tag] + downstream: [ projection_processor, aggregate_processor] + - name: projection_processor + downstream: [ print_sink ] + - name: aggregate_processor + downstream: [ print_sink ] + - name: print_sink + downstream: [] +``` + + + + diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml new file mode 100644 index 0000000..9bb2900 --- /dev/null +++ b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml @@ -0,0 +1,97 @@ +sources: + inline_source: + type : inline + fields: # [array of object] Field List, if not set, all fields(Map) will be output. + properties: + data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + interval.per.row: 1s # 可选 + repeat.count: 1 # 可选 + format: json + json.ignore.parse.errors: false +sinks: + collect_sink: + type: collect + properties: + format: json +splits: + test_split: + type: split + rules: + - name: table_processor + expression: event.decoded_as == 'HTTP' + - name: pre_etl_processor + expression: event.decoded_as == 'DNS' + +postprocessing_pipelines: + pre_etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: [fields,tags] + output_fields: + functions: # [array of object] Function List + + - function: FLATTEN + lookup_fields: [ fields,tags ] + output_fields: [ ] + parameters: + #prefix: "" + depth: 3 + # delimiter: "." 
+
+      - function: RENAME
+        lookup_fields: [ '' ]
+        output_fields: [ '' ]
+        filter:
+        parameters:
+          # parent_fields: [tags]
+          # rename_fields:
+          #   tags: tags
+          rename_expression: key = string.replace_all(key, 'tags.', ''); key = string.replace_all(key, 'fields.', ''); return key;
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ timestamp_ms ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+          interval: 300
+
+  aggregate_processor:
+    type: aggregate
+    group_by_fields: [decoded_as]
+    window_type: tumbling_processing_time # tumbling_event_time, sliding_processing_time, sliding_event_time
+    window_size: 5
+    window_timestamp_field: test_time
+    functions:
+      - function: NUMBER_SUM
+        lookup_fields: [ sessions ]
+
+  table_processor:
+    type: table
+    functions:
+      - function: JSON_UNROLL
+        lookup_fields: [ encapsulation ]
+        output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+  env: # [object] Environment Variables
+    name: groot-stream-job # [string] Job Name
+  pipeline:
+    object-reuse: true # [boolean] Object Reuse, default is false
+  topology: # [array of object] Node List. It is used to build the data flow for the job DAG graph.
+    - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source for processor type SOURCE.
+      parallelism: 1 # [number] Operator-Level Parallelism.
+      downstream: [test_split, collect_sink]
+    - name: test_split
+      parallelism: 1
+      downstream: [ table_processor, pre_etl_processor ]
+    - name: pre_etl_processor
+      parallelism: 1
+      downstream: [ collect_sink ]
+    - name: table_processor
+      parallelism: 1
+      downstream: [ collect_sink ]
+    - name: collect_sink
+      parallelism: 1
-- 
cgit v1.2.3
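
For reference, a minimal sketch of an aggregate processor that enables the `mini_batch` option documented in the options table above. Everything except the `mini_batch` flag mirrors the example file; the exact optimization behavior should be confirmed against the implementation:

```yaml
postprocessing_pipelines:
  aggregate_processor:
    type: aggregate
    group_by_fields: [decoded_as]
    window_type: tumbling_processing_time
    window_size: 5
    mini_batch: true # enable local pre-aggregation to cut state overhead (assumed default: false)
    functions:
      - function: NUMBER_SUM
        lookup_fields: [ sessions ]
```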