author    窦凤虎 <[email protected]>  2024-08-02 00:45:56 +0000
committer 窦凤虎 <[email protected]>  2024-08-02 00:45:56 +0000
commit    0d68d0be840ae86c79896a239d27d6fa417c77d4 (patch)
tree      f6a0676a34ed04c1804e518b9b47abb62bfdb74b
parent    df126a194232fe5b5f68ba4fc4fa0b7acd628050 (diff)
parent    034abcaa214ce4f2211ce5adb02e30aa781cd067 (diff)
Merge branch 'docs/aggregate-processor' into 'develop'
[docs][core] add aggregate processor documents. See merge request galaxy/platform/groot-stream!86
-rw-r--r--  docs/connector/connector.md | 16
-rw-r--r--  docs/filter/aviator.md | 2
-rw-r--r--  docs/processor/aggregate-processor.md | 71
-rw-r--r--  docs/processor/udaf.md | 149
-rw-r--r--  docs/processor/udf.md | 74
-rw-r--r--  groot-common/src/main/resources/udf.plugins | 4
-rw-r--r--  groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java | 2
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_with_aggregation.yaml | 41
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print_with_aggregation.yaml | 167
9 files changed, 478 insertions(+), 48 deletions(-)
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 08ec673..1123385 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -19,14 +19,14 @@ sources:
${prop_key}: ${prop_value}
```
-| Name | Type | Required | Default | Description |
-|--------------------------|---------------|----------|---------|--------------------------------------------------------------------------------------------------------------------------|
-| type | String | Yes | (none) | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. |
-| schema | Map | No | (none) | The source table schema, config through fields or local_file or url. |
-| watermark_timestamp | String | No | (none) | watermark timestamp field name, if need use eventTime. |
-| watermark_timestamp_unit | String | No | ms | watermark field timestamp unit, options:ms(milliseconds),s(seconds). is required if watermark_timestamp is not none. |
-| watermark_lag | Long | No | (none) | watermark out-of-order milliseconds. is required if watermark_timestamp is not none. |
-| properties | Map of String | Yes | (none) | The source connector customize properties, more details see the [Source](source) documentation. |
+| Name | Type | Required | Default | Description |
+|--------------------------|---------------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| type                     | String        | Yes      | (none)  | The type of the source connector. The `SourceTableFactory` uses this value as the identifier to create the source connector. |
+| schema                   | Map           | No       | (none)  | The source table schema, configured through `fields`, `local_file`, or `url`. |
+| watermark_timestamp      | String        | No       | (none)  | The field name used as the watermark field. It is used to track event time and generate watermarks. |
+| watermark_timestamp_unit | String        | No       | ms      | The unit of the watermark timestamp field. The optional values are `ms`, `s`. |
+| watermark_lag            | Long          | No       | (none)  | The watermark out-of-orderness in milliseconds (allowed lateness). It defines the maximum amount of time by which events can be late but still be considered for processing. |
+| properties               | Map of String | Yes      | (none)  | The source connector's custom properties; see the [Source](source) documentation for more details. |
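+
+A minimal sketch of a source that enables event time with these options. The `mock` source type, the `__timestamp` field, and the property values are illustrative, adapted from the examples elsewhere in this merge request:
+
+```yaml
+sources:
+  mock_source:
+    type: mock
+    watermark_timestamp: __timestamp    # field name used to track event time
+    watermark_timestamp_unit: s         # unit of the watermark field: ms or s
+    watermark_lag: 10000                # allowed out-of-orderness in milliseconds
+    properties:
+      rows.per.second: 10
+```
+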
## Schema Field Projection
diff --git a/docs/filter/aviator.md b/docs/filter/aviator.md
index 54fe24b..e7f6c2b 100644
--- a/docs/filter/aviator.md
+++ b/docs/filter/aviator.md
@@ -30,7 +30,7 @@ This example read data from inline source and print to console. It will filter t
filters: # [object] Define filter operator
filter_operator: # [object] AviatorFilter operator name
- type: com.geedgenetworks.core.filter.AviatorFilter
+ type: aviator
properties:
expression: event.server_ip == '8.8.8.8' || event.decoded_as == 'HTTP' # [string] Aviator expression, it return true or false.
diff --git a/docs/processor/aggregate-processor.md b/docs/processor/aggregate-processor.md
new file mode 100644
index 0000000..d9bcdb0
--- /dev/null
+++ b/docs/processor/aggregate-processor.md
@@ -0,0 +1,71 @@
+# Aggregate Processor
+
+> Processing pipelines for aggregate processor
+
+## Description
+
+The aggregate processor aggregates events on their way from source to sink. It is part of the processing pipeline and can be used in the pre-processing, processing, and post-processing pipelines. Each processor can assemble UDAFs (user-defined aggregate functions) into a pipeline.
+Within the pipeline, events are processed by each function in order, top to bottom. UDAF usage details can be found in [UDAF](udaf.md).
+
+## Options
+Note: by default, the internal fields `__window_start_timestamp` and `__window_end_timestamp` are included in the output unless `output_fields` is set (see the sketch after the table below).
+
+| name                   | type   | required | description |
+|------------------------|--------|----------|-------------|
+| type                   | String | Yes      | The type of the processor; currently only `com.geedgenetworks.core.processor.projection.AggregateProcessor` is supported. |
+| output_fields          | Array  | No       | Array of String. The list of fields to keep. Fields not in the list will be removed. |
+| remove_fields          | Array  | No       | Array of String. The list of fields to remove. |
+| group_by_fields        | Array  | Yes      | Array of String. The list of fields to group by. |
+| window_type            | String | Yes      | The type of window; currently only `tumbling_processing_time`, `tumbling_event_time`, `sliding_processing_time`, and `sliding_event_time` are supported. If window_type is `tumbling_event_time` or `sliding_event_time`, you need to configure a watermark on the source. |
+| window_size            | Long   | Yes      | The duration of the window in seconds. |
+| window_slide           | Long   | Yes      | The duration of the window slide in seconds. |
+| window_timestamp_field | String | No       | The output timestamp field name, with the unit in seconds. It is mapped to the internal field `__window_start_timestamp`. |
+| functions              | Array  | No       | Array of Object. The list of functions to apply to the data. |
+
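+A minimal sketch of how `output_fields` and `window_timestamp_field` from the table above combine; the field name `stat_time` is illustrative:
+
+```yaml
+processing_pipelines:
+  aggregate_processor:
+    type: aggregate
+    group_by_fields: [ client_ip ]
+    window_type: tumbling_processing_time
+    window_size: 10
+    window_timestamp_field: stat_time    # maps __window_start_timestamp (seconds) to stat_time
+    output_fields: [ client_ip, stat_time, received_bytes_sum ]   # internal window fields are dropped
+    functions:
+      - function: NUMBER_SUM
+        lookup_fields: [ received_bytes ]
+        output_fields: [ received_bytes_sum ]
+```
+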
+## Usage Example
+
+This example uses the aggregate processor to aggregate the field `received_bytes` by `client_ip`, using the NUMBER_SUM function to sum all `received_bytes` within a 10-second window.
+
+```yaml
+sources:
+ inline_source:
+ type: inline
+ properties:
+ data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ format: json
+ json.ignore.parse.errors: false
+
+
+processing_pipelines:
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [ client_ip ]
+ window_type: tumbling_processing_time
+ window_size: 10
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ received_bytes ]
+ output_fields: [ received_bytes_sum ]
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+
+application:
+ env:
+ name: example-inline-to-print-with-aggregation
+ parallelism: 3
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [aggregate_processor]
+ - name: aggregate_processor
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: []
+```
+
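+Each window emits one event per group. With the data above, each `client_ip` occurs once, so the emitted events carry `received_bytes_sum` values of 333931 and 533931 respectively, plus the internal `__window_start_timestamp` and `__window_end_timestamp` fields, since `output_fields` is not set.
+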
diff --git a/docs/processor/udaf.md b/docs/processor/udaf.md
new file mode 100644
index 0000000..e22846f
--- /dev/null
+++ b/docs/processor/udaf.md
@@ -0,0 +1,149 @@
+# UDAF
+
+> The functions for aggregate processors.
+
+## Table of contents
+
+- [Collect List](#collect-list)
+- [Collect Set](#collect-set)
+- [First Value](#first-value)
+- [Last Value](#last-value)
+- [Long Count](#long-count)
+- [Mean](#mean)
+- [Number SUM](#number-sum)
+
+
+## Description
+
+A UDAF (User-Defined Aggregate Function) extends the functions of the aggregate processor. It is part of the processing pipeline and can be used in the pre-processing, processing, and post-processing pipelines. Each processor can assemble UDAFs into a pipeline. Within the pipeline, events are processed by each function in order, top to bottom.
+The difference between a UDF and a UDAF, as the sketch below illustrates:
+- A UDF processes each event individually, and the output is also an event. A UDAF processes a group of events, and the output is a single event.
+- A UDF is designed to perform a transformation or calculation on a single event. A UDAF is designed to perform an aggregation over a group of events, such as summing values, calculating an average, or finding a maximum. It processes multiple input events and produces a single aggregated result.
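+
+For instance, a projection pipeline applies UDFs per event, while an aggregate pipeline applies UDAFs per window. A sketch adapted from the session-record example shipped with this merge request:
+
+```yaml
+preprocessing_pipelines:
+  etl_processor:
+    type: projection                 # UDF: one event in, one event out
+    functions:
+      - function: CURRENT_UNIX_TIMESTAMP
+        output_fields: [ processing_time ]
+        parameters:
+          precision: seconds
+
+processing_pipelines:
+  aggregate_processor:
+    type: aggregate                  # UDAF: a window of events in, one event out
+    group_by_fields: [ client_ip ]
+    window_type: tumbling_processing_time
+    window_size: 60
+    functions:
+      - function: LONG_COUNT
+        output_fields: [ sessions ]
+```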
+
+## UDAF Definition
+A UDAF shares the basic properties of a UDF, such as `name`, `event`, and `context`; more detail can be found in [UDF](udf.md). In addition, the aggregate processor calls two methods to process the data:
+- `add()`: Add a new event to the aggregation.
+- `getResult()`: Get the result of the aggregation.
+
+## Functions
+
+### Collect List
+
+COLLECT_LIST collects the values of a field across the group of events.
+
+```COLLECT_LIST(filter, lookup_fields, output_fields)```
+
+- filter: optional
+- lookup_fields: required. Currently only one field is supported.
+- output_fields: optional. If not set, the output field name defaults to the lookup field name.
+
+#### Example
+
+```yaml
+- function: COLLECT_LIST
+ lookup_fields: [client_ip]
+ output_fields: [client_ip_list]
+```
+
+### Collect Set
+
+COLLECT_SET collects the unique values of a field across the group of events.
+
+```COLLECT_SET(filter, lookup_fields, output_fields)```
+
+- filter: optional
+- lookup_fields: required. Currently only one field is supported.
+- output_fields: optional. If not set, the output field name defaults to the lookup field name.
+
+#### Example
+
+```yaml
+- function: COLLECT_SET
+ lookup_fields: [client_ip]
+ output_fields: [client_ip_set]
+```
+
+### First Value
+
+FIRST_VALUE returns the first value of the field in the group of events.
+
+```FIRST_VALUE(filter, lookup_fields, output_fields)```
+- filter: optional
+- lookup_fields: required. Currently only one field is supported.
+- output_fields: optional. If not set, the output field name defaults to the lookup field name.
+
+#### Example
+
+```yaml
+- function: FIRST_VALUE
+ lookup_fields: [client_ip]
+ output_fields: [first_client_ip]
+```
+### Last Value
+
+LAST_VALUE returns the last value of the field in the group of events.
+
+```LAST_VALUE(filter, lookup_fields, output_fields)```
+- filter: optional
+- lookup_fields: required. Currently only one field is supported.
+- output_fields: optional. If not set, the output field name defaults to the lookup field name.
+
+#### Example
+
+```yaml
+- function: LAST_VALUE
+ lookup_fields: [client_ip]
+ output_fields: [last_client_ip]
+```
+
+### Long Count
+
+LONG_COUNT counts the number of events in the group.
+
+```LONG_COUNT(filter, lookup_fields, output_fields)```
+- filter: optional
+- lookup_fields: optional.
+- output_fields: required.
+
+#### Example
+
+```yaml
+- function: LONG_COUNT
+ output_fields: [sessions]
+```
+
+### Mean
+
+MEAN calculates the mean value of the field across the group of events. The lookup field value must be a number.
+
+```MEAN(filter, lookup_fields, output_fields[, parameters])```
+- filter: optional
+- lookup_fields: required. Currently only one field is supported.
+- output_fields: optional. If not set, the output field name defaults to the lookup field name.
+- parameters: optional.
+  - precision: `<Integer>` optional. The precision of the mean value. Default is `2`.
+
+#### Example
+
+```yaml
+- function: MEAN
+ lookup_fields: [received_bytes]
+ output_fields: [received_bytes_mean]
+```
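+
+The `precision` parameter can also be passed explicitly; a variant mirroring the session-record example shipped alongside these docs:
+
+```yaml
+- function: MEAN
+  lookup_fields: [received_bytes]
+  output_fields: [received_bytes_mean]
+  parameters:
+    precision: 2   # round the mean to 2 decimal places
+```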
+
+### Number SUM
+
+NUMBER_SUM sums the values of the field across the group of events. The lookup field value must be a number.
+
+```NUMBER_SUM(filter, lookup_fields, output_fields)```
+- filter: optional
+- lookup_fields: required. Currently only one field is supported.
+- output_fields: optional. If not set, the output field name defaults to the lookup field name.
+
+#### Example
+
+```yaml
+- function: NUMBER_SUM
+ lookup_fields: [received_bytes]
+ output_fields: [received_bytes_sum]
+```
\ No newline at end of file
diff --git a/docs/processor/udf.md b/docs/processor/udf.md
index 2a705fd..cf305ef 100644
--- a/docs/processor/udf.md
+++ b/docs/processor/udf.md
@@ -1,7 +1,7 @@
# UDF
> The functions for projection processors.
->
+
## Function of content
- [Asn Lookup](#asn-lookup)
@@ -40,7 +40,7 @@ A UDF includes the following parts: name, event(processing data), context, evalu
- open function: Initialize the resources used by the function.
- close function: Release the resources used by the function.
-### Functions
+## Functions
Function define common parameters: `filter`, `lookup_fields`, `output_fields`, `parameters`, and will return a Map<String, Object> value of the event.
``` FUNCTION_NAME(filter, lookup_fields, output_fields[, parameters])```
@@ -54,8 +54,8 @@ Asn lookup function is used to lookup the asn information by ip address. You nee
- lookup_fields: required
- output_fields: required
- parameters: required
-- kb_name: required. The name of the knowledge base.
-- option: required. Now only support `IP_TO_ASN`.
+ - kb_name: required. The name of the knowledge base.
+ - option: required. Now only support `IP_TO_ASN`.
Example:
@@ -77,8 +77,8 @@ Base64 decode function is used to decode the base64 encoded string.
- lookup_fields: not required
- output_fields: required
- parameters: required
-- value_field: `<String>` required.
-- charset_field:`<String>` optional. Default is `UTF-8`.
+ - value_field: `<String>` required.
+ - charset_field:`<String>` optional. Default is `UTF-8`.
Example:
@@ -99,7 +99,7 @@ Base64 encode function is commonly used to encode the binary data to base64 stri
- lookup_fields: not required
- output_fields: required
- parameters: required
-- value_field: `<String>` required.
+ - value_field: `<String>` required.
Example:
@@ -119,7 +119,7 @@ Current unix timestamp function is used to get the current unix timestamp.
- lookup_fields: not required
- output_fields: required
- parameters: optional
-- precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`.
+ - precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`.
Example:
@@ -139,7 +139,7 @@ Domain function is used to extract the domain from the url.
- lookup_fields: required. Support more than one fields. All fields will be processed from left to right, and the result will be overwritten if the field processed value is not null.
- output_fields: required
- parameters: required
-- option: `<String>` required. Enum: `TOP_LEVEL_DOMAIN`, `FIRST_SIGNIFICANT_SUBDOMAIN`.
+ - option: `<String>` required. Enum: `TOP_LEVEL_DOMAIN`, `FIRST_SIGNIFICANT_SUBDOMAIN`.
#### Option
@@ -182,7 +182,7 @@ Eval function is used to adds or removes fields from events by evaluating an val
- lookup_fields: not required
- output_fields: required
- parameters: required
-- value_expression: `<String>` required. Enter a value expression to set the field’s value – this can be a constant.
+ - value_expression: `<String>` required. Enter a value expression to set the field’s value – this can be a constant.
Example 1:
Add a field `ingestion_time` with value `recv_time`:
@@ -213,10 +213,10 @@ Flatten the fields of nested structure to the top level. The new fields name are
- lookup_fields: optional
- output_fields: not required
- parameters: optional
-- prefix: `<String>` optional. Prefix string for flattened field names. Default is empty.
-- depth: `<Integer>` optional. Number representing the nested levels to consider for flattening. Minimum 1. Default is `5`.
-- delimiter: `<String>` optional. The string used to join nested keys Default is `.`.
-- json_string_keys: `<Array>` optional. The keys of the json string fields. It indicates keys that contain JSON strings and should be parsed and flattened. Default is empty.
+ - prefix: `<String>` optional. Prefix string for flattened field names. Default is empty.
+ - depth: `<Integer>` optional. Number representing the nested levels to consider for flattening. Minimum 1. Default is `5`.
+ - delimiter: `<String>` optional. The string used to join nested keys. Default is `.`.
+ - json_string_keys: `<Array>` optional. The keys of the json string fields. It indicates keys that contain JSON strings and should be parsed and flattened. Default is empty.
Example 1:
@@ -259,7 +259,7 @@ From unix timestamp function is used to convert the unix timestamp to date time
- lookup_fields: required
- output_fields: required
- parameters: optional
-- precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`.
+ - precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`.
#### Precision
@@ -303,16 +303,16 @@ GeoIP lookup function is used to lookup the geoip information by ip address. You
- lookup_fields: required
- output_fields: optional
- parameters: required
-- kb_name: `<String>` required. The name of the knowledge base.
-- option: `<String>` required. Enum: `IP_TO_COUNTRY`, `IP_TO_PROVINCE`, `IP_TO_CITY`, `IP_TO_SUBDIVISION_ADDR`, `IP_TO_DETAIL`, `IP_TO_LATLNG`, `IP_TO_PROVIDER`, `IP_TO_JSON`, `IP_TO_OBJECT`.
-- geolocation_field_mapping : `<Map<String, String>>` optional. The option is required when the option is `IP_TO_OBJECT`. The mapping of the geolocation fields. The key is the field name of the knowledge base , and the value is the field name of the event.
-- COUNTRY: `<String>` optional.
-- PROVINCE: `<String>` optional.
-- CITY: `<String>` optional.
-- LONGITUDE: `<String>` optional.
-- LATITUDE: `<String>` optional.
-- ISP: `<String>` optional.
-- ORGANIZATION: `<String>` optional.
+ - kb_name: `<String>` required. The name of the knowledge base.
+ - option: `<String>` required. Enum: `IP_TO_COUNTRY`, `IP_TO_PROVINCE`, `IP_TO_CITY`, `IP_TO_SUBDIVISION_ADDR`, `IP_TO_DETAIL`, `IP_TO_LATLNG`, `IP_TO_PROVIDER`, `IP_TO_JSON`, `IP_TO_OBJECT`.
+ - geolocation_field_mapping: `<Map<String, String>>` optional. Required when option is `IP_TO_OBJECT`. The mapping of geolocation fields: the key is the field name in the knowledge base, and the value is the field name in the event.
+ - COUNTRY: `<String>` optional.
+ - PROVINCE: `<String>` optional.
+ - CITY: `<String>` optional.
+ - LONGITUDE: `<String>` optional.
+ - LATITUDE: `<String>` optional.
+ - ISP: `<String>` optional.
+ - ORGANIZATION: `<String>` optional.
#### Option
@@ -369,7 +369,7 @@ JSON extract function is used to extract the value from json string.
- lookup_fields: required
- output_fields: required
- parameters: required
-- value_expression: `<String>` required. The json path expression.
+ - value_expression: `<String>` required. The json path expression.
Example:
@@ -390,7 +390,7 @@ Path combine function is used to combine the file path. The path value can be co
- lookup_fields: required
- output_fields: required
- parameters: required
-- path: `<Array>` required.
+ - path: `<Array>` required.
Example:
@@ -411,11 +411,11 @@ Rename function is used to rename or reformat(e.g. by replacing character unders
- lookup_fields: not required
- output_fields: not required
- parameters: required
-- parent_fields: `<Array>` optional. Specify fields whose children will inherit the Rename fields and Rename expression operations.
-- rename_fields: `Map<String, String>` required. The key is the original field name, and the value is the new field name.
-- current_field_name: `<String>` required. The original field name.
-- new_field_name: `<String>` required. The new field name.
-- rename_expression: `<String>` optional. AviatorScript expression whose returned value will be used to rename fields.
+ - parent_fields: `<Array>` optional. Specify fields whose children will inherit the Rename fields and Rename expression operations.
+ - rename_fields: `Map<String, String>` required. The key is the original field name, and the value is the new field name.
+ - current_field_name: `<String>` required. The original field name.
+ - new_field_name: `<String>` required. The new field name.
+ - rename_expression: `<String>` optional. AviatorScript expression whose returned value will be used to rename fields.
```
A single Function can include both rename_fields (to rename specified field names) and rename_expression (to globally rename fields). However, the Rename fields strategy will execute first.
@@ -462,7 +462,7 @@ Snowflake ID function is used to generate the snowflake id. The snowflake id is
- lookup_fields: not required
- output_fields: required
- parameters: optional
-- data_center_id_num: `<Integer>` optional. Default is `0`, range is `0-31`.
+ - data_center_id_num: `<Integer>` optional. Default is `0`, range is `0-31`.
Example:
@@ -482,9 +482,9 @@ String joiner function joins multiple string fields using a delimiter, prefix, a
- lookup_fields: required. Support more than one fields.
- output_fields: required
- parameters: optional
-- delimiter: `<String>` optional. Default is `,`.
-- prefix: `<String>` optional. Default is empty string.
-- suffix: `<String>` optional. Default is empty string.
+ - delimiter: `<String>` optional. Default is `,`.
+ - prefix: `<String>` optional. Default is empty string.
+ - suffix: `<String>` optional. Default is empty string.
Example:
@@ -507,7 +507,7 @@ Unix timestamp converter function is used to convert the unix timestamp precisio
- lookup_fields: required
- output_fields: required
- parameters: required
-- precision: `<String>` required. Enum: `milliseconds`, `seconds`, `minutes`. The minutes precision is used to generate Unix timestamp, round it to the minute level, and output it in seconds format.
+ - precision: `<String>` required. Enum: `milliseconds`, `seconds`, `minutes`. The minutes precision is used to generate Unix timestamp, round it to the minute level, and output it in seconds format.
- Example:
_`__timestamp` Internal field, from source ingestion time or current unix timestamp.
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 0eb24cb..1b7fca4 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -18,4 +18,6 @@ com.geedgenetworks.core.udf.udaf.NumberSum
com.geedgenetworks.core.udf.udaf.CollectList
com.geedgenetworks.core.udf.udaf.CollectSet
com.geedgenetworks.core.udf.udaf.LongCount
-com.geedgenetworks.core.udf.udaf.Mean
\ No newline at end of file
+com.geedgenetworks.core.udf.udaf.Mean
+com.geedgenetworks.core.udf.udaf.LastValue
+com.geedgenetworks.core.udf.udaf.FirstValue
\ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 2e21e49..690f21c 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -13,7 +13,7 @@ import java.nio.file.Paths;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/session_record_mock_to_print.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/session_record_mock_to_print_with_aggregation.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_with_aggregation.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_with_aggregation.yaml
new file mode 100644
index 0000000..6f08be2
--- /dev/null
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_with_aggregation.yaml
@@ -0,0 +1,41 @@
+sources:
+ inline_source:
+ type: inline
+ properties:
+ data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ format: json
+ json.ignore.parse.errors: false
+
+
+
+processing_pipelines:
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [ client_ip ]
+ window_type: tumbling_processing_time
+ window_size: 10
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ received_bytes ]
+ output_fields: [ received_bytes_sum ]
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+
+application:
+ env:
+ name: example-inline-to-print-with-aggregation
+ parallelism: 3
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [aggregate_processor]
+ - name: aggregate_processor
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: []
\ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print_with_aggregation.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print_with_aggregation.yaml
new file mode 100644
index 0000000..a3629c1
--- /dev/null
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print_with_aggregation.yaml
@@ -0,0 +1,167 @@
+sources: # [object] Define connector source
+ mock_source:
+ type: mock
+ #watermark_timestamp: __timestamp
+ #watermark_timestamp_unit: s
+ #watermark_lag: 10000
+ properties:
+ mock.desc.file.path: ./config/template/mock_schema/session_record_mock_desc.json
+ rows.per.second: 10
+
+preprocessing_pipelines:
+ etl_processor:
+ type: projection
+ functions:
+ - function: SNOWFLAKE_ID
+ lookup_fields: ['']
+ output_fields: [log_id]
+ parameters:
+ data_center_id_num: 1
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ - function: SNOWFLAKE_ID
+ lookup_fields: [ '' ]
+ output_fields: [ session_id ]
+ parameters:
+ data_center_id_num: 2
+ - function: EVAL
+ output_fields: [ ingestion_time ]
+ parameters:
+ value_expression: recv_time
+
+ - function: DOMAIN
+ lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: client_country
+ PROVINCE: client_super_administrative_area
+ CITY: client_administrative_area
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ processing_time ]
+ parameters:
+ precision: seconds
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [recv_time]
+ output_fields: [recv_time]
+ parameters:
+ precision: seconds
+ interval: 60
+
+
+processing_pipelines:
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [recv_time, sled_ip]
+ window_type: tumbling_processing_time
+ window_size: 60
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [received_bytes]
+ output_fields: [received_bytes_sum]
+
+ - function: LONG_COUNT
+ lookup_fields: [received_bytes]
+ output_fields: [sessions]
+
+ - function: MEAN
+ lookup_fields: [received_bytes]
+ output_fields: [received_bytes_mean]
+ parameters:
+ precision: 2
+
+ - function: FIRST_VALUE
+ lookup_fields: [ received_bytes ]
+ output_fields: [ received_bytes_first ]
+
+ - function: LAST_VALUE
+ lookup_fields: [ received_bytes ]
+ output_fields: [ received_bytes_last ]
+
+ - function: COLLECT_LIST
+ lookup_fields: [received_bytes]
+ output_fields: [received_bytes_list]
+
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ mode: log_info
+ format: json
+
+ kafka_sink:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD
+ kafka.bootstrap.servers: 192.168.44.12:9094
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+ format: json
+ json.ignore.parse.errors: false
+ log.failures.only: true
+
+
+application: # [object] Define job configuration
+ env:
+ name: session_record_mock_to_print_with_aggregation
+ parallelism: 3
+ shade.identifier: aes
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: mock_source
+ downstream: [ etl_processor ]
+ - name: etl_processor
+ downstream: [ aggregate_processor ]
+ - name: aggregate_processor
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: []
\ No newline at end of file