 config/template/grootstream_job_template.yaml                                                 |  1 +
 config/template/mock_schema/session_record_mock_desc.json                                     | 76 ++++++++
 docs/connector/connector.md                                                                   | 27 +++---
 docs/images/groot_stream_architecture.jpg                                                     | Bin 5263679 -> 5472871 bytes
 docs/processor/aggregate-processor.md                                                         | 23 ++--
 docs/processor/split-processor.md                                                             | 49 ++++++
 groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml | 97 ++++++++++
 groot-release/pom.xml                                                                         |  7 --
 groot-release/src/main/assembly/assembly-bin-ci.xml                                           |  1 -
 9 files changed, 252 insertions(+), 29 deletions(-)
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index 0ca2d68..3110162 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -448,6 +448,7 @@ application: # [object] Application Configuration
shade.identifier: default # [string] Shade Identifier, used to encrypt and decrypt sensitive configuration. Supported values: default, aes, base64. If set to default, sensitive configuration is not encrypted or decrypted.
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
+
topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph.
- name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source when the processor type is SOURCE.
#parallelism: 1 # [number] Operator-Level Parallelism.
diff --git a/config/template/mock_schema/session_record_mock_desc.json b/config/template/mock_schema/session_record_mock_desc.json
index c8c4acf..90060a6 100644
--- a/config/template/mock_schema/session_record_mock_desc.json
+++ b/config/template/mock_schema/session_record_mock_desc.json
@@ -115,12 +115,43 @@
"end": "103.144.108.255"
},
{
+ "name": "client_ip_tags",
+ "type": "String",
+ "array": true,
+ "options": [
+ "Country:United States",
+ "ASN:63278",
+ "Cloud Provider:IBM Cloud",
+ "Country Code:US",
+ "CDN Provider:Light CDN",
+ "ASN:6423"
+ ],
+ "arrayLenMin":1,
+ "arrayLenMax":5
+ },
+ {
"name": "server_ip",
"type": "IPv4",
"start": "1.0.0.0",
"end": "162.105.10.255"
},
{
+ "name": "server_ip_tags",
+ "type": "String",
+ "array": true,
+ "options": [
+ "Country:China",
+ "ASN:15169",
+ "Cloud Provider:Alibaba Cloud",
+ "Country Code:CN",
+ "CDN Provider:Akamai",
+ "Super Administrative Area:Guangdong"
+ ],
+ "arrayLenMin":1,
+ "arrayLenMax":5
+ },
+ {
"name": "c2s_ttl",
"type": "Number",
"options": [
@@ -167,12 +198,43 @@
"end": "162.105.10.255"
},
{
+ "name": "client_ip_tags",
+ "type": "String",
+ "array": true,
+ "options": [
+ "Country:China",
+ "ASN:15169",
+ "Cloud Provider:Alibaba Cloud",
+ "Country Code:CN",
+ "CDN Provider:Akamai",
+ "Super Administrative Area:Guangdong"
+ ],
+ "arrayLenMin":1,
+ "arrayLenMax":5
+ },
+ {
"name": "server_ip",
"type": "IPv4",
"start": "103.144.108.1",
"end": "103.144.108.255"
},
{
+ "name": "server_ip_tags",
+ "type": "String",
+ "array": true,
+ "options": [
+ "Country:United States",
+ "ASN:63278",
+ "Cloud Provider:IBM Cloud",
+ "Country Code:US",
+ "CDN Provider:Light CDN",
+ "ASN:6423"
+ ],
+ "arrayLenMin":1,
+ "arrayLenMax":5
+ },
+ {
"name": "c2s_ttl",
"type": "Number",
"options": [
@@ -340,6 +402,20 @@
"nullRate": 0.1
},
{
+ "name": "server_fqdn_tags",
+ "type": "String",
+ "array": true,
+ "options": [
+ "Category Name:Entertainment and Arts",
+ "IoC:Malware",
+ "Category Name:Home and Garden",
+ "Category Name:Translation",
+ "IoC:Spam"
+ ],
+ "arrayLenMin":1,
+ "arrayLenMax":5
+ },
+ {
"name": "server_port",
"type": "Number",
"options": [
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 766b73e..93d64b0 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -1,3 +1,12 @@
+# Table of Contents
+- [Source Connector](#source-connector)
+ - [Common Source Options](#common-source-options)
+ - [Schema Field Projection](#schema-field-projection)
+ - [Schema Config](#schema-config)
+ - [Mock Data Type](#mock-data-type)
+- [Sink Connector](#sink-connector)
+ - [Common Sink Options](#common-sink-options)
+
# Source Connector
Source connectors share a set of common core features, and each source connector supports them to varying degrees.
@@ -62,13 +71,12 @@ schema:
Retrieve the schema from a local file using its absolute path.
> Ensure that the file path is accessible to all nodes in your Flink cluster.
->
-> ```yaml
-> schema:
-> # by array
-> fields:
-> local_file: "/path/to/schema.json"
-> ```
+
+```yaml
+schema:
+  # Note: Only the Avro schema format is supported
+  local_file: "/path/to/schema.json"
+```
### URL
@@ -76,9 +84,8 @@ Some connectors support periodically fetching and updating the schema from a URL
```yaml
schema:
- # by array
- fields:
- url: "https://localhost:8080/schema.json"
+  # Note: Only the Avro schema format is supported
+ url: "https://localhost:8080/schema.json"
```
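+
+The referenced schema file is expected to be a standard Avro schema document. A minimal sketch of such a file (record and field names are illustrative, not taken from this project):
+
+```json
+{
+  "type": "record",
+  "name": "SessionRecord",
+  "fields": [
+    { "name": "client_ip", "type": "string" },
+    { "name": "sessions", "type": "long" }
+  ]
+}
+```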
## Mock Data Type
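+
+For example, each mock schema descriptor (see `config/template/mock_schema/`) declares a field's name, type, and generation bounds. A minimal sketch of an array-typed String field, mirroring the `*_tags` fields added in this change (values illustrative):
+
+```json
+{
+  "name": "client_ip_tags",
+  "type": "String",
+  "array": true,
+  "options": ["Country:United States", "ASN:63278"],
+  "arrayLenMin": 1,
+  "arrayLenMax": 5
+}
+```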
diff --git a/docs/images/groot_stream_architecture.jpg b/docs/images/groot_stream_architecture.jpg
index d8f1d4b..28b553b 100644
--- a/docs/images/groot_stream_architecture.jpg
+++ b/docs/images/groot_stream_architecture.jpg
Binary files differ
diff --git a/docs/processor/aggregate-processor.md b/docs/processor/aggregate-processor.md
index 5ab0ae0..afc26f6 100644
--- a/docs/processor/aggregate-processor.md
+++ b/docs/processor/aggregate-processor.md
@@ -10,17 +10,18 @@ Within the pipeline, events are processed by each Function in order, top‑>down
## Options
Note: By default, the internal fields `__window_start_timestamp` and `__window_end_timestamp` are included in the output if output_fields is not set.
-| name | type | required | default value |
-|--------------------------|--------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| type | String | Yes | The type of the processor, now only support `com.geedgenetworks.core.processor.aggregate.AggregateProcessor` |
-| output_fields | Array | No | Array of String. The list of fields that need to be kept. Fields not in the list will be removed. |
-| remove_fields | Array | No | Array of String. The list of fields that need to be removed. |
-| group_by_fields | Array | yes | Array of String. The list of fields that need to be grouped. |
-| window_type | String | yes | The type of window, now only support `tumbling_processing_time`, `tumbling_event_time`, `sliding_processing_time`, `sliding_event_time`. if window_type is `tumbling/sliding_event_time,` you need to set watermark. |
-| window_size | Long | yes | The duration of the window in seconds. |
-| window_slide | Long | yes | The duration of the window slide in seconds. |
-| window_timestamp_field | String | No | Set the output timestamp field name, with the unit in seconds. It is mapped to the internal field __window_start_timestamp. |
-| functions | Array | No | Array of Object. The list of functions that need to be applied to the data. |
+| name                   | type    | required | description                                                                                                                                                                                                                                        |
+|------------------------|---------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| type                   | String  | Yes      | The type of the processor; currently only `com.geedgenetworks.core.processor.aggregate.AggregateProcessor` is supported                                                                                                                            |
+| output_fields          | Array   | No       | Array of String. The list of fields to keep. Fields not in the list will be removed.                                                                                                                                                               |
+| remove_fields          | Array   | No       | Array of String. The list of fields to remove.                                                                                                                                                                                                     |
+| group_by_fields        | Array   | Yes      | Array of String. The list of fields to group by.                                                                                                                                                                                                   |
+| window_type            | String  | Yes      | The type of window; currently only `tumbling_processing_time`, `tumbling_event_time`, `sliding_processing_time`, and `sliding_event_time` are supported. If window_type is `tumbling_event_time` or `sliding_event_time`, you need to set a watermark. |
+| window_size            | Long    | Yes      | The duration of the window in seconds.                                                                                                                                                                                                             |
+| window_slide           | Long    | Yes      | The duration of the window slide in seconds.                                                                                                                                                                                                       |
+| window_timestamp_field | String  | No       | The output timestamp field name, in seconds. It is mapped to the internal field `__window_start_timestamp`.                                                                                                                                        |
+| mini_batch             | Boolean | No       | Whether to enable local aggregation optimization. Defaults to false. This can significantly reduce state overhead and improve throughput.                                                                                                          |
+| functions              | Array   | No       | Array of Object. The list of functions to apply to the data.                                                                                                                                                                                       |
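+
+Before the full usage example below, a minimal sketch of an aggregation with `mini_batch` enabled (adapted from the example job in this change; the flag placement is an assumption based on the options above):
+
+```yaml
+aggregate_processor:
+  type: aggregate
+  group_by_fields: [ decoded_as ]
+  window_type: tumbling_processing_time
+  window_size: 5            # seconds
+  mini_batch: true          # assumed placement; enables local aggregation to reduce state overhead
+  functions:
+    - function: NUMBER_SUM
+      lookup_fields: [ sessions ]
+```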
## Usage Example
diff --git a/docs/processor/split-processor.md b/docs/processor/split-processor.md
new file mode 100644
index 0000000..e1a1163
--- /dev/null
+++ b/docs/processor/split-processor.md
@@ -0,0 +1,49 @@
+# Split Processor
+
+> Split the output of a data processing pipeline into multiple streams based on certain conditions.
+
+## Description
+
+Uses Flink side outputs to send data from a stream to multiple downstream consumers. This is useful when you want to separate or filter certain elements of a stream without disrupting the main processing flow. For example, side outputs can be used for error handling, conditional routing, or extracting specific subsets of the data.
+
+## Options
+
+| name              | type   | required | description                                                                                  |
+|-------------------|--------|----------|----------------------------------------------------------------------------------------------|
+| type              | String | Yes      | The type of the processor; currently only `com.geedgenetworks.core.split.SplitOperator` is supported |
+| rules             | Array  | Yes      | Array of Object. Rules that define how events are labeled with side output tags              |
+| [rule.]tag        | String | Yes      | The tag name of the side output                                                              |
+| [rule.]expression | String | Yes      | The expression used to evaluate each event                                                   |
+
+## Usage Example
+
+This example uses a split processor to split the data into two streams based on the value of the `decoded_as` field.
+
+```yaml
+splits:
+ decoded_as_split:
+ type: split
+ rules:
+ - tag: http_tag
+ expression: event.decoded_as == 'HTTP'
+ - tag: dns_tag
+ expression: event.decoded_as == 'DNS'
+
+topology:
+ - name: inline_source
+ downstream: [decoded_as_split]
+ - name: decoded_as_split
+ tags: [http_tag, dns_tag]
+ downstream: [ projection_processor, aggregate_processor]
+ - name: projection_processor
+ downstream: [ print_sink ]
+ - name: aggregate_processor
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: []
+```
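+
+In the topology above, the split node declares the tags produced by its rules (`tags: [http_tag, dns_tag]`) and fans out to the two downstream processors, so HTTP and DNS events continue through separate branches without disrupting the main flow.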
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
new file mode 100644
index 0000000..9bb2900
--- /dev/null
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
@@ -0,0 +1,97 @@
+sources:
+ inline_source:
+    type: inline
+    fields: # [array of object] Field List; if not set, all fields (Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
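+      # The inline data above yields 1 BASE, 1 HTTP, and 2 DNS events, so both split rules below are exercised.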
+      interval.per.row: 1s # optional
+      repeat.count: 1 # optional
+ format: json
+ json.ignore.parse.errors: false
+sinks:
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+splits:
+ test_split:
+ type: split
+ rules:
+ - name: table_processor
+ expression: event.decoded_as == 'HTTP'
+ - name: pre_etl_processor
+ expression: event.decoded_as == 'DNS'
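+      # Note: each rule's name matches a downstream node of test_split in the topology section below.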
+
+postprocessing_pipelines:
+ pre_etl_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [fields,tags]
+ output_fields:
+ functions: # [array of object] Function List
+
+ - function: FLATTEN
+ lookup_fields: [ fields,tags ]
+ output_fields: [ ]
+ parameters:
+ #prefix: ""
+ depth: 3
+ # delimiter: "."
+
+ - function: RENAME
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter:
+ parameters:
+ # parent_fields: [tags]
+ # rename_fields:
+ # tags: tags
+        rename_expression: key = string.replace_all(key,'tags.',''); key = string.replace_all(key,'fields.',''); return key;
+
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ interval: 300
+ #
+
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [decoded_as]
+ window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 5
+ window_timestamp_field: test_time
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+
+ table_processor:
+ type: table
+ functions:
+ - function: JSON_UNROLL
+ lookup_fields: [ encapsulation ]
+ output_fields: [ new_name ]
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+  topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph.
+    - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source when the processor type is SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [test_split,collect_sink]
+ - name: test_split
+ parallelism: 1
+ downstream: [ table_processor,pre_etl_processor ]
+ - name: pre_etl_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: table_processor
+ parallelism: 1
+ downstream: [ collect_sink ]
+ - name: collect_sink
+ parallelism: 1
diff --git a/groot-release/pom.xml b/groot-release/pom.xml
index 8a78efa..229b23f 100644
--- a/groot-release/pom.xml
+++ b/groot-release/pom.xml
@@ -121,13 +121,6 @@
<scope>provided</scope>
</dependency>
- <!--Hbase Jars -->
- <dependency>
- <groupId>com.geedgenetworks</groupId>
- <artifactId>hbase-client-shaded</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
<!--Format Json -->
<dependency>
<groupId>com.geedgenetworks</groupId>
diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml
index 4402809..f2b767f 100644
--- a/groot-release/src/main/assembly/assembly-bin-ci.xml
+++ b/groot-release/src/main/assembly/assembly-bin-ci.xml
@@ -134,7 +134,6 @@
<useTransitiveDependencies>true</useTransitiveDependencies>
<unpack>false</unpack>
<includes>
- <include>com.geedgenetworks:hbase-client-shaded:jar</include>
<include>com.geedgenetworks:format-json:jar</include>
<include>com.geedgenetworks:format-protobuf:jar</include>
<include>com.geedgenetworks:format-msgpack:jar</include>