| author | doufenghu <[email protected]> | 2024-08-01 19:27:35 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-08-01 19:27:35 +0800 |
| commit | 034abcaa214ce4f2211ce5adb02e30aa781cd067 (patch) | |
| tree | 0a6a29567072627c72d74f4b1fd2e478af489f9d | |
| parent | 708137a41c1806b4bc6925fc71b0a2892862d9ea (diff) | |
[docs][core] add aggregate processor documents.
9 files changed, 478 insertions, 48 deletions
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 08ec673..1123385 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -19,14 +19,14 @@ sources:
       ${prop_key}: ${prop_value}
 ```
-| Name | Type | Required | Default | Description |
-|------|------|----------|---------|-------------|
-| type | String | Yes | (none) | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. |
-| schema | Map | No | (none) | The source table schema, config through fields or local_file or url. |
-| watermark_timestamp | String | No | (none) | watermark timestamp field name, if need use eventTime. |
-| watermark_timestamp_unit | String | No | ms | watermark field timestamp unit, options:ms(milliseconds),s(seconds). is required if watermark_timestamp is not none. |
-| watermark_lag | Long | No | (none) | watermark out-of-order milliseconds. is required if watermark_timestamp is not none. |
-| properties | Map of String | Yes | (none) | The source connector customize properties, more details see the [Source](source) documentation. |
+| Name | Type | Required | Default | Description |
+|------|------|----------|---------|-------------|
+| type | String | Yes | (none) | The type of the source connector. The `SourceTableFactory` uses this value as the identifier to create the source connector. |
+| schema | Map | No | (none) | The source table schema, configured through fields, local_file, or url. |
+| watermark_timestamp | String | No | (none) | The field name to use as the watermark field. It is used to track event time and generate watermarks. |
+| watermark_timestamp_unit | String | No | ms | The watermark field timestamp unit. Optional values are `ms` (milliseconds) and `s` (seconds). |
+| watermark_lag | Long | No | (none) | The watermark out-of-order tolerance (allowed lateness). It defines the maximum amount of time (in milliseconds) by which events can be late but still be considered for processing. |
+| properties | Map of String | Yes | (none) | The source connector's custom properties; for more details see the [Source](source) documentation. |
 
 ## Schema Field Projection
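The three watermark options above are set together on a source when a pipeline uses event time. A minimal sketch, based on the commented-out watermark lines in the session_record example at the bottom of this commit (the mock source and `__timestamp` field come from that example; the values are illustrative):

```yaml
sources:
  mock_source:
    type: mock
    watermark_timestamp: __timestamp    # event-time field on each record
    watermark_timestamp_unit: s         # __timestamp carries seconds
    watermark_lag: 10000                # tolerate up to 10,000 ms of out-of-order events
    properties:
      rows.per.second: 10
```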
diff --git a/docs/filter/aviator.md b/docs/filter/aviator.md
index 54fe24b..e7f6c2b 100644
--- a/docs/filter/aviator.md
+++ b/docs/filter/aviator.md
@@ -30,7 +30,7 @@ This example read data from inline source and print to console. It will filter t
 filters: # [object] Define filter operator
   filter_operator: # [object] AviatorFilter operator name
-    type: com.geedgenetworks.core.filter.AviatorFilter
+    type: aviator
     properties:
       expression: event.server_ip == '8.8.8.8' || event.decoded_as == 'HTTP' # [string] Aviator expression, it return true or false.
diff --git a/docs/processor/aggregate-processor.md b/docs/processor/aggregate-processor.md
new file mode 100644
index 0000000..d9bcdb0
--- /dev/null
+++ b/docs/processor/aggregate-processor.md
@@ -0,0 +1,71 @@
+# Aggregate Processor
+
+> Processing pipelines for the aggregate processor
+
+## Description
+
+The aggregate processor aggregates data on its way from source to sink. It is part of the processing pipeline and can be used in the pre-processing, processing, and post-processing pipelines. Each processor can assemble UDAFs (User-Defined Aggregate Functions) into a pipeline; within the pipeline, events are processed by each Function in order, from top to bottom. UDAF usage details can be found in [UDAF](udaf.md).
+
+## Options
+
+Note: if `output_fields` is not set, the internal fields `__window_start_timestamp` and `__window_end_timestamp` are included in the output by default.
+
+| name | type | required | description |
+|------|------|----------|-------------|
+| type | String | Yes | The type of the processor; currently only `com.geedgenetworks.core.processor.projection.AggregateProcessor` is supported. |
+| output_fields | Array | No | Array of String. The list of fields to keep; fields not in the list will be removed. |
+| remove_fields | Array | No | Array of String. The list of fields to remove. |
+| group_by_fields | Array | Yes | Array of String. The list of fields to group by. |
+| window_type | String | Yes | The type of window; currently supports `tumbling_processing_time`, `tumbling_event_time`, `sliding_processing_time`, and `sliding_event_time`. If window_type is `tumbling_event_time` or `sliding_event_time`, you need to configure the watermark. |
+| window_size | Long | Yes | The duration of the window in seconds. |
+| window_slide | Long | Yes | The duration of the window slide in seconds. |
+| window_timestamp_field | String | No | The output timestamp field name, with the unit in seconds. It is mapped to the internal field `__window_start_timestamp`. |
+| functions | Array | No | Array of Object. The list of functions to apply to the data. |
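The window_type row above notes that event-time windows need a watermark. A minimal event-time variant of the processor block, assuming the source already defines `watermark_timestamp` and `watermark_lag` (the `stat_time` field name is illustrative):

```yaml
processing_pipelines:
  aggregate_processor:
    type: aggregate
    group_by_fields: [ client_ip ]
    window_type: tumbling_event_time    # requires watermark settings on the source
    window_size: 60                     # one window per 60 seconds of event time
    window_timestamp_field: stat_time   # exposes __window_start_timestamp as stat_time (seconds)
    functions:
      - function: LONG_COUNT
        output_fields: [ sessions ]
```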
+
+## Usage Example
+
+This example uses the aggregate processor to group events from the inline source by `client_ip` and applies the NUMBER_SUM function to sum `received_bytes` over a 10-second window, printing the result to the console.
+
+```yaml
+sources:
+  inline_source:
+    type: inline
+    properties:
+      data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+      format: json
+      json.ignore.parse.errors: false
+
+
+processing_pipelines:
+  aggregate_processor:
+    type: aggregate
+    group_by_fields: [ client_ip ]
+    window_type: tumbling_processing_time
+    window_size: 10
+    functions:
+      - function: NUMBER_SUM
+        lookup_fields: [ received_bytes ]
+        output_fields: [ received_bytes_sum ]
+
+sinks:
+  print_sink:
+    type: print
+    properties:
+      format: json
+      mode: log_warn
+
+application:
+  env:
+    name: example-inline-to-print-with-aggregation
+    parallelism: 3
+  pipeline:
+    object-reuse: true
+  topology:
+    - name: inline_source
+      downstream: [aggregate_processor]
+    - name: aggregate_processor
+      downstream: [ print_sink ]
+    - name: print_sink
+      downstream: []
+```
diff --git a/docs/processor/udaf.md b/docs/processor/udaf.md
new file mode 100644
index 0000000..e22846f
--- /dev/null
+++ b/docs/processor/udaf.md
@@ -0,0 +1,149 @@
+# UDAF
+
+> The functions for aggregate processors.
+
+## Function of content
+
+- [Collect List](#collect-list)
+- [Collect Set](#collect-set)
+- [First Value](#first-value)
+- [Last Value](#last-value)
+- [Long Count](#long-count)
+- [Mean](#mean)
+- [Number SUM](#number-sum)
+
+
+## Description
+
+UDAF (User-Defined Aggregate Function) is used to extend the functions of the aggregate processor. It is part of the processing pipeline and can be used in the pre-processing, processing, and post-processing pipelines. Each processor can assemble UDAFs into a pipeline; within the pipeline, events are processed by each Function in order, from top to bottom.
+The difference between a UDF and a UDAF:
+- A UDF processes each event individually, and its output is also an event. A UDAF processes a group of events, and its output is a single event.
+- A UDF is designed to perform a transformation or calculation on a single event. A UDAF is designed to perform an aggregation over a group of events, such as summing values, calculating an average, or finding a maximum. It processes multiple input events and produces a single aggregated result.
+
+## UDAF Definition
+
+The UDAF basic properties are the same as for a UDF, such as `name`, `event`, and `context`; more detail can be found in [UDF](udf.md). In addition, the aggregate processor calls the following UDAF methods to process the data:
+- `void add()`: Add a new event to the aggregation.
+- `void getResult()`: Get the result of the aggregation.
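The two method names above are given without full signatures. Purely as an illustrative sketch of that life cycle (the class, parameter, and return types below are hypothetical, not the actual com.geedgenetworks interface), a sum-style UDAF accumulates in `add()` and emits once in `getResult()`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: it shows the add()/getResult() life cycle the
// docs describe, not the real com.geedgenetworks UDAF interface.
public class NumberSumSketch {
    private double sum; // accumulator state, one instance per group and window

    // Called once per event in the group: fold the lookup field into the state.
    public void add(Map<String, Object> event) {
        Object value = event.get("received_bytes"); // lookup field (assumed)
        if (value instanceof Number) {
            sum += ((Number) value).doubleValue();
        }
    }

    // Called when the window fires: emit one aggregated event.
    public Map<String, Object> getResult() {
        Map<String, Object> result = new HashMap<>();
        result.put("received_bytes_sum", sum); // output field (assumed)
        return result;
    }
}
```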
+
+## Functions
+
+### Collect List
+
+COLLECT_LIST is used to collect the values of a field across the group of events.
+
+```COLLECT_LIST(filter, lookup_fields, output_fields)```
+
+- filter: optional
+- lookup_fields: required. Currently only supports one field.
+- output_fields: optional. If not set, the output field name is `lookup_field_name`.
+
+### Example
+
+```yaml
+- function: COLLECT_LIST
+  lookup_fields: [client_ip]
+  output_fields: [client_ip_list]
+```
+
+### Collect Set
+
+COLLECT_SET is used to collect the unique values of a field across the group of events.
+
+```COLLECT_SET(filter, lookup_fields, output_fields)```
+
+- filter: optional
+- lookup_fields: required. Currently only supports one field.
+- output_fields: optional. If not set, the output field name is `lookup_field_name`.
+
+### Example
+
+```yaml
+- function: COLLECT_SET
+  lookup_fields: [client_ip]
+  output_fields: [client_ip_set]
+```
+
+### First Value
+
+FIRST_VALUE is used to get the first value of a field in the group of events.
+
+```FIRST_VALUE(filter, lookup_fields, output_fields)```
+- filter: optional
+- lookup_fields: required. Currently only supports one field.
+- output_fields: optional. If not set, the output field name is `lookup_field_name`.
+
+### Example
+
+```yaml
+- function: FIRST_VALUE
+  lookup_fields: [client_ip]
+  output_fields: [first_client_ip]
+```
+
+### Last Value
+
+LAST_VALUE is used to get the last value of a field in the group of events.
+
+```LAST_VALUE(filter, lookup_fields, output_fields)```
+- filter: optional
+- lookup_fields: required. Currently only supports one field.
+- output_fields: optional. If not set, the output field name is `lookup_field_name`.
+
+### Example
+
+```yaml
+- function: LAST_VALUE
+  lookup_fields: [client_ip]
+  output_fields: [last_client_ip]
+```
+
+### Long Count
+
+LONG_COUNT is used to count the number of events in the group.
+
+```LONG_COUNT(filter, lookup_fields, output_fields)```
+- filter: optional
+- lookup_fields: optional.
+- output_fields: required.
+
+### Example
+
+```yaml
+- function: LONG_COUNT
+  output_fields: [sessions]
+```
+
+### Mean
+
+MEAN is used to calculate the mean value of a field across the group of events. The lookup field value must be a number.
+
+```MEAN(filter, lookup_fields, output_fields[, parameters])```
+- filter: optional
+- lookup_fields: required. Currently only supports one field.
+- output_fields: optional. If not set, the output field name is `lookup_field_name`.
+- parameters: optional.
+  - precision: `<Integer>` optional. The precision (number of decimal places) of the mean value. Default is `2`.
+
+### Example
+
+```yaml
+- function: MEAN
+  lookup_fields: [received_bytes]
+  output_fields: [received_bytes_mean]
+```
+
+### Number SUM
+
+NUMBER_SUM is used to sum the values of a field across the group of events. The lookup field value must be a number.
+
+```NUMBER_SUM(filter, lookup_fields, output_fields)```
+- filter: optional
+- lookup_fields: required. Currently only supports one field.
+- output_fields: optional. If not set, the output field name is `lookup_field_name`.
+
+### Example
+
+```yaml
+- function: NUMBER_SUM
+  lookup_fields: [received_bytes]
+  output_fields: [received_bytes_sum]
+```
\ No newline at end of file
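Every function above accepts an optional `filter`, but none of the shipped examples exercise it. A hedged sketch, assuming the filter takes an Aviator-style boolean expression as the AviatorFilter operator does (the syntax is an assumption, not confirmed by this commit):

```yaml
- function: NUMBER_SUM
  filter: event.decoded_as == 'HTTP'    # assumed Aviator-style predicate; only matching events are aggregated
  lookup_fields: [received_bytes]
  output_fields: [http_received_bytes_sum]
```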
diff --git a/docs/processor/udf.md b/docs/processor/udf.md
index 2a705fd..cf305ef 100644
--- a/docs/processor/udf.md
+++ b/docs/processor/udf.md
@@ -1,7 +1,7 @@
 # UDF
 
 > The functions for projection processors.
->
+
 ## Function of content
 
 - [Asn Lookup](#asn-lookup)
@@ -40,7 +40,7 @@ A UDF includes the following parts: name, event(processing data), context, evalu
 - open function: Initialize the resources used by the function.
 - close function: Release the resources used by the function.
 
-### Functions
+## Functions
 
 Function define common parameters: `filter`, `lookup_fields`, `output_fields`, `parameters`, and will return a Map<String, Object> value of the event.
 
 ``` FUNCTION_NAME(filter, lookup_fields, output_fields[, parameters])```
@@ -54,8 +54,8 @@ Asn lookup function is used to lookup the asn information by ip address. You nee
 - lookup_fields: required
 - output_fields: required
 - parameters: required
-- kb_name: required. The name of the knowledge base.
-- option: required. Now only support `IP_TO_ASN`.
+  - kb_name: required. The name of the knowledge base.
+  - option: required. Now only support `IP_TO_ASN`.
 
 Example:
@@ -77,8 +77,8 @@ Base64 decode function is used to decode the base64 encoded string.
 - lookup_fields: not required
 - output_fields: required
 - parameters: required
-- value_field: `<String>` required.
-- charset_field:`<String>` optional. Default is `UTF-8`.
+  - value_field: `<String>` required.
+  - charset_field: `<String>` optional. Default is `UTF-8`.
 
 Example:
@@ -99,7 +99,7 @@ Base64 encode function is commonly used to encode the binary data to base64 stri
 - lookup_fields: not required
 - output_fields: required
 - parameters: required
-- value_field: `<String>` required.
+  - value_field: `<String>` required.
 
 Example:
@@ -119,7 +119,7 @@ Current unix timestamp function is used to get the current unix timestamp.
 - lookup_fields: not required
 - output_fields: required
 - parameters: optional
-- precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`.
+  - precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`.
 
 Example:
@@ -139,7 +139,7 @@ Domain function is used to extract the domain from the url.
 - lookup_fields: required. Support more than one fields. All fields will be processed from left to right, and the result will be overwritten if the field processed value is not null.
 - output_fields: required
 - parameters: required
-- option: `<String>` required. Enum: `TOP_LEVEL_DOMAIN`, `FIRST_SIGNIFICANT_SUBDOMAIN`.
+  - option: `<String>` required. Enum: `TOP_LEVEL_DOMAIN`, `FIRST_SIGNIFICANT_SUBDOMAIN`.
 
 #### Option
@@ -182,7 +182,7 @@ Eval function is used to adds or removes fields from events by evaluating an val
 - lookup_fields: not required
 - output_fields: required
 - parameters: required
-- value_expression: `<String>` required. Enter a value expression to set the field's value – this can be a constant.
+  - value_expression: `<String>` required. Enter a value expression to set the field's value – this can be a constant.
 
 Example 1: Add a field `ingestion_time` with value `recv_time`:
@@ -213,10 +213,10 @@ Flatten the fields of nested structure to the top level. The new fields name are
 - lookup_fields: optional
 - output_fields: not required
 - parameters: optional
-- prefix: `<String>` optional. Prefix string for flattened field names. Default is empty.
-- depth: `<Integer>` optional. Number representing the nested levels to consider for flattening. Minimum 1. Default is `5`.
-- delimiter: `<String>` optional. The string used to join nested keys Default is `.`.
-- json_string_keys: `<Array>` optional. The keys of the json string fields. It indicates keys that contain JSON strings and should be parsed and flattened. Default is empty.
+  - prefix: `<String>` optional. Prefix string for flattened field names. Default is empty.
+  - depth: `<Integer>` optional. Number representing the nested levels to consider for flattening. Minimum 1. Default is `5`.
+  - delimiter: `<String>` optional. The string used to join nested keys. Default is `.`.
+  - json_string_keys: `<Array>` optional. The keys of the json string fields. It indicates keys that contain JSON strings that should be parsed and flattened. Default is empty.
 
 Example 1:
@@ -259,7 +259,7 @@ From unix timestamp function is used to convert the unix timestamp to date time
 - lookup_fields: required
 - output_fields: required
 - parameters: optional
-- precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`.
+  - precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`.
 
 #### Precision
@@ -303,16 +303,16 @@ GeoIP lookup function is used to lookup the geoip information by ip address. You
 - lookup_fields: required
 - output_fields: optional
 - parameters: required
-- kb_name: `<String>` required. The name of the knowledge base.
-- option: `<String>` required. Enum: `IP_TO_COUNTRY`, `IP_TO_PROVINCE`, `IP_TO_CITY`, `IP_TO_SUBDIVISION_ADDR`, `IP_TO_DETAIL`, `IP_TO_LATLNG`, `IP_TO_PROVIDER`, `IP_TO_JSON`, `IP_TO_OBJECT`.
-- geolocation_field_mapping : `<Map<String, String>>` optional. The option is required when the option is `IP_TO_OBJECT`. The mapping of the geolocation fields. The key is the field name of the knowledge base , and the value is the field name of the event.
-- COUNTRY: `<String>` optional.
-- PROVINCE: `<String>` optional.
-- CITY: `<String>` optional.
-- LONGITUDE: `<String>` optional.
-- LATITUDE: `<String>` optional.
-- ISP: `<String>` optional.
-- ORGANIZATION: `<String>` optional.
+  - kb_name: `<String>` required. The name of the knowledge base.
+  - option: `<String>` required. Enum: `IP_TO_COUNTRY`, `IP_TO_PROVINCE`, `IP_TO_CITY`, `IP_TO_SUBDIVISION_ADDR`, `IP_TO_DETAIL`, `IP_TO_LATLNG`, `IP_TO_PROVIDER`, `IP_TO_JSON`, `IP_TO_OBJECT`.
+  - geolocation_field_mapping: `<Map<String, String>>` optional. Required when the option is `IP_TO_OBJECT`. The mapping of the geolocation fields: the key is the field name in the knowledge base, and the value is the field name in the event.
+    - COUNTRY: `<String>` optional.
+    - PROVINCE: `<String>` optional.
+    - CITY: `<String>` optional.
+    - LONGITUDE: `<String>` optional.
+    - LATITUDE: `<String>` optional.
+    - ISP: `<String>` optional.
+    - ORGANIZATION: `<String>` optional.
 
 #### Option
@@ -369,7 +369,7 @@ JSON extract function is used to extract the value from json string.
 - lookup_fields: required
 - output_fields: required
 - parameters: required
-- value_expression: `<String>` required. The json path expression.
+  - value_expression: `<String>` required. The json path expression.
 
 Example:
@@ -390,7 +390,7 @@ Path combine function is used to combine the file path. The path value can be co
 - lookup_fields: required
 - output_fields: required
 - parameters: required
-- path: `<Array>` required.
+  - path: `<Array>` required.
 
 Example:
@@ -411,11 +411,11 @@ Rename function is used to rename or reformat(e.g. by replacing character unders
 - lookup_fields: not required
 - output_fields: not required
 - parameters: required
-- parent_fields: `<Array>` optional. Specify fields whose children will inherit the Rename fields and Rename expression operations.
-- rename_fields: `Map<String, String>` required. The key is the original field name, and the value is the new field name.
-- current_field_name: `<String>` required. The original field name.
-- new_field_name: `<String>` required. The new field name.
-- rename_expression: `<String>` optional. AviatorScript expression whose returned value will be used to rename fields.
+  - parent_fields: `<Array>` optional. Specify fields whose children will inherit the Rename fields and Rename expression operations.
+  - rename_fields: `Map<String, String>` required. The key is the original field name, and the value is the new field name.
+  - current_field_name: `<String>` required. The original field name.
+  - new_field_name: `<String>` required. The new field name.
+  - rename_expression: `<String>` optional. AviatorScript expression whose returned value will be used to rename fields.
 
 ```
 A single Function can include both rename_fields (to rename specified field names) and rename_expression (to globally rename fields). However, the Rename fields strategy will execute first.
 ```
@@ -462,7 +462,7 @@ Snowflake ID function is used to generate the snowflake id. The snowflake id is
 - lookup_fields: not required
 - output_fields: required
 - parameters: optional
-- data_center_id_num: `<Integer>` optional. Default is `0`, range is `0-31`.
+  - data_center_id_num: `<Integer>` optional. Default is `0`, range is `0-31`.
 
 Example:
@@ -482,9 +482,9 @@ String joiner function joins multiple string fields using a delimiter, prefix, a
 - lookup_fields: required. Support more than one fields.
 - output_fields: required
 - parameters: optional
-- delimiter: `<String>` optional. Default is `,`.
-- prefix: `<String>` optional. Default is empty string.
-- suffix: `<String>` optional. Default is empty string.
+  - delimiter: `<String>` optional. Default is `,`.
+  - prefix: `<String>` optional. Default is empty string.
+  - suffix: `<String>` optional. Default is empty string.
 
 Example:
@@ -507,7 +507,7 @@ Unix timestamp converter function is used to convert the unix timestamp precisio
 - lookup_fields: required
 - output_fields: required
 - parameters: required
-- precision: `<String>` required. Enum: `milliseconds`, `seconds`, `minutes`. The minutes precision is used to generate Unix timestamp, round it to the minute level, and output it in seconds format.
+  - precision: `<String>` required. Enum: `milliseconds`, `seconds`, `minutes`. The minutes precision is used to generate a Unix timestamp, round it to the minute level, and output it in seconds format.
 
 - Example: _`__timestamp` Internal field, from source ingestion time or current unix timestamp.
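Of the three `precision` values, `minutes` is the least obvious: it rounds the timestamp down to the minute but still emits it in seconds. A minimal sketch (the output field name is illustrative):

```yaml
- function: UNIX_TIMESTAMP_CONVERTER
  lookup_fields: [ __timestamp ]
  output_fields: [ recv_time_minute ]
  parameters:
    precision: minutes    # round to the minute level; output remains in seconds format
```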
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 0eb24cb..1b7fca4 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -18,4 +18,6 @@ com.geedgenetworks.core.udf.udaf.NumberSum
 com.geedgenetworks.core.udf.udaf.CollectList
 com.geedgenetworks.core.udf.udaf.CollectSet
 com.geedgenetworks.core.udf.udaf.LongCount
-com.geedgenetworks.core.udf.udaf.Mean
\ No newline at end of file
+com.geedgenetworks.core.udf.udaf.Mean
+com.geedgenetworks.core.udf.udaf.LastValue
+com.geedgenetworks.core.udf.udaf.FirstValue
\ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 2e21e49..690f21c 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -13,7 +13,7 @@ import java.nio.file.Paths;
 public class GrootStreamExample {
 
     public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
-        String configPath = args.length > 0 ? args[0] : "/examples/session_record_mock_to_print.yaml";
+        String configPath = args.length > 0 ? args[0] : "/examples/session_record_mock_to_print_with_aggregation.yaml";
         String configFile = getTestConfigFile(configPath);
         ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
         executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_with_aggregation.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_with_aggregation.yaml
new file mode 100644
index 0000000..6f08be2
--- /dev/null
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_with_aggregation.yaml
@@ -0,0 +1,41 @@
+sources:
+  inline_source:
+    type: inline
+    properties:
+      data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+      format: json
+      json.ignore.parse.errors: false
+
+
+
+processing_pipelines:
+  aggregate_processor:
+    type: aggregate
+    group_by_fields: [ client_ip ]
+    window_type: tumbling_processing_time
+    window_size: 10
+    functions:
+      - function: NUMBER_SUM
+        lookup_fields: [ received_bytes ]
+        output_fields: [ received_bytes_sum ]
+
+sinks:
+  print_sink:
+    type: print
+    properties:
+      format: json
+      mode: log_warn
+
+application:
+  env:
+    name: example-inline-to-print-with-aggregation
+    parallelism: 3
+  pipeline:
+    object-reuse: true
+  topology:
+    - name: inline_source
+      downstream: [aggregate_processor]
+    - name: aggregate_processor
+      downstream: [ print_sink ]
+    - name: print_sink
+      downstream: []
\ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print_with_aggregation.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print_with_aggregation.yaml
new file mode 100644
index 0000000..a3629c1
--- /dev/null
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print_with_aggregation.yaml
@@ -0,0 +1,167 @@
+sources: # [object] Define connector source
+  mock_source:
+    type: mock
+    #watermark_timestamp: __timestamp
+    #watermark_timestamp_unit: s
+    #watermark_lag: 10000
+    properties:
+      mock.desc.file.path: ./config/template/mock_schema/session_record_mock_desc.json
+      rows.per.second: 10
+
+preprocessing_pipelines:
+  etl_processor:
+    type: projection
+    functions:
+      - function: SNOWFLAKE_ID
+        lookup_fields: ['']
+        output_fields: [log_id]
+        parameters:
+          data_center_id_num: 1
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ __timestamp ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+      - function: SNOWFLAKE_ID
+        lookup_fields: [ '' ]
+        output_fields: [ session_id ]
+        parameters:
+          data_center_id_num: 2
+      - function: EVAL
+        output_fields: [ ingestion_time ]
+        parameters:
+          value_expression: recv_time
+
+      - function: DOMAIN
+        lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+        output_fields: [ server_domain ]
+        parameters:
+          option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+
+      - function: ASN_LOOKUP
+        lookup_fields: [ client_ip ]
+        output_fields: [ client_asn ]
+        parameters:
+          kb_name: tsg_ip_asn
+          option: IP_TO_ASN
+
+      - function: ASN_LOOKUP
+        lookup_fields: [ server_ip ]
+        output_fields: [ server_asn ]
+        parameters:
+          kb_name: tsg_ip_asn
+          option: IP_TO_ASN
+
+      - function: GEOIP_LOOKUP
+        lookup_fields: [ client_ip ]
+        output_fields: []
+        parameters:
+          kb_name: tsg_ip_location
+          option: IP_TO_OBJECT
+          geolocation_field_mapping:
+            COUNTRY: client_country
+            PROVINCE: client_super_administrative_area
+            CITY: client_administrative_area
+
+      - function: GEOIP_LOOKUP
+        lookup_fields: [ server_ip ]
+        output_fields: []
+        parameters:
+          kb_name: tsg_ip_location
+          option: IP_TO_OBJECT
+          geolocation_field_mapping:
+            COUNTRY: server_country
+            PROVINCE: server_super_administrative_area
+            CITY: server_administrative_area
+
+      - function: CURRENT_UNIX_TIMESTAMP
+        output_fields: [ processing_time ]
+        parameters:
+          precision: seconds
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [recv_time]
+        output_fields: [recv_time]
+        parameters:
+          precision: seconds
+          interval: 60
+
+
+processing_pipelines:
+  aggregate_processor:
+    type: aggregate
+    group_by_fields: [recv_time, sled_ip]
+    window_type: tumbling_processing_time
+    window_size: 60
+    functions:
+      - function: NUMBER_SUM
+        lookup_fields: [received_bytes, sent_bytes]
+        output_fields: [received_bytes_sum]
+
+      - function: LONG_COUNT
+        lookup_fields: [received_bytes]
+        output_fields: [sessions]
+
+      - function: MEAN
+        lookup_fields: [received_bytes]
+        output_fields: [received_bytes_mean]
+        parameters:
+          precision: 2
+
+      - function: FIRST_VALUE
+        lookup_fields: [ received_bytes ]
+        output_fields: [ received_bytes_first ]
+
+      - function: LAST_VALUE
+        lookup_fields: [ received_bytes ]
+        output_fields: [ received_bytes_last ]
+
+      - function: COLLECT_LIST
+        lookup_fields: [received_bytes]
+        output_fields: [received_bytes_set]
+
+
+sinks:
+  print_sink:
+    type: print
+    properties:
+      mode: log_info
+      format: json
+
+  kafka_sink:
+    type: kafka
+    properties:
+      topic: SESSION-RECORD
+      kafka.bootstrap.servers: 192.168.44.12:9094
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+      format: json
+      json.ignore.parse.errors: false
+      log.failures.only: true
+
+
+application: # [object] Define job configuration
+  env:
+    name: session_record_mock_to_print_with_aggregation
+    parallelism: 3
+    shade.identifier: aes
+  pipeline:
+    object-reuse: true
+  topology:
+    - name: mock_source
+      downstream: [ etl_processor ]
+    - name: etl_processor
+      downstream: [ aggregate_processor ]
+    - name: aggregate_processor
+      downstream: [ print_sink ]
+    - name: print_sink
+      downstream: []
\ No newline at end of file