Diffstat (limited to 'docs/processor')
 docs/processor/aggregate-processor.md |  23
 docs/processor/split-processor.md     |  49
 docs/processor/udaf.md                |  33
 docs/processor/udf.md                 | 132
 docs/processor/udtf.md                |  56
5 files changed, 239 insertions, 54 deletions
diff --git a/docs/processor/aggregate-processor.md b/docs/processor/aggregate-processor.md
index 5ab0ae0..afc26f6 100644
--- a/docs/processor/aggregate-processor.md
+++ b/docs/processor/aggregate-processor.md
@@ -10,17 +10,18 @@ Within the pipeline, events are processed by each Function in order, top‑>down
## Options
Note: By default, the internal fields `__window_start_timestamp` and `__window_end_timestamp` are included in the output when `output_fields` is not set.
-| name | type | required | default value |
-|--------------------------|--------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| type | String | Yes | The type of the processor, now only support `com.geedgenetworks.core.processor.aggregate.AggregateProcessor` |
-| output_fields | Array | No | Array of String. The list of fields that need to be kept. Fields not in the list will be removed. |
-| remove_fields | Array | No | Array of String. The list of fields that need to be removed. |
-| group_by_fields | Array | yes | Array of String. The list of fields that need to be grouped. |
-| window_type | String | yes | The type of window, now only support `tumbling_processing_time`, `tumbling_event_time`, `sliding_processing_time`, `sliding_event_time`. if window_type is `tumbling/sliding_event_time,` you need to set watermark. |
-| window_size | Long | yes | The duration of the window in seconds. |
-| window_slide | Long | yes | The duration of the window slide in seconds. |
-| window_timestamp_field | String | No | Set the output timestamp field name, with the unit in seconds. It is mapped to the internal field __window_start_timestamp. |
-| functions | Array | No | Array of Object. The list of functions that need to be applied to the data. |
+| name                   | type    | required | description |
+|------------------------|---------|----------|-------------|
+| type                   | String  | Yes      | The type of the processor. Currently only `com.geedgenetworks.core.processor.aggregate.AggregateProcessor` is supported. |
+| output_fields          | Array   | No       | Array of String. The list of fields to keep. Fields not in the list will be removed. |
+| remove_fields          | Array   | No       | Array of String. The list of fields to remove. |
+| group_by_fields        | Array   | Yes      | Array of String. The list of fields to group by. |
+| window_type            | String  | Yes      | The type of window. Currently only `tumbling_processing_time`, `tumbling_event_time`, `sliding_processing_time`, and `sliding_event_time` are supported. If window_type is `tumbling_event_time` or `sliding_event_time`, you need to set a watermark. |
+| window_size            | Long    | Yes      | The duration of the window in seconds. |
+| window_slide           | Long    | Yes      | The duration of the window slide in seconds. |
+| window_timestamp_field | String  | No       | The output timestamp field name, in seconds. It is mapped to the internal field `__window_start_timestamp`. |
+| mini_batch             | Boolean | No       | Whether to enable the local aggregation optimization. Default is `false`. Enabling it can significantly reduce state overhead and improve throughput. |
+| functions              | Array   | No       | Array of Object. The list of functions to apply to the data. |
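As a rough illustration of what the aggregate processor computes, a tumbling window groups events by `group_by_fields` together with a window start derived from the event timestamp, then applies the functions once per window. A minimal Python sketch, not the Flink implementation; `tumbling_aggregate` and the summing behavior are assumptions for illustration:

```python
from collections import defaultdict

def tumbling_aggregate(events, group_by_fields, window_size, value_field):
    """Group events into tumbling windows keyed by (group fields, window start)
    and sum one field per window. Each event is a dict with a 'timestamp'
    in seconds plus data fields."""
    windows = defaultdict(int)
    for e in events:
        key = tuple(e[f] for f in group_by_fields)
        window_start = e["timestamp"] - (e["timestamp"] % window_size)
        windows[(key, window_start)] += e[value_field]
    # Emit one record per (key, window), mirroring __window_start_timestamp.
    return [
        {"key": key, "__window_start_timestamp": start, "sum": total}
        for (key, start), total in sorted(windows.items())
    ]

events = [
    {"timestamp": 100, "host": "a", "bytes": 10},
    {"timestamp": 105, "host": "a", "bytes": 20},
    {"timestamp": 130, "host": "a", "bytes": 5},
]
print(tumbling_aggregate(events, ["host"], 60, "bytes"))
```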
## Usage Example
diff --git a/docs/processor/split-processor.md b/docs/processor/split-processor.md
new file mode 100644
index 0000000..e1a1163
--- /dev/null
+++ b/docs/processor/split-processor.md
@@ -0,0 +1,49 @@
+# Split Processor
+
+> Split the output of a data processing pipeline into multiple streams based on certain conditions.
+
+## Description
+
+Flink side outputs are used to send data from a stream to multiple downstream consumers. This is useful when you want to separate or filter certain elements of a stream without disrupting the main processing flow. For example, side outputs can be used for error handling, conditional routing, or extracting specific subsets of the data.
+
+## Options
+
+| name              | type   | required | description |
+|-------------------|--------|----------|-------------|
+| type              | String | Yes      | The type of the processor. Currently only `com.geedgenetworks.core.split.SplitOperator` is supported. |
+| rules             | Array  | Yes      | Array of Object. The rules that assign side output tags to events. |
+| [rule.]tag        | String | Yes      | The tag name of the side output. |
+| [rule.]expression | String | Yes      | The expression evaluated against each event. |
+
+## Usage Example
+
+This example uses a split processor to split the data into two streams based on the value of the `decoded_as` field.
+
+```yaml
+splits:
+ decoded_as_split:
+ type: split
+ rules:
+ - tag: http_tag
+ expression: event.decoded_as == 'HTTP'
+ - tag: dns_tag
+ expression: event.decoded_as == 'DNS'
+
+
+topology:
+ - name: inline_source
+ downstream: [decoded_as_split]
+ - name: decoded_as_split
+ tags: [http_tag, dns_tag]
+ downstream: [ projection_processor, aggregate_processor]
+ - name: projection_processor
+ downstream: [ print_sink ]
+ - name: aggregate_processor
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: []
+```
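The routing behavior of the rules can be sketched in plain Python; here the rule expressions are stand-in predicates over the event map rather than the processor's real expression engine:

```python
def split(events, rules):
    """Route each event to every tag whose rule expression matches,
    mimicking side-output style fan-out."""
    tagged = {rule["tag"]: [] for rule in rules}
    for event in events:
        for rule in rules:
            if rule["expression"](event):  # predicate stands in for the rule expression
                tagged[rule["tag"]].append(event)
    return tagged

rules = [
    {"tag": "http_tag", "expression": lambda e: e.get("decoded_as") == "HTTP"},
    {"tag": "dns_tag", "expression": lambda e: e.get("decoded_as") == "DNS"},
]
events = [{"decoded_as": "HTTP", "host": "x"}, {"decoded_as": "DNS", "qname": "y"}]
streams = split(events, rules)
print(streams)
```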
+
+
+
+
diff --git a/docs/processor/udaf.md b/docs/processor/udaf.md
index dd1dd70..66d6ad5 100644
--- a/docs/processor/udaf.md
+++ b/docs/processor/udaf.md
@@ -41,7 +41,7 @@ COLLECT_LIST is used to collect the value of the field in the group of events.
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: COLLECT_LIST
@@ -59,7 +59,7 @@ COLLECT_SET is used to collect the unique value of the field in the group of eve
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: COLLECT_SET
@@ -76,7 +76,7 @@ FIRST_VALUE is used to get the first value of the field in the group of events.
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: FIRST_VALUE
@@ -92,7 +92,7 @@ LAST_VALUE is used to get the last value of the field in the group of events.
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: LAST_VALUE
@@ -109,7 +109,7 @@ LONG_COUNT is used to count the number of events in the group of events.
- lookup_fields: optional.
- output_fields: required.
-### Example
+Example:
```yaml
- function: LONG_COUNT
@@ -127,7 +127,7 @@ MEAN is used to calculate the mean value of the field in the group of events. Th
- parameters: optional.
- precision: `<Integer>` required. The precision of the mean value. Default is 2.
-### Example
+Example:
```yaml
- function: MEAN
@@ -144,7 +144,7 @@ NUMBER_SUM is used to sum the value of the field in the group of events. The loo
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: NUMBER_SUM
@@ -164,7 +164,8 @@ hlld is a high-performance C server which is used to expose HyperLogLog sets and
- precision: `<Integer>` optional. The precision of the hlld value. Default is 12.
- output_format: `<String>` optional. The output format can be either `base64(encoded string)` or `binary(byte[])`. The default is `base64`.
-### Example
+Example:
+
Merge multiple string fields into a HyperLogLog data structure.
```yaml
- function: HLLD
@@ -194,8 +195,8 @@ Approx Count Distinct HLLD is used to count the approximate number of distinct v
- input_type: `<String>` optional. Refer to `HLLD` function.
- precision: `<Integer>` optional. Refer to `HLLD` function.
-### Example
-
+Example:
+
```yaml
- function: APPROX_COUNT_DISTINCT_HLLD
lookup_fields: [client_ip]
@@ -228,8 +229,8 @@ A High Dynamic Range (HDR) Histogram. More details can be found in [HDR Histogra
- autoResize: `<Boolean>` optional. If true, the highestTrackableValue will auto-resize. Default is true.
- output_format: `<String>` optional. The output format can be either `base64(encoded string)` or `binary(byte[])`. The default is `base64`.
-### Example
-
+Example:
+
```yaml
- function: HDR_HISTOGRAM
lookup_fields: [latency_ms]
@@ -264,8 +265,8 @@ Approx Quantile HDR is used to calculate the approximate quantile value of the f
- autoResize: `<Boolean>` optional. Refer to `HDR_HISTOGRAM` function.
- probability: `<Double>` optional. The probability of the quantile. Default is 0.5.
-### Example
-
+Example:
+
```yaml
- function: APPROX_QUANTILE_HDR
lookup_fields: [latency_ms]
@@ -301,8 +302,8 @@ Approx Quantiles HDR is used to calculate the approximate quantile values of the
- autoResize: `<Boolean>` optional. Refer to `HDR_HISTOGRAM` function.
- probabilities: `<Array<Double>>` required. The list of probabilities of the quantiles. Range is 0 to 1.
-### Example
-
+Example:
+
```yaml
- function: APPROX_QUANTILES_HDR
lookup_fields: [latency_ms]
diff --git a/docs/processor/udf.md b/docs/processor/udf.md
index 170d86f..e480275 100644
--- a/docs/processor/udf.md
+++ b/docs/processor/udf.md
@@ -96,18 +96,19 @@ Base64 encode function is commonly used to encode the binary data to base64 stri
```BASE64_ENCODE_TO_STRING(filter, lookup_fields, output_fields[, parameters])```
- filter: optional
-- lookup_fields: not required
+- lookup_fields: required
- output_fields: required
- parameters: required
- - value_field: `<String>` required.
+ - input_type: `<String>` required. Enum: `string`, `byte_array`. The input type of the value field.
Example:
```yaml
- function: BASE64_ENCODE_TO_STRING
+ lookup_fields: [packet]
output_fields: [packet]
parameters:
- value_field: packet
+ input_type: string
```
### Current Unix Timestamp
@@ -141,7 +142,7 @@ Domain function is used to extract the domain from the url.
- parameters: required
- option: `<String>` required. Enum: `TOP_LEVEL_DOMAIN`, `FIRST_SIGNIFICANT_SUBDOMAIN`.
-#### Option
+**Option**
- `TOP_LEVEL_DOMAIN` is used to extract the top level domain from the url. For example, `www.abc.com` will be extracted to `com`.
- `FIRST_SIGNIFICANT_SUBDOMAIN` is used to extract the first significant subdomain from the url. For example, `www.abc.com` will be extracted to `abc.com`.
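For intuition, the two options can be approximated with naive dot-splitting in Python; real extractors consult the public suffix list (so e.g. `co.uk` domains would differ), making this a sketch only:

```python
def extract_domain(host, option):
    """Naive domain extraction by dot-splitting; illustrative only."""
    labels = host.split(".")
    if option == "TOP_LEVEL_DOMAIN":
        return labels[-1]               # last label
    if option == "FIRST_SIGNIFICANT_SUBDOMAIN":
        return ".".join(labels[-2:])    # last two labels
    raise ValueError(option)

print(extract_domain("www.abc.com", "TOP_LEVEL_DOMAIN"))             # com
print(extract_domain("www.abc.com", "FIRST_SIGNIFICANT_SUBDOMAIN"))  # abc.com
```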
@@ -184,34 +185,55 @@ Eval function is used to adds or removes fields from events by evaluating an val
- parameters: required
- value_expression: `<String>` required. Enter a value expression to set the field’s value – this can be a constant.
-Example 1:
-Add a field `ingestion_time` with value `recv_time`:
+Example 1: add a field `eval_constant_string` with the string value `fixed_value`:
+```yaml
+- function: EVAL
+ output_fields: [eval_constant_string]
+ parameters:
+ value_expression: "'fixed_value'"
+```
+
+Example 2: add a field `eval_constant_integer` with the integer value `123`:
+```yaml
+- function: EVAL
+ output_fields: [eval_constant_integer]
+ parameters:
+ value_expression: "123"
+```
+Example 3: add a field `ingestion_time` with the value of the `recv_time` field:
```yaml
- function: EVAL
output_fields: [ingestion_time]
parameters:
- value_expression: recv_time
+ value_expression: recv_time # or "recv_time"
```
-Example 2:
+Example 4: add a field `internal_ip` using a conditional expression.
If the value of `direction` is `69`, the value of `internal_ip` will be `client_ip`, otherwise the value of `internal_ip` will be `server_ip`.
-
```yaml
- function: EVAL
output_fields: [internal_ip]
parameters:
- value_expression: 'direction=69 ? client_ip : server_ip'
+ value_expression: "direction=69 ? client_ip : server_ip"
+```
+Example 5: use bitwise operators to derive the value of the `direction` field from `flags`.
+```yaml
+- function: EVAL
+  output_fields: [direction]
+  parameters:
+    value_expression: "(flags & 24576) == 24576 ? 'double' : ((flags & 8192) == 8192 ? 'c2s' : ((flags & 16384) == 16384 ? 's2c' : 'unknown'))"
```
-
### Flatten
-Flatten the fields of nested structure to the top level. The new fields name are named using the field name prefixed with the names of the struct fields to reach it, separated by dots as default.
+Flatten the fields of a nested structure to the top level. The new fields are named using the field name prefixed with the names of the struct fields that lead to it, separated by dots by default. The original nested fields are removed.
```FLATTEN(filter, lookup_fields, output_fields[, parameters])```
+
- filter: optional
- lookup_fields: optional
-- output_fields: not required
+- output_fields: not required.
- parameters: optional
- prefix: `<String>` optional. Prefix string for flattened field names. Default is empty.
- depth: `<Integer>` optional. Number representing the nested levels to consider for flattening. Minimum 1. Default is `5`.
@@ -255,13 +277,14 @@ Output:
From unix timestamp function is used to convert the unix timestamp to date time string. The default time zone is UTC+0.
```FROM_UNIX_TIMESTAMP(filter, lookup_fields, output_fields[, parameters])```
+
- filter: optional
- lookup_fields: required
- output_fields: required
- parameters: optional
- precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`.
-#### Precision
+**Precision**
- `milliseconds` is used to convert the unix timestamp to milliseconds date time string. For example, `1619712000` will be converted to `2021-04-30 00:00:00.000`.
- `seconds` is used to convert the unix timestamp to seconds date time string. For example, `1619712000` will be converted to `2021-04-30 00:00:00`.
@@ -314,7 +337,7 @@ GeoIP lookup function is used to lookup the geoip information by ip address. You
- ISP: `<String>` optional.
- ORGANIZATION: `<String>` optional.
-#### Option
+**Option**
- `IP_TO_COUNTRY` is used to lookup the country or region information by ip address.
- `IP_TO_PROVINCE` is used to lookup the province or state information by ip address.
@@ -326,7 +349,7 @@ GeoIP lookup function is used to lookup the geoip information by ip address. You
- `IP_TO_JSON` is used to lookup the above information by ip address. The result is a json string.
- `IP_TO_OBJECT` is used to lookup the above information by ip address. The result is a `LocationResponse` object.
-#### GeoLocation Field Mapping
+**GeoLocation Field Mapping**
- `COUNTRY` is used to map the country information to the event field.
- `PROVINCE` is used to map the province information to the event field.
@@ -413,8 +436,8 @@ Rename function is used to rename or reformat(e.g. by replacing character unders
- parameters: required
- parent_fields: `<Array>` optional. Specify fields whose children will inherit the Rename fields and Rename expression operations.
- rename_fields: `Map<String, String>` required. The key is the original field name, and the value is the new field name.
- - current_field_name: `<String>` required. The original field name.
- - new_field_name: `<String>` required. The new field name.
+ - current_field_name: `<String>` required. The original field name.
+ - new_field_name: `<String>` required. The new field name.
- rename_expression: `<String>` optional. AviatorScript expression whose returned value will be used to rename fields.
```
@@ -427,9 +450,9 @@ Remove the prefix "tags_" from the field names and rename the field "timestamp_m
```yaml
- function: RENAME
-- parameters:
+ parameters:
rename_fields:
- - timestamp_ms: recv_time_ms
+ timestamp_ms: recv_time_ms
rename_expression: key=string.replace_all(key,'tags_',''); return key;
```
@@ -440,10 +463,10 @@ Rename the field `client_ip` to `source_ip`, including the fields under the `enc
```yaml
- function: RENAME
-- parameters:
+ parameters:
parent_fields: [encapsulation.ipv4]
rename_fields:
- - client_ip: source_ip
+ client_ip: source_ip
```
@@ -509,7 +532,7 @@ Unix timestamp converter function is used to convert the unix timestamp precisio
- parameters: required
- precision: `<String>` required. Enum: `milliseconds`, `seconds`, `minutes`. The minutes precision is used to generate Unix timestamp, round it to the minute level, and output it in seconds format.
- Example:
-_`__timestamp` Internal field, from source ingestion time or current unix timestamp.
+`__timestamp` is an internal field, populated from the source ingestion time or the current unix timestamp.
```yaml
- function: UNIX_TIMESTAMP_CONVERTER
@@ -518,4 +541,67 @@ _`__timestamp` Internal field, from source ingestion time or current unix timest
parameters:
precision: seconds
```
+### UUID
+Generate a version 4 (random) UUID in accordance with [RFC-9562](https://datatracker.ietf.org/doc/rfc9562/).
+
+```UUID(output_fields)```
+- filter: not required
+- lookup_fields: not required
+- output_fields: required
+- parameters: not required
+
+Example:
+
+```yaml
+- function: UUID
+ output_fields: [uuid]
+```
+Result: e.g. `3f0f8d7e-d89e-4b0a-9f2e-2eab5c99d062`.
+
+### UUIDv5
+
+Generate a version 5 (namespaced) UUID in accordance with RFC-9562 for the given name and namespace. If the namespace is not a valid UUID, this function will fail.
+Suitable for generating consistent identifiers across different systems. Set the namespace to one of `NAMESPACE_IP`, `NAMESPACE_DOMAIN`, `NAMESPACE_APP`, or `NAMESPACE_SUBSCRIBER` to use a predefined namespace:
+- NAMESPACE_IP: `6ba7b890-9dad-11d1-80b4-00c04fd430c8`
+- NAMESPACE_DOMAIN: `6ba7b891-9dad-11d1-80b4-00c04fd430c8`
+- NAMESPACE_APP: `6ba7b892-9dad-11d1-80b4-00c04fd430c8`
+- NAMESPACE_SUBSCRIBER: `6ba7b893-9dad-11d1-80b4-00c04fd430c8`
+
+```UUIDV5(lookup_fields, output_fields[, parameters])```
+- filter: not required
+- lookup_fields: required
+- output_fields: required
+- parameters: required
+ - namespace: `<String>` required. The UUID namespace.
+
+Example:
+
+```yaml
+- function: UUIDv5
+  lookup_fields: [ client_ip, server_ip ] # The name is the client_ip and server_ip values joined with the separator "_".
+ output_fields: [ip_uuid]
+ parameters:
+ namespace: NAMESPACE_IP
+```
+
+Result: e.g. `2ed6657d-e927-568b-95e1-2665a8aea6a2`.
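Because UUIDv5 is deterministic, the same result can be reproduced with any standard implementation. A Python sketch, assuming the name is the lookup field values joined with `_` as noted in the comment above:

```python
import uuid

# The predefined IP namespace from the table above.
NAMESPACE_IP = uuid.UUID("6ba7b890-9dad-11d1-80b4-00c04fd430c8")

def uuid_v5(namespace, lookup_values, separator="_"):
    """Deterministically derive a v5 UUID from the joined lookup values."""
    return uuid.uuid5(namespace, separator.join(lookup_values))

a = uuid_v5(NAMESPACE_IP, ["10.0.0.1", "10.0.0.2"])
b = uuid_v5(NAMESPACE_IP, ["10.0.0.1", "10.0.0.2"])
print(a, a.version)  # same inputs always yield the same UUID, version 5
```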
+
+### UUIDv7
+
+Generate a version 7 (Unix-timestamp + random based variant) UUID in accordance with RFC-9562. Suitable for scenarios that require time ordering, such as database indexing and logging.
+
+```UUIDV7(output_fields)```
+- filter: not required
+- lookup_fields: not required
+- output_fields: required
+- parameters: not required
+
+Example:
+
+```yaml
+- function: UUIDv7
+  output_fields: [log_uuid]
+```
+Result: e.g. `0190d4f3-7e4b-7cc0-9f12-3a5b6c7d8e9f`.
\ No newline at end of file
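For intuition about the time-ordering property, a v7-style UUID can be built by hand: the top 48 bits carry the unix millisecond timestamp, followed by random bits, with the version and variant fields set per RFC 9562. A simplified, non-normative Python sketch:

```python
import os
import time
import uuid

def uuid7_sketch(ts_ms=None):
    """Build a v7-style UUID: 48-bit unix ms timestamp, then random bits,
    with version and variant set per RFC 9562. Illustrative only."""
    ts_ms = int(time.time() * 1000) if ts_ms is None else ts_ms
    rand = int.from_bytes(os.urandom(10), "big")      # 80 random low bits
    value = (ts_ms & (2**48 - 1)) << 80 | rand        # timestamp in top 48 bits
    value &= ~(0xF << 76)
    value |= 0x7 << 76                                # version nibble = 7
    value &= ~(0x3 << 62)
    value |= 0x2 << 62                                # RFC variant bits = 10
    return uuid.UUID(int=value)

early = uuid7_sketch(1_700_000_000_000)
late = uuid7_sketch(1_700_000_000_001)
print(early.version, str(early) < str(late))  # later timestamps sort later
```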
diff --git a/docs/processor/udtf.md b/docs/processor/udtf.md
index a6e8444..65a7840 100644
--- a/docs/processor/udtf.md
+++ b/docs/processor/udtf.md
@@ -29,8 +29,8 @@ The Unroll Function handles an array field—or an expression evaluating to an a
- parameters: optional
- regex: `<String>` optional. If lookup_fields is a string, the regex parameter is used to split the string into an array. The default value is a comma.
-#### Example
-
+Example:
+
```yaml
functions:
- function: UNROLL
@@ -50,8 +50,8 @@ The JSON Unroll Function handles a JSON object, unrolls/explodes an array of obj
- path: `<String>` optional. Path to array to unroll, default is the root of the JSON object.
- new_path: `<String>` optional. Rename path to new_path, default is the same as path.
-#### Example
-
+Example:
+
```yaml
functions:
- function: JSON_UNROLL
@@ -62,5 +62,53 @@ functions:
- new_path: tag
```
+### Path Unroll
+
+The PATH_UNROLL function processes a given path, breaking it down into individual levels and transforming each cumulative level into a separate event while retaining top-level fields. At the final level, it outputs both the full path and the final element's name.
+
+```PATH_UNROLL(filter, lookup_fields, output_fields[, parameters])```
+
+- filter: optional
+- lookup_fields: required
+- output_fields: required
+- parameters: optional
+  - separator: `<String>` optional. The delimiter used to split the path. Default is `/`.
+
+Example Usage:
+
+```yaml
+- function: PATH_UNROLL
+ lookup_fields: [ decoded_path, app]
+ output_fields: [ protocol_stack_id, app_name ]
+ parameters:
+ separator: "."
+```
+Input:
+
+```json
+{"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"wechat"}
+```
+When the input is processed, the following events are generated:
+```
+ #Event1: {"protocol_stack_id":"ETHERNET"}
+ #Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
+ #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
+ #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"}
+ #Event5: {"app_name":"wechat","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl.wechat"}
+```
+
+If the `app` value equals the last element of `decoded_path`, as in the following input:
+```json
+{"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"ssl"}
+```
+In this case, the output will be:
+```
+ #Event1: {"protocol_stack_id":"ETHERNET"}
+ #Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
+ #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
+ #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl", "app_name":"ssl"}
+```
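The cumulative unrolling can be sketched directly in Python; event merging is simplified here, and `path_unroll` is an illustrative name, not the processor's API:

```python
def path_unroll(decoded_path, app, separator="."):
    """Emit one event per cumulative path level; append the app as a final
    level (with app_name) unless it already equals the last path element."""
    parts = decoded_path.split(separator)
    events = [
        {"protocol_stack_id": separator.join(parts[: i + 1])}
        for i in range(len(parts))
    ]
    if app == parts[-1]:
        events[-1]["app_name"] = app          # app already terminates the path
    else:
        events.append({
            "protocol_stack_id": decoded_path + separator + app,
            "app_name": app,
        })
    return events

for event in path_unroll("ETHERNET.IPv4.TCP.ssl", "wechat"):
    print(event)
```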
+
+