Diffstat (limited to 'docs')
| -rw-r--r-- | docs/connector/config-encryption-decryption.md | 8 |
| -rw-r--r-- | docs/connector/connector.md | 27 |
| -rw-r--r-- | docs/connector/formats/csv.md | 73 |
| -rw-r--r-- | docs/connector/formats/raw.md | 2 |
| -rw-r--r-- | docs/connector/sink/kafka.md | 27 |
| -rw-r--r-- | docs/connector/sink/starrocks.md | 83 |
| -rw-r--r-- | docs/connector/source/kafka.md | 7 |
| -rw-r--r-- | docs/develop-guide.md | 22 |
| -rw-r--r-- | docs/env-config.md | 3 |
| -rw-r--r-- | docs/grootstream-config.md | 45 |
| -rw-r--r-- | docs/grootstream-design-cn.md | 2194 |
| -rw-r--r-- | docs/images/groot_stream_architecture.jpg | bin | 5263679 -> 5472871 bytes |
| -rw-r--r-- | docs/processor/aggregate-processor.md | 23 |
| -rw-r--r-- | docs/processor/split-processor.md | 49 |
| -rw-r--r-- | docs/processor/udaf.md | 33 |
| -rw-r--r-- | docs/processor/udf.md | 132 |
| -rw-r--r-- | docs/processor/udtf.md | 56 |
17 files changed, 2702 insertions, 82 deletions
diff --git a/docs/connector/config-encryption-decryption.md b/docs/connector/config-encryption-decryption.md index c2b05f6..91ca80e 100644 --- a/docs/connector/config-encryption-decryption.md +++ b/docs/connector/config-encryption-decryption.md @@ -127,13 +127,13 @@ Next, I'll show how to quickly use groot-stream's own `aes` encryption: ## How to implement user-defined encryption and decryption -1. Create a new class and implement interface `ConfigShade`, this interface has the following methods: +1. Create a new class and implement interface `CryptoShade`, this interface has the following methods: ```java - public interface ConfigShade { + public interface CryptoShade { /** * The unique identifier of the current interface, used it to select the correct {@link - * ConfigShade} + * CryptoShade} */ String getIdentifier(); @@ -157,6 +157,6 @@ Next, I'll show how to quickly use groot-stream's own `aes` encryption: } } ``` -2. Add `com.geedgenetworks.common.config.ConfigShade` in `resources/META-INF/services` +2. Add `com.geedgenetworks.common.crypto.CryptoShade` in `resources/META-INF/services` 3. Change the option `shade.identifier` to the value that you defined in `ConfigShade#getIdentifier`of you config file. diff --git a/docs/connector/connector.md b/docs/connector/connector.md index 766b73e..93d64b0 100644 --- a/docs/connector/connector.md +++ b/docs/connector/connector.md @@ -1,3 +1,12 @@ +# Table of Contents +- [Source Connector](#source-connector) + - [Common Source Options](#common-source-options) + - [Schema Field Projection](#schema-field-projection) + - [Schema Config](#schema-config) + - [Mock Data Type](#mock-data-type) +- [Sink Connector](#sink-connector) + - [Common Sink Options](#common-sink-options) + # Source Connector Source Connector contains some common core features, and each source connector supports them to varying degrees. @@ -62,13 +71,12 @@ schema: To retrieve the schema from a local file using its absolute path. > Ensures that the file path is accessible to all nodes in your Flink cluster. -> -> ```yaml -> schema: -> # by array -> fields: -> local_file: "/path/to/schema.json" -> ``` + + ```yaml +schema: + # Note: Only support avro schema format + local_file: "/path/to/schema.json" +``` ### URL @@ -76,9 +84,8 @@ Some connectors support periodically fetching and updating the schema from a URL ```yaml schema: - # by array - fields: - url: "https://localhost:8080/schema.json" + # Note: Only support avro schema format + url: "https://localhost:8080/schema.json" ``` ## Mock Data Type diff --git a/docs/connector/formats/csv.md b/docs/connector/formats/csv.md new file mode 100644 index 0000000..ca8d10b --- /dev/null +++ b/docs/connector/formats/csv.md @@ -0,0 +1,73 @@ +# CSV + +> Format CSV +> +> ## Description +> +> The CSV format allows to read and write CSV data based on an CSV schema. Currently, the CSV schema is derived from table schema. +> **The CSV format must config schema for source/sink**. 
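A minimal sketch of tuning the csv.* options documented below (the tab delimiter, null literal, and inline source used here are illustrative assumptions):

```yaml
sources:
  inline_source:
    type: inline
    schema:
      fields: "log_id:int, recv_time:bigint, client_ip:string"
    properties:
      data: "1\t1712827485\t\\N"        # client_ip carries the null literal in this row
      format: csv
      csv.field.delimiter: "\t"         # must be a single character; '\t' selects the tab character
      csv.null.literal: "\\N"           # this literal string is read back as null
      csv.ignore.parse.errors: true     # skip malformed fields/rows instead of failing
```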
+ +| Name | Supported Versions | Maven | +|--------------|--------------------|---------------------------------------------------------------------------------------------------------------------------| +| Format CSV | Universal | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/format-csv/) | + +## Format Options + +| Name | Type | Required | Default | Description | +|-----------------------------|-----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| format | String | Yes | (none) | Specify what format to use, here should be 'csv'. | +| csv.field.delimiter | String | No | , | Field delimiter character (',' by default), must be single character. You can use backslash to specify special characters, e.g. '\t' represents the tab character. | +| csv.disable.quote.character | Boolean | No | false | Disabled quote character for enclosing field values (false by default). If true, option 'csv.quote.character' can not be set. | +| csv.quote.character | String | No | " | Quote character for enclosing field values (" by default). | +| csv.allow.comments | Boolean | No | false | Ignore comment lines that start with '#' (disabled by default). If enabled, make sure to also ignore parse errors to allow empty rows. | +| csv.ignore.parse.errors | Boolean | No | false | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. | +| csv.array.element.delimiter | String | No | ; | Array element delimiter string for separating array and row element values (';' by default). | +| csv.escape.character | String | No | (none) | Escape character for escaping values (disabled by default). | +| csv.null.literal | String | No | (none) | Null literal string that is interpreted as a null value (disabled by default). | + +# How to use + +## Inline uses example + +data: + +```json +{ + "log_id": 1, + "recv_time": 1712827485, + "client_ip": "192.168.0.1" +} +``` + +```yaml +sources: + inline_source: + type: inline + schema: + fields: "log_id:int, recv_time:bigint, client_ip:string" + properties: + data: "1,1712827485,192.168.0.1" + format: csv + +sinks: + print_sink: + type: print + schema: + fields: "log_id:int, recv_time:bigint, client_ip:string" + properties: + format: csv + +application: + env: + name: example-inline-to-print + parallelism: 3 + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [print_sink] + - name: print_sink + downstream: [] + +``` + diff --git a/docs/connector/formats/raw.md b/docs/connector/formats/raw.md index 853ac79..06ea8bc 100644 --- a/docs/connector/formats/raw.md +++ b/docs/connector/formats/raw.md @@ -4,7 +4,7 @@ > > ## Description > -> The Raw format allows to read and write raw (byte based) values as a single column. +> The Raw format allows to read and write raw (byte based) values as a single column, the column name is raw default, it can also be explicitly defined as other name. 
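A minimal sketch of the single-column mapping described above (the inline source, sample payload, and string type are illustrative assumptions):

```yaml
sources:
  inline_source:
    type: inline
    schema:
      # the whole payload lands in this one column; keep the default name "raw"
      # or declare another name explicitly
      fields: "raw:string"
    properties:
      data: "hello raw format"
      format: raw
```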
| Name | Supported Versions | Maven | |------------|--------------------|---------------------------------------------------------------------------------------------------------------------------| diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md index 716a179..78b7f34 100644 --- a/docs/connector/sink/kafka.md +++ b/docs/connector/sink/kafka.md @@ -20,19 +20,20 @@ In order to use the Kafka connector, the following dependencies are required. Th Kafka sink custom properties. if properties belongs to Kafka Producer Config, you can use `kafka.` prefix to set. -| Name | Type | Required | Default | Description | -|------------------------------------|---------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| topic | String | Yes | (none) | Topic name is required. It used to write data to kafka. | -| kafka.bootstrap.servers | String | Yes | (none) | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form `host1:port1,host2:port2,...`. | -| log.failures.only | Boolean | No | true | Defines whether the producer should fail on errors, or only log them. If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown and cause the streaming program to fail (and enter recovery). | -| format | String | No | json | Data format. The default value is `json`. The Optional values are `json`, `protobuf`. | -| [format].config | / | No | (none) | Data format properties. Please refer to [Format Options](../formats) for details. | -| rate.limiting.strategy | String | No | (none) | The rate limiting strategy to use. The Optional values are `none`, `sliding_window`. | -| rate.limiting.window.size | Integer | No | 5 | The window size of the rate limiting. For example, limit rate less than 10Mbps in 5 seconds time interval. | -| rate.limiting.limit.rate | String | No | 10Mbps | A maximum rate of traffic that can be transmitted over a network or between networks. The units of the bytes rate are Mbps, Kbps,and bps. For example, 10Mbps, 100Kbps, 1000bps. | -| rate.limiting.block.duration | String | No | 5min | If the rate limit is exceeded, the data will be blocked for the specified duration. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. | -| rate.limiting.block.reset.duration | String | No | 30s | The time interval for resetting the rate limit. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. | -| kafka.config | / | No | (none) | Kafka producer properties. Please refer to [Kafka Producer Config](https://kafka.apache.org/documentation/#producerconfigs) for details. | +| Name | Type | Required | Default | Description | +|-------------------------------------|---------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| topic | String | Yes | (none) | Topic name is required. It used to write data to kafka. | +| kafka.bootstrap.servers | String | Yes | (none) | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. 
This list should be in the form `host1:port1,host2:port2,...`. | +| log.failures.only | Boolean | No | true | Defines whether the producer should fail on errors, or only log them. If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown and cause the streaming program to fail (and enter recovery). | +| format | String | No | json | Data format. The default value is `json`. The Optional values are `json`, `protobuf`. | +| [format].config | Map | No | (none) | Data format properties. Please refer to [Format Options](../formats) for details. | +| headers.config | Map | No | (none) | Kafka record headers info. exp: 'headers.key: value' will put key and value into record headers. | +| rate.limiting.strategy | String | No | (none) | The rate limiting strategy to use. The Optional values are `none`, `sliding_window`. | +| rate.limiting.window.size | Integer | No | 5 | The window size of the rate limiting. For example, limit rate less than 10Mbps in 5 seconds time interval. | +| rate.limiting.limit.rate | String | No | 10Mbps | A maximum rate of traffic that can be transmitted over a network or between networks. The units of the bytes rate are Mbps, Kbps,and bps. For example, 10Mbps, 100Kbps, 1000bps. | +| rate.limiting.block.duration | String | No | 5min | If the rate limit is exceeded, the data will be blocked for the specified duration. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. | +| rate.limiting.block.reset.duration | String | No | 30s | The time interval for resetting the rate limit. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. | +| kafka.config | Map | No | (none) | Kafka producer properties. Please refer to [Kafka Producer Config](https://kafka.apache.org/documentation/#producerconfigs) for details. | ## Example diff --git a/docs/connector/sink/starrocks.md b/docs/connector/sink/starrocks.md new file mode 100644 index 0000000..f07e432 --- /dev/null +++ b/docs/connector/sink/starrocks.md @@ -0,0 +1,83 @@ +# Starrocks + +> Starrocks sink connector +> +> ## Description +> +> Sink connector for Starrocks, know more in https://docs.starrocks.io/zh/docs/loading/Flink-connector-starrocks/. + +## Sink Options + +Starrocks sink custom properties. If properties belongs to Starrocks Flink Connector Config, you can use `connection.` prefix to set. + +| Name | Type | Required | Default | Description | +|---------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| log.failures.only | Boolean | No | true | Optional flag to whether the sink should fail on errors, or only log them; If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default. | +| connection.jdbc-url | String | Yes | (none) | The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>.. | +| connection.load-url | String | Yes | (none) | The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). 
Format: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>.. | +| connection.config | Map | No | (none) | Starrocks Flink Connector Options, know more in https://docs.starrocks.io/docs/loading/Flink-connector-starrocks/#options. | + +## Example + +This example read data of inline test source and write to Starrocks table `test`. + +```yaml +sources: # [object] Define connector source + inline_source: + type: inline + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string + properties: + # + # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. + # + data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' + format: json + json.ignore.parse.errors: false + +sinks: + starrocks_sink: + type: starrocks + properties: + "log.failures.only": false + "connection.jdbc-url": "jdbc:mysql://192.168.40.222:9030" + "connection.load-url": "192.168.40.222:8030" + "connection.database-name": "test" + "connection.table-name": "test" + "connection.username": "root" + "connection.password": "" + "connection.sink.buffer-flush.interval-ms": "30000" + +application: # [object] Define job configuration + env: + name: groot-stream-job-inline-to-starrocks + parallelism: 3 + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [ starrocks_sink ] + - name: starrocks_sink + downstream: [ ] +``` + diff --git a/docs/connector/source/kafka.md 
b/docs/connector/source/kafka.md index 07dff22..680d1c1 100644 --- a/docs/connector/source/kafka.md +++ b/docs/connector/source/kafka.md @@ -24,6 +24,13 @@ Kafka source custom properties. if properties belongs to Kafka Consumer Config, | [format].config | Map | No | (none) | Data format properties. Please refer to [Format Options](../formats) for details. | | kafka.config | Map | No | (none) | Kafka consumer properties. Please refer to [Kafka Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) for details. | +## Internal Fields + +| Name | Type | Description | +|-------------|---------------------|-------------------------------------| +| __timestamp | Long | The timestamp of this kafka record. | +| __headers | Map[String, String] | The headers of this kafka record. | + ## Example This example read data of kafka topic `SESSION-RECORD` and print to console. diff --git a/docs/develop-guide.md b/docs/develop-guide.md index 2742cee..927d2d3 100644 --- a/docs/develop-guide.md +++ b/docs/develop-guide.md @@ -15,6 +15,28 @@ | groot-docs | Docs module of groot-stream, which is responsible for providing documents. | | groot-release | Release module of groot-stream, which is responsible for providing release scripts. | +## Event Model +Groot Stream based all stream processing on data records common known as events. A event is a collection of key-value pairs(fields). As follows: + +```json +{ + "__timestamp": "<Timestamp in UNIX epoch format (milliseconds)>", + "__headers": "Map<String, String> headers of the source that delivered the event", + "__window_start_timestamp" : "<Timestamp in UNIX epoch format (milliseconds)>", + "__window_end_timestamp" : "<Timestamp in UNIX epoch format (milliseconds)>", + "key1": "<value1>", + "key2": "<value2>", + "keyN": "<valueN>" +} +``` +Groot Stream add internal fields during pipeline processing. A few notes about internal fields: +- Internal fields start with a double underscore `__`. +- Each source can add one or many internal fields to the each event. For example, the Kafka source adds both a `__timestamp` and a `__input_id` field. +- Treat internal fields as read-only. Modifying them can result in unintended consequences to your data flows. +- Internal fields only exist for the duration of the event processing pipeline. They are not documented under sources or sinks. +- If you do not configure a timestamp for extraction, the Pipeline process assigns the current time (in UNIX epoch format) to the __timestamp field. +- If you have multiple sources, you can determine the origin of the event by examining the `__headers` field. For example, the Kafka source appends the topic name as the `__input_id` key in the `__headers`. + ## How to write a high quality Git commit message > [purpose] [module name] [sub-module name] Description (JIRA Issue ID) diff --git a/docs/env-config.md b/docs/env-config.md index 8e22a53..58f7e71 100644 --- a/docs/env-config.md +++ b/docs/env-config.md @@ -36,6 +36,9 @@ This parameter is used to define the runtime mode of the job, the default value Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting sensitive information in the configuration file, this option can be ignored. For more details, you can refer to the documentation [config-encryption-decryption](connector/config-encryption-decryption.md) +### kms.type +Specify Key Management System (KMS) type, default is `local`. You can integrate an external KMS, such as `vault`. 
For more details, you can refer to the documentation [KMS](grootstream-config.md#kms). + ### pipeline.object-reuse This parameter is used to enable/disable object reuse for the execution of the job. If it is not specified, the default value is `false`. diff --git a/docs/grootstream-config.md b/docs/grootstream-config.md index 9dd442f..b7fd037 100644 --- a/docs/grootstream-config.md +++ b/docs/grootstream-config.md @@ -77,6 +77,51 @@ grootstream: - asn_builtin.mmdb - asn_user_defined.mmdb ``` + +## KMS +Key Management System(KMS). It is a service that provides a secure way to create, manage, and control encryption keys used to encrypt data. KMS is used to protect sensitive information by ensuring that encryption keys are kept secure and accessible only to authorized users and applications. + +| Name | Type | Required | Default | Description | +|:-----| :----- | :------- | :-- ---- |:------------------------------------------------ | +| type | String | Yes | local | The type of the Key Management Service. Enum: local, vault. | +| url | String | No | (none) | The kms server's URL (e.g., `http://localhost:8200`). | +| token | String | No | (none) | The authentication token | +| default_key_path | String | No | (none) | HashiCorp Vault default key path. for example, `transit/` | +| plugin_key_path | String | No | (none) | HashiCorp Vault plugin key path. for example, `plugin/gmsm` | + +```yaml + kms: + local: + type: local + vault: + type: vault + url: <vault-url> + token: <vault-token> + default_key_path: <vault-key-path> + plugin_key_path: <vault-plugin-key-path> +``` + +## SSL + +The client SSL configuration. + +| Name | Type | Required | Default | Description | +|:-----| :----- | :------- | :-- ---- |:------------------------------------------------ | +| skip_verification | Boolean | Yes | true | Ignore SSL certificate verification | +| certificate_path | String | Yes | (none) | Path to the client's private key file | +| private_key_path | String | Yes | (none) | Path to the client's certificate file | +| ca_certificate_path | Boolean | Yes | false | Path to the root CA certificate for server verification | + +```yaml + ssl: + skip_verification: true + private_key_path: /path/to/certs/worker.key + certificate_path: /path/to/certs/worker.pem + ca_certificate_path: /path/to/certs/root.pem +``` + + + ## Properties Global user-defined variables can be set in the `properties` section using key-value pairs, where the key represents a configuration property and the value specifies the desired setting. The properties can be used in the configuration file by using `props.${property_name}`.
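A minimal sketch of referencing a global property via `props.${property_name}` (the property name and the option consuming it are illustrative assumptions):

```yaml
grootstream:
  properties:
    hos.path: http://127.0.0.1:9093

# in a job configuration, the value can then be referenced as, e.g.:
#   parameters:
#     endpoint: props.hos.path   # illustrative option name
```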
\ No newline at end of file diff --git a/docs/grootstream-design-cn.md b/docs/grootstream-design-cn.md new file mode 100644 index 0000000..41fcd0d --- /dev/null +++ b/docs/grootstream-design-cn.md @@ -0,0 +1,2194 @@ +# Groot Stream 设计方案 + +# 目录 +- [概述](#概述) +- [系统架构](#系统架构) +- [全局配置 grootstream.yaml](#全局配置-grootstreamyaml) +- [任务配置](#任务配置) + - [接入数据源(Sources)](#接入数据源sources) + - [Source 公共配置](#source-公共配置) + - [Schema配置](#schema配置) + - [Fields](#fields) + - [Local File](#local-file) + - [URL](#url) + - [Kafka Source](#kafka-source) + - [IPFIX Collector(UDP)](#ipfix-collectorudp) + - [File Source](#file-source) + - [Mock Source](#mock-source) + - [Inline Source](#inline-source) + - [过滤器(Filters)](#过滤器filters) + - [分流器(Splits)](#分流器splits) + - [任务处理器 (Processors)](#任务处理器-processors) + - [Projection Processor](#projection-processor) + - [Aggregate Processor](#aggregate-processor) + - [Table Processor](#table-processor) + - [输出Sinks](#输出sinks) + - [Kafka Sink](#kafka-sink) + - [ClickHouse Sink](#clickhouse-sink) + - [Print Sink](#print-sink) + - [Formats](#formats) + - [JSON](#json) + - [MessagePack](#messagepack) + - [Protobuf](#protobuf) + - [Raw](#raw) + - [任务编排](#任务编排) + - [函数定义](#函数定义) + - [内置UDF](#内置udf) + - [标量函数](#标量函数) + - [聚合函数](#聚合函数) + - [表格函数](#表格函数) + - [CN扩展UDF](#cn扩展udf) + - [实现原则](#实现原则) + - [相关问题](#相关问题) + +# 概述 +Groot Stream 是一个实时数据流处理平台,提供了灵活的数据定制管道,能够高效的从多种数据源收集数据,并对其进行加工和转换。具体包括过滤、解析、重组和数据聚合,以便更好的处理和管理数据。 + +主要优势: + +- 实时数据处理:利用Flink作为底层引擎,可以针对大规模实时数据流提供高吞吐、低延迟的实时处理能力。 +- 插件化管理:可自定义Functions, Packs, Sources 和Sinks,用于满足不同应用场景下的数据流定制需求。 +- 降低开发成本:通过YML模版定制数据处理拓扑,无需编写代码快速实现ETL需求。替代现有Real-time Log Streaming ,Data Transporter ETL 和Gohangout数据加载模块。 + +应用场景: + +- 数据汇聚场景 + - 构建QuickConnect拓扑,各个分中心数据被集中汇聚到国家中心。 +- 数据流定制 + - 会话日志经过预处理后发给不同的系统或第三方厂商。 + - 定义Filter 匹配符合条件的日志,然后预处理Pipeline对日志进行反序列化,增加处理时间,抽取域名等操作。 + - Router A 经过 TSG Projection处理器,执行ID-Mapping映射Subscriber ID,发送到TSG系统中。 + - Router B 经过CN Projection处理器,增加IoC标签库映射字段,删除不需要的字段,发送到CN系统中。 + - Router C 经过第三方厂商 Projection处理器,过滤SSL、HTTP 日志,抽取部分字段发送到第三方厂商中。 + - 将会话日志按应用层协议分流,分发到不同Topic中。 + - 过滤匹配SSL日志,分发到SSL Topic。 + - 过滤匹配邮件日志,分发到Email Topic。 +- 数据聚合 + +# 系统架构 + +- **Sources** + - 接收多种数据源或收集器的连续数据输入, 包括Kafka、IPFIX Collector 或UDP 等。 + - 配置参数包括基础配置和Source配置。例如Type 为Kafka,则需要增加Source参数kafka.bootstrap.servers, topics和kafka.consumer.group.id 等。 +- **Filters** + - 对数据源中的日志进行筛选和过滤,缩小处理日志的范围。 + - 通过定义过滤表达式,指定数据中某些属性、条件或规则,基于该表达式匹配符合条件的数据。例如:common_c2s_bytes_num <= 2147483647 && common_s2c_bytes_num<= 2147483647 ,过滤掉不符合Integer取值范围的数据。 + +- **QuickConnect** + - 基于最小化配置,快速构建Sources和Sinks之间的数据管道,可用于原型、测试或跨域数据汇聚。 + - 通过在管道中插入Processors 或Pack。 + +- **Pipelines** + - 在数据流的不同处理阶段可以引用不同类型的Pipelines,所有Pipelines(一系列Functions组成)架构和内部结构一致,只分为Projection和Aggregate两种类型。按Pipeline所在数据流的位置可分为: + - **Pre-processing Pipelines :可选,**前处理数据管道对输入日志进行格式化或执行一系列全局处理函数(例如:从原始日志中提取感兴趣的字段)。 + - **Processing Pipelines:**业务处理管道 + - **Post-processing Pipelines ,可选,**后处理数据管道,发送到目的地之前对日志进行格式化或执行一系列全局处理函数(例如:对输出的日志进行格式验证、类型转换) + - 数据流处理基本单元为处理器,按功能分为无状态和有状态处理器。每个处理器可以连接多个函数,组成一个Pipeline。 + - 投影处理器(Projection Processor):针对每条日志选择所需的列或属性。它属于无状态处理器,期间会严格按照处理器定义的函数(UDFs)顺序执行。例如:获取顶级域名,字符串转换、类型转换或反序列化等运算符函数组成一个Pipeline。 + - 聚合处理器(Aggregate Processor):多条日志进行分组聚合统计。它属于有状态处理器,期间可经过一系列自定义聚合函数(UDAFs)。例如:计算不同IP的总带宽,不同域名总会话数等聚合函数组成一个Pipeline。 + - 表格处理器(Table Processor):一条日志展开为多条输出。它属于无状态处理器,期间可经过一系列自定义聚合函数(UDTFs)。例如:将某个JSON格式的属性展开为多条,其他属性复制,将多条日志输出。 +- **Sinks** + - 发送数据到多个目的地, 具体包括Kafka、HBase 或 Mysql 等。 + - 每种Sink包括基础配置和Sink配置。例如Type 为Kafka,则需要Sink参数Kafka.bootstrap.servers, 
kafka.topic和kafka.producer.ack 等。 +- **Packs** + - 复杂业务逻辑处理器,一般应用于无法通过函数实现的场景。例如:动态知识库加载及动态schema的数据序列化。 + +# 全局配置 grootstream.yaml + +```yaml +grootstream: +# 知识库配置 + knowledge_base: + - name: tsg_ip_asn # 知识库名称 + fs_type: http # 文件系统类型(http,local,hdfs..) + fs_path: http://127.0.0.1:9999/v1/knowledge_base # 文件路径(单机模式hdfs://{ip}:{port}/{path},集群模式hdfs://{nameservice}/{path}) + files: + - f9f6bc91-2142-4673-8249-e097c00fe1ea # 知识库文件名 + # .... + + - name: tsg_ip_location + # .... + kms: + local: + type: local + vault: + type: vault + url: <vault-url> + token: <vault-token> + default_key_path: <default-vault-key-path> + plugin_key_path: <plugin-vault-key-path> + + ssl: ## SSL/TLS 客户端链接配置 + skip_verification: true # 忽略SSL证书校验 + private_key_path: /path/to/certs/worker.key # 客户端私钥文件路径 + certificate_path: /path/to/certs/worker.pem # 客户端证书文件路径 + ca_certificate_path: /path/to/certs/root.pem # CA 根证书路径 + + properties: # 用户自定义属性的支持从函数中获取,使用方式见函数定义 + hos.path: http://127.0.0.1:9093 + hos.bucket.name.traffic_file: traffic_file_bucket + hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket + scheduler.knowledge_base.update.interval.minutes: 1 #知识库文件定时更新时间 +``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +| -------------- | ---- | ------ | ------------------- | ---------------------------------------------- | +| knowledge_base | Y | - | Object | 知识库配置 | +| kms | N | - | Object | kms (key management system, 密钥管理系统) 配置 | +| ssl | N | - | Object | ssl配置 | +| properties | N | - | Map(String, Object) | 自定义属性配置:key-value 格式 | + + + +# 任务配置 + +## 接入数据源(Sources) + +### **Source 公共配置** + +```yaml +sources: + kafka_source: + type : kafka # source connector 类型 + # source表schema, 通过fields/local_file/url三种方式配置: 配置则转换校验, 只输出配置的列;没配置输出全部列, 不进行类型转换和校验 + schema: + fields: + - name: common_recv_time + type: bigint + - name: common_log_id + type: bigint + # local_file: "schema/test_schema.json" + # url: "http://127.0.0.1/schema.json" + # watermark_timestamp: recv_time + # watermark_timestamp_unit: ms + # watermark_lag: 60 + properties: # source connector 配置 + prop_key1: prop_value1 + prop_key2: prop_value2 + #... 
+``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|--------------------------|-------|-----------|-----------|------------------------------------------------------------------------------------------| +| **type** | Y | - | String | source唯一标识 | +| schema | N | - | Map | source表schema,配置则只输出配置的列,同时会进行类型转换和校验。 | +| watermark_timestamp | N | - | String | watermark timestamp字段名称。 | +| watermark_timestamp_unit | N | ms | String | watermark timestamp字段单位,可选值:ms(milliseconds),s(seconds)。如果配置watermark_timestamp,此字段是必须的。 | +| watermark_lag | N | - | Long | watermark out-of-order milliseconds。如果配置watermark_timestamp,此字段是必须的。 | +| properties | Y | - | Object | source属性配置 | + +### schema配置 + +支持通过fields/local_file/url三种方式配置,只能同时配置一种方式。 + +#### Fields + +支持配置属性列表和sql风格字符串(hive sql) + +example: + +```yaml + schema: + fields: + - name: common_recv_time + type: bigint + - name: common_log_id + type: bigint +``` + +支持的数据类型: + +| 类型 | 对应java类型 | 描述 | +|---------|-----------------------|----------------------------------------------------------------------------| +| string | String | 字符串 | +| int | Integer | int | +| bigint | Long | bigint | +| float | Float | float | +| double | Double | double | +| boolean | Boolean | boolean | +| binary | byte[] | 字节数组 | +| struct | Map<String, Object> | 结构体。例如:struct<id:int, client_ip:string, data:struct<id:int, name:string>>。 | +| array | List<Object> | 数组。例如:array<int>, array<struct<id:int, client_ip:string>>。 | + +#### Local File + +读取本地文件中的schema定义,只支持tsg avro schema格式 + +- example + +```yaml + schema: + local_file: "schema/test_schema.json" +``` + +- test_schema.json + +```yaml + { + "type": "record", + "name": "test", + "fields" : [ + {"name": "log_id", "type": "long"}, + {"name": "recv_time", "type": "long"}, + {"name": "client_ip", "type": "string","doc": {"visibility": "enabled"}} + ] +} +``` + +#### URL + +读取http url返回的schema定义,只支持tsg avro schema格式,支持动态更新schema,支持动态schema的connector有:clickhouse sink. + +example: + +```yaml + schema: + url: "http://127.0.0.1/schema.json" +``` + +### Kafka Source + +```yaml +sources: # [object] + kafka_source: # [object] Source Name + # source标识 + type : kafka + # 数据schema: 配置则转换校验, 只输出配置的列;没配置输出全部列, 不进行类型转换和校验 + fields: + - name: common_recv_time + type: bigint + - name: common_log_id + type: bigint + # source属性配置 + properties: + topic: SESSION-RECORD-COMPLETED + kafka.bootstrap.servers: 192.168.44.11:9092 + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.group.id: SESSION-RECORD-COMPLETED-GROUP-GROOT-STREAM-20231021 + kafka.auto.offset.reset: latest + format: json +``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|-----------------------------|----|------|--------|---------------------------------------------------| +| **topic** | Y | - | String | Kafka Topic名称。支持 Topic列表,用分号分隔,如'topic-1;topic-2' | +| **kafka.bootstrap.servers** | Y | - | String | Kafka Broker 地址 | +| **format** | Y | JSON | String | format,用来反序列化消息JSONProtobufCSV... 
| +| Kafka Properties | N | - | | kafka Consumer Properties,以"kafka."作为前缀 | +| Format properties | N | - | | format properties,以Format类型作为前缀。例如: “protobuf.” | + +### IPFIX Collector(UDP) + +```yaml +sources: # [object] + ipfix_source: # [object] Source Name + # source标识 + type : ipfix + # 数据schema: 配置则转换校验, 只输出配置的列;没配置输出全部列, 不进行类型转换和校验 + fields: + - name: recv_time + type: bigint + - name: log_id + type: bigint + # source属性配置 + properties: + port.range: 12345-12347 + max.packet.size: 65535 + max.receive.buffer.size: 104857600 + service.discovery.registry.mode: 0 + service.discovery.service.name: udp_ipfix + service.discovery.health.check.interval: 5 + service.discovery.nacos.server.addr: 192.168.44.12:8848 + service.discovery.nacos.username: nacos + service.discovery.nacos.password: nacos + service.discovery.nacos.namespace: test + service.discovery.nacos.group: IPFIX + service.discovery.consul.server.addr: 192.168.41.30 + service.discovery.consul.server.port: 8500 + service.discovery.consul.token: +``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|--------------------------------------------|------------|-------------|----------|-----------------------------------------------------------------------------| +| port.range | Y | - | String | IPFIX Collector的UDP端口,指定单个端口或端口范围。例如指定单个端口为4739,指定端口范围为12345-12347。 | +| max.packet.size | N | 65535 | Integer | 单条UDP数据包的最大大小,最大值为65535(Bytes)。 | +| max.receive.buffer.size | N | 104857600 | Integer | UDP接收缓存区大小(Bytes)。 | +| service.discovery.registry.mode | N | - | Integer | 服务发现的注册模式,0为nacos,1为consul,其他为不使用服务发现。 | +| service.discovery.service.name | N | - | String | 服务发现中的serviceName。 | +| service.discovery.health.check.interval | N | - | Integer | 服务发现健康检查的时间间隔,单位秒。 | +| service.discovery.nacos.server.addr | N | - | String | nacos服务的地址,格式为ip:port, service.discovery.registry.mode为0时必须指定。 | +| service.discovery.nacos.username | N | - | String | nacos的用户名,service.discovery.registry.mode为0时必须指定。 | +| service.discovery.nacos.password | N | - | String | nacos的密码,service.discovery.registry.mode为0时必须指定。 | +| service.discovery.nacos.namespace | N | - | String | nacos中的命名空间,service.discovery.registry.mode为0时可设置,不设置为public。 | +| service.discovery.nacos.group | N | - | String | nacos中的所属组,service.discovery.registry.mode为0时可设置,不设置为DEFAULT。 | +| service.discovery.consul.server.ip | N | - | String | consul服务的ip,service.discovery.registry.mode为1时必须指定。 | +| service.discovery.consul.server.port | N | - | Integer | consul服务的端口,service.discovery.registry.mode为1时必须指定。 | +| service.discovery.consul.token | N | - | String | consul的token,service.discovery.registry.mode为1且consul开启验证时必须指定。 | + +### File Source + +从text file读取数据,支持本地文件和hdfs文件,用于测试以及从文件回放数据,这个source每个1s发送2条数据 + +```yaml +sources: + file_source: + type: file + properties: + # path: 'hdfs://ns1/test/logs.json' + path: './logs.json' + rows.per.second: 2 + format: json +``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|---------------------------|-------|------|---------|---------------------------------------------------------------------------------------------------------------------------------------------| +| **path** | Y | - | String | 文件路径,以[hdfs://](hdfs://ns1/test/logs.json)开头为hdfs文件,其它为本地文件系统文件。例如:./logs/logs.json, [hdfs://ns1/test/logs.json](hdfs://ns1/test/logs.json) | +| **format** | Y | - | String | 使用的format | +| rows.per.second | N | 1000 | Integer | 每秒生成行数 | +| number.of.rows | N | -1 | Long | 总生成行数,默认此source是无界流(会循环从文件生成数据),当配置大于0时此source为有界流 | +| millis.per.row | N | 0 | Long | 每行生成花费毫秒数,当大于0时,rows.per.second配置不生效 | +| 
read.local.file.in.client | N | true | Boolean | 是否在客户端读取本地文件,客户端读取限制文件大小最大为128MB。当为false时,在taskmanager端读取文件,必须在每个taskmanager的path存放文件 | + +put file to hdfs: + +```shell +# maka dir +hadoop fs -mkdir hdfs://ns1/test + +# put local file to hdfs +hadoop fs -put logs.json hdfs://ns1/test + +# list hdfs dir +hadoop fs -ls logs.json hdfs://ns1/test +``` + +### **Mock Source** + +mock数据源,用于生成测试数据 + +```yaml +sources: + mock_source: + type : mock + properties: + mock.desc.file.path: './mock_example.json' + rows.per.second: 1 +``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|---------------------------|-----|-------|---------|----------------------------------------------------| +| **mock.desc.file.path** | Y | - | String | mock schema文件路径 | +| rows.per.second | N | 1000 | Integer | 每秒生成行数 | +| number.of.rows | N | -1 | Long | 总生成行数,默认此source是无界流(会循环从文件生成数据),当配置大于0时此source为有界流 | +| millis.per.row | N | 0 | Long | 每行生成花费毫秒数,当大于0时,rows.per.second配置不生效 | + +#### mock desc 文件配置 + +mock desc为json配置,配置每个字段的mock规则,格式: + +```json + [ + { + "name": "field_name1", + "type": "type1", + "arg": "arg" + }, + { + "name": "field_name2", + "type": "type2", + "arg": "arg" + } + +] +``` + + + +#### mock type + +| type | 参数 | 说明 | 返回数据类型 | 例子 | +|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------------------------------------:|-------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Number | min(number):range起始值(包含),默认:0。max(number):range结束值(不包含),默认:int32.max。options(array<number>):number列表,配置options则[start, end)失效,默认:null。random(boolean):随机模式,默认:true。 | 用于生成number | 根据start、end、options的值,推测返回类型为:int或bigint或double | 随机生成[0, 10000)范围的int数据:{"name":"int_random","type":"Number","min":0,"max":10000}递增方式生成[0, 10000)范围的int数据:{"name":"int_inc","type":"Number","min":0,"max":10000,"random":false}从int列表生成int数据:{"name":"int_options","type":"Number","options":[20,22,25,30]}随机生成[0, 10000)范围的double数据:{"name":"double_random","type":"Number","min":0.0,"max":10000.0} | +| Sequence | start(bigint):range起始值(包含),默认:0。step(bigint):步长,默认:1。 | 用于生成bigint序列, 类似等差数列 | bigint | 生成0,1,2...序列:{"name":"sub_id","type":"Sequence","start":0}生成0,2,4...序列:{"name":"sub_id","type":"Sequence","start":0,"step":2} | +| UniqueSequence | start(bigint):range起始值(包含),默认:0。 | 用于生成唯一bigint序列,0,1,2...和Sequence的区别: Sequence每个线程单独生成序列 UniqueSequence生成数字整个程序保证唯一 | bigint | 生成0,1,2...序列:{"name":"id","type":"UniqueSequence","start":0} | +| String | regex(string):根据正则随机生成符合正则的字符串,默认:[a-zA-Z]{0,5}。options(array`<string>`):string列表,配置options则regex失效,默认:null。random(boolean):随机模式,默认:true。 | 用于生成string | string | 随机生成长度我5-10的小写英文字符串:{"name":"str_regex","type":"String","regex":"[a-z]{5,10}"}从string列表生成string数据:{"name":"str_options","type":"String","options":["a","b","c"]} | +| Timestamp | unit(string):second或millis,生成秒或者毫秒时间戳,默认:second。 | 用于生成时间戳(当前时间) | bigint | 生成unix时间戳:{"name":"timestamp","type":"Timestamp"}生成毫秒时间戳:{"name":"timestamp_ms","type":"Timestamp","unit":"millis"} | +| FormatTimestamp | format(string):format,默认:yyyy-MM-dd 
HH:mm:ss。utc(boolean):使用utc时区,默认:false,当地时区。 | 用于生成时间字符串(当前时间) | string | 生成时间字符串:{"name":"timestamp_str","type":"FormatTimestamp","format":"yyyy-MM-dd HH:mm:ss"}生成时间字符串:{"name":"timestamp_str","type":"FormatTimestamp","format":"yyyy-MM-dd HH:mm:ss.SSS"} | +| IPv4 | start(string):起始值ip(包含),默认:0.0.0.0。end(string):结束ip(包含),默认:255.255.255.255。 | 用于生成start-end范围的ip地址 | string | 随机生成192.168.20.1-192.168.20.255范围的ip地址:{"name":"ip","type":"IPv4","start":"192.168.20.1","end":"192.168.20.255"} | +| Expression | expression(string):Datafaker expression,默认:null。 | 用于使用datafaker库的expression生成随机字符串文档:https://www.datafaker.net/documentation/expressions | string | 生成人名:{"name":"name","type":"Expression","expression":"#{[Name.name](http://Name.name)}"}生成邮箱地址:{"name":"emailAddress","type":"Expression","expression":"#{internet.emailAddress}"} | +| Hlld | itemCount(bigint):总基数(总唯一元素数量),默认:1000000。batchCount(int):每次生成的hll随机添加的元素数量,默认:10000。precision(int):hll的精度,范围[4, 18],默认:12。 | 用于生成Hlld Sketch,hll算法的一种实现 | string(字节数组的base64) | 生成ip hll 每次大约包含1000个ip:{ "name": "ip_cnt", "type": "Hlld", "itemCount": 100000, "batchCount": 1000 } | +| HdrHistogram | max(bigint):histogram最大值,默认:100000。batchCount(int):每次生成的histogram随机添加的元素数量,默认:1000。numberOfSignificantValueDigits(int):histogram的精度,范围[1, 5],默认:1。 | 用于生成HdrHistogram Sketch,一种分位数Histogram Sketch | string(字节数组的base64) | 生成延时的Histogram,每次包含1000个ms延时: { "name": "ms_his", "type": "HdrHistogram", "max": 100000, "batchCount": 1000} | +| Eval | expression(string):AviatorScript expression,默认:null。 | 计算列,通过其它列计算值AviatorScript文档:https://www.yuque.com/boyan-avfmj/aviatorscript | 返回类型依赖expression,可能为任何类型 | 根据已有的in_bytes(bigint), out_bytes(bigint)列计算bytes(bigint)列其值为其它两个的和:{"name": "bytes", "type": "Eval", "expression": "in_bytes + out_bytes"} | +| Object | fields(array):每个字段的生成规则,可以使用所有type,默认:null。 | 用于生成struct/object类型fields内容和mock desc文件根配置一样,描述每个字段的生成规则 | struct/object | 生成object:{"name":"object","type":"Object","fields":[{"name":"str","type":"String","regex":"[a-z]{5,10}","nullRate":0.1},{"name":"cate","type":"String","options":["a","b","c"]}]} | +| Union | unionFields(array):每组字段生成规则,默认:null。每个元素的字段:fields(array):和Object配置一样weight(int):此组字段权重,根据权重按照比例生成数据random(boolean):随机模式,默认:true。 | 用于生成有关联的字段 | 各个字段配置类型 | 生成object_id、item_id字段,当object_id = 10时,item_id从[1, 2, 3, 4, 5]生成数据,当object_id = 20时,item_id从[6, 7]生成数据,第一种数据占比5/7,第二种数据占比2/7 | + +- Union 举例 + +```json +{ + "name": "unionFields", + "type": "Union", + "random": false, + "unionFields": [ + { + "weight": 5, + "fields": [ + { + "name": "object_id", + "type": "Number", + "options": [10] + }, + { + "name": "item_id", + "type": "Number", + "options": [1, 2, 3, 4, 5], + "random": false + } + ] + }, + { + "weight": 2, + "fields": [ + { + "name": "object_id", + "type": "Number", + "options": [20] + }, + { + "name": "item_id", + "type": "Number", + "options": [6, 7], + "random": false + } + ] + } + ] +} +``` + +type通用参数: + +| 参数 | 说明 | 例子 | +|-------------------------|-----------------------------------------|-----------------------------------------------------------------------------------------------------------------| +| nullRate(double) | 生成数据null值比率,默认是1,没有null值。 | 随机生成字符串,null值占10%:{"name":"str_regex","type":"String","regex":"[a-z]{5,10}","nullRate":0.1} | +| array(double) | 是否是数组类型,默认false。 | 生成数组字符串:{"name":"array_str","type":"String","regex":"[a-z]{5,10}","array":true,"arrayLenMin":1,"arrayLenMax":3} | +| arrayLenMin(int) | 数组最小长度(包含),默认0。array属性为true时才生效。 | | +| arrayLenMax(int) | 数组最大长度(包含),默认5。array属性为true时才生效。 | 
| + +#### mock 示例 + +**各个类型生成查看** + +配置: + +```json +[ + { + "name": "id", + "type": "UniqueSequence", + "start": 0 + }, + { + "name": "sub_id", + "type": "Sequence", + "start": 0 + }, + { + "name": "int_random", + "type": "Number", + "min": 0, + "max": 10000 + }, + { + "name": "int_inc", + "type": "Number", + "min": 0, + "max": 10000, + "random": false + }, + { + "name": "int_options", + "type": "Number", + "options": [20, 22, 25, 30], + "random": true + }, + { + "name": "int_options_round_robin", + "type": "Number", + "options": [20, 22, 25, 30], + "random": false + }, + { + "name": "double_random", + "type": "Number", + "min": 0.0, + "max": 10000.0 + }, + { + "name": "str_regex", + "type": "String", + "regex": "[a-z]{5,10}", + "nullRate": 0.1 + }, + { + "name": "str_options", + "type": "String", + "options": ["a", "b", "c"] + }, + { + "name": "str_options_round_robin", + "type": "String", + "options": ["a", "b", "c"], + "random": false + }, + { + "name": "timestamp", + "type": "Timestamp" + }, + { + "name": "timestamp_ms", + "type": "Timestamp", + "unit": "millis" + }, + { + "name": "timestamp_str", + "type": "FormatTimestamp", + "format": "yyyy-MM-dd HH:mm:ss" + }, + { + "name": "ip", + "type": "IpV4", + "start": "192.168.20.1", + "end": "192.168.20.255" + }, + { + "name": "array_str", + "type": "String", + "options": ["a", "b", "c"], + "array": true, + "arrayLenMin": 1, + "arrayLenMax": 3 + }, + { + "name": "array_object", + "type": "Object", + "fields": [ + { + "name": "str", + "type": "String", + "regex": "[a-z]{5,10}", + "nullRate": 0.1 + }, + { + "name": "name", + "type": "Expression", + "expression": "#{Name.name}" + }, + { + "name": "emailAddress", + "type": "Expression", + "expression": "#{internet.emailAddress}" + } + ] + } +] +``` + +生成数据: + +``` +{"id":0,"sub_id":0,"int_random":7604,"int_inc":0,"int_options":30,"int_options_round_robin":20,"double_random":2329.3205359759163,"str_regex":"wxzrpn","str_options":"b","str_options_round_robin":"a","timestamp":1717493414,"timestamp_ms":1717493414603,"timestamp_str":"2024-06-04 17:30:14","ip":"192.168.20.24","array_str":["b"],"array_object":{"str":"wvrzqde","name":"Berry Gorczany","emailAddress":"[email protected]"}} +{"id":1,"sub_id":1,"int_random":5760,"int_inc":1,"int_options":30,"int_options_round_robin":22,"double_random":9644.141255418077,"str_regex":"oadbz","str_options":"a","str_options_round_robin":"b","timestamp":1717493415,"timestamp_ms":1717493415603,"timestamp_str":"2024-06-04 17:30:15","ip":"192.168.20.127","array_str":["c"],"array_object":{"str":"bkcwtpl","name":"Alba Gottlieb","emailAddress":"[email protected]"}} +{"id":2,"sub_id":2,"int_random":3775,"int_inc":2,"int_options":20,"int_options_round_robin":25,"double_random":9573.948656302768,"str_regex":"rlhtrk","str_options":"b","str_options_round_robin":"c","timestamp":1717493416,"timestamp_ms":1717493416603,"timestamp_str":"2024-06-04 17:30:16","ip":"192.168.20.20","array_str":["b"],"array_object":{"name":"Celestina O'Reilly","emailAddress":"[email protected]"}} +{"id":3,"sub_id":3,"int_random":7877,"int_inc":3,"int_options":22,"int_options_round_robin":30,"double_random":8921.757584727951,"str_regex":"spydx","str_options":"c","str_options_round_robin":"a","timestamp":1717493417,"timestamp_ms":1717493417603,"timestamp_str":"2024-06-04 17:30:17","ip":"192.168.20.218","array_str":["a","a"],"array_object":{"name":"Dr. 
Nichole McGlynn","emailAddress":"[email protected]"}} +{"id":4,"sub_id":4,"int_random":8248,"int_inc":4,"int_options":30,"int_options_round_robin":20,"double_random":4105.3600047674545,"str_regex":"rbjelg","str_options":"b","str_options_round_robin":"b","timestamp":1717493418,"timestamp_ms":1717493418602,"timestamp_str":"2024-06-04 17:30:18","ip":"192.168.20.146","array_str":["b"],"array_object":{"str":"ekbyer","name":"Raul Leannon","emailAddress":"[email protected]"}} +{"id":5,"sub_id":5,"int_random":3663,"int_inc":5,"int_options":22,"int_options_round_robin":22,"double_random":7486.737315942628,"str_regex":"qyqqiyj","str_options":"c","str_options_round_robin":"c","timestamp":1717493419,"timestamp_ms":1717493419610,"timestamp_str":"2024-06-04 17:30:19","ip":"192.168.20.90","array_str":["c","b"],"array_object":{"str":"dbepb","name":"Moshe Powlowski","emailAddress":"[email protected]"}} +{"id":6,"sub_id":6,"int_random":6967,"int_inc":6,"int_options":22,"int_options_round_robin":25,"double_random":6742.751027323034,"str_regex":"slfghf","str_options":"a","str_options_round_robin":"a","timestamp":1717493420,"timestamp_ms":1717493420602,"timestamp_str":"2024-06-04 17:30:20","ip":"192.168.20.72","array_str":["b","b"],"array_object":{"name":"Alvera Graham","emailAddress":"[email protected]"}} +{"id":7,"sub_id":7,"int_random":5340,"int_inc":7,"int_options":25,"int_options_round_robin":30,"double_random":7259.505902869291,"str_regex":"yarcof","str_options":"c","str_options_round_robin":"b","timestamp":1717493421,"timestamp_ms":1717493421614,"timestamp_str":"2024-06-04 17:30:21","ip":"192.168.20.44","array_str":["a"],"array_object":{"str":"dxianwxv","name":"Pedro Kerluke","emailAddress":"[email protected]"}} +{"id":8,"sub_id":8,"int_random":8365,"int_inc":8,"int_options":25,"int_options_round_robin":20,"double_random":7142.049302311821,"str_options":"c","str_options_round_robin":"c","timestamp":1717493422,"timestamp_ms":1717493422603,"timestamp_str":"2024-06-04 17:30:22","ip":"192.168.20.197","array_str":["b"],"array_object":{"str":"mximiyd","name":"Herman Runte","emailAddress":"[email protected]"}} +{"id":9,"sub_id":9,"int_random":5944,"int_inc":9,"int_options":30,"int_options_round_robin":22,"double_random":1420.8479774375382,"str_regex":"eahpq","str_options":"b","str_options_round_robin":"a","timestamp":1717493423,"timestamp_ms":1717493423602,"timestamp_str":"2024-06-04 17:30:23","ip":"192.168.20.44","array_str":["a","a","b"],"array_object":{"str":"kseeqicxuh","name":"Kaitlyn Douglas","emailAddress":"[email protected]"}} +{"id":10,"sub_id":10,"int_random":9357,"int_inc":10,"int_options":30,"int_options_round_robin":25,"double_random":2451.2488213660886,"str_regex":"agwxbf","str_options":"b","str_options_round_robin":"b","timestamp":1717493424,"timestamp_ms":1717493424607,"timestamp_str":"2024-06-04 17:30:24","ip":"192.168.20.19","array_str":["b","c"],"array_object":{"str":"iidogsi","name":"Luigi McClure PhD","emailAddress":"[email protected]"}} +``` + +**object类型以及Union类型生成** + +配置: + +```json +[ + { "name": "name", "type": "String", "options": ["object_statistics"] }, + { "name": "timestamp_ms", "type": "Timestamp", "unit": "millis" }, + { "name": "tags", "type": "Object", "fields": [ + { "name": "vsys_id", "type": "Number", "options": [1] }, + { "name": "template_id", "type": "Number", "options": [1] }, + { "name": "chart_id", "type": "Number", "options": [1] }, + { "name": "version", "type": "Number", "options": [1] }, + { "name": "unionFields", "type": "Union", "unionFields": [ + { "weight": 
2, "fields": [ + { "name": "object_type", "type": "String", "options": ["ip"] }, + { "name": "object_id", "type": "Number", "options": [7562] }, + { "name": "item_id", "type": "Number", "options": [7835, 7819] } + ] + }, + { "weight": 2, "fields": [ + { "name": "object_type", "type": "String", "options": ["fqdn"] }, + { "name": "object_id", "type": "Number", "options": [13087] }, + { "name": "item_id", "type": "Number", "options": [229604,229603] } + ] + } + ] + } + ] + }, + { "name": "fields", "type": "Object", "fields": [ + { "name": "in_bytes", "type": "Number", "min": 10000, "max": 200000}, + { "name": "out_bytes", "type": "Number", "min": 10000, "max": 200000}, + { "name": "new_in_sessions", "type": "Number", "min": 10, "max": 200}, + { "name": "new_out_sessions", "type": "Number", "min": 10, "max": 200} + ] + } +] +``` + +生成数据: + +``` +{"name":"object_statistics","timestamp_ms":1717573879804,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"ip","object_id":7562,"item_id":7819},"fields":{"in_bytes":47083,"out_bytes":68389,"new_in_sessions":142,"new_out_sessions":92}} +{"name":"object_statistics","timestamp_ms":1717573879807,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"fqdn","object_id":13087,"item_id":229603},"fields":{"in_bytes":81118,"out_bytes":107287,"new_in_sessions":98,"new_out_sessions":86}} +{"name":"object_statistics","timestamp_ms":1717573879808,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"ip","object_id":7562,"item_id":7835},"fields":{"in_bytes":61395,"out_bytes":111095,"new_in_sessions":87,"new_out_sessions":149}} +{"name":"object_statistics","timestamp_ms":1717573879808,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"fqdn","object_id":13087,"item_id":229603},"fields":{"in_bytes":145986,"out_bytes":12166,"new_in_sessions":169,"new_out_sessions":127}} +{"name":"object_statistics","timestamp_ms":1717573880806,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"fqdn","object_id":13087,"item_id":229604},"fields":{"in_bytes":112797,"out_bytes":120310,"new_in_sessions":12,"new_out_sessions":177}} +{"name":"object_statistics","timestamp_ms":1717573880806,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"fqdn","object_id":13087,"item_id":229604},"fields":{"in_bytes":180960,"out_bytes":118214,"new_in_sessions":106,"new_out_sessions":73}} +{"name":"object_statistics","timestamp_ms":1717573880806,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"ip","object_id":7562,"item_id":7819},"fields":{"in_bytes":91394,"out_bytes":105840,"new_in_sessions":74,"new_out_sessions":177}} +{"name":"object_statistics","timestamp_ms":1717573880806,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"ip","object_id":7562,"item_id":7835},"fields":{"in_bytes":79266,"out_bytes":95721,"new_in_sessions":50,"new_out_sessions":88}}``` +``` + +### Inline Source + +用于简单测试format,function,sink等,这个source每个1s发送一条配置的data数据 + +```yaml +sources: + inline_source: + type : inline + fields: + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: client_ip + type: string + properties: + # 单条数据 + data: '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}' + # 多条数据 + # data: '[{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}, {"log_id": 2, "recv_time":"222", "client_ip":"192.168.0.2"}]' + # data: '["1,111,192.168.0.1", "2,222,192.168.0.2"]' + format: json + 
json.ignore.parse.errors: false +``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|-------------------|----|--------|----------|---------------------------------------------------| +| **data** | Y | - | String | source发送的数据,如果是json array形式则当做单独解析发送array每个元素 | +| **format** | Y | - | String | 使用的format | +| **type** | N | string | String | 数据类型:string(UTF8字符串),hex(十六进制编码),base64(base64编码) | +| interval.per.row | N | 1s | Duration | 发送每行数据间隔时间 | +| repeat.count | N | -1 | Integer | 重复发送data测试,负数则一直循环重复发送 | +| format properties | N | - | String | format properties配置,key为format值.+原始key | + +## 过滤器(Filters) + +```yaml +filters: + http_filter: + type: aviator + properties: + expression: event.decoded_as == 'HTTP' && event.server_port = 80 +``` + +| 属性名 | 默认值 | 类型 | 必填 | 描述 | +|----------------|-----|--------|----|------------------------------------| +| **name** | - | String | Y | 过滤器名称,唯一标识,用于任务编排。例如:“http_filter“ | +| **type** | - | String | Y | 数据源类型。例如:aviator | +| **properties** | | | | | +| expression | - | String | N | 基于AviatorScript语法,过滤符合条件的事件; | + +## 分流器(Splits) + +```yaml +splits: + decoded_as_split: + type: split + rules: + - tag: http_tag + expression: event.decoded_as == 'HTTP' + - tag: dns_tag + expression: event.decoded_as == 'DNS' +``` + +| 属性名 | 默认值 | 类型 | 必填 | 描述 | +|------------|-----|--------|----|-----------------------------------------| +| **name** | - | String | Y | 过滤器名称,唯一标识,用于任务编排。例如:“decode_as_filter“ | +| **type** | - | String | Y | 数据源类型。例如:split | +| **rules** | | | | | +| tag | - | String | Y | 分流标记,同时需要在topology中配置,具体参见任务编排 | +| expression | - | String | Y | 基于AviatorScript语法,将符合条件的数据分流至下游算子; | + + +## 任务处理器 (Processors) + +### Pre-processing Pipeline + +```yaml +pre_processing_pipelines: + common_pre_processor: + type: projection + output_fields: [] + functions: + - function: CURRENT_UNIX_TIMESTAMP + lookup_fields: [] + output_fields: [processing_time] + parameters: + precision: milliseconds +``` + +### Processing Pipeline + +```yaml +processing_pipelines: + session_record_processor: + type: projection + output_fields: [] + functions: + - function: DOMAIN + lookup_fields: [http_host, ssl_sni, quic_sni] + output_fields: [server_domain] + option: FIRST_SIGNIFICANT_SUBDOMAIN + - function: ASN_LOOKUP + lookup_fields: [server_ip] + output_fields: [server_asn] + parameters: + option: IP_TO_ASN + vendor_id: tsg_asnlookup + - name: BASE64_DECODE_TO_STRING + lookup_fields: [mail_subject,mail_subject_charset] + output_fields: [mail_subject] + aggregate_processor: + type: aggregate + group_by_fields: [server_ip,server_port,client_ip,client_port] + window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 60 + window_slide: 10 #滑动窗口步长 + mini_batch: true #是否开启预聚合优化 + functions: + - function: NUMBER_SUM + lookup_fields: [ sent_pkts ] + output_fields: [ sent_pkts_sum ] + table_processor: + type: table + functions: + - function: JSON_UNROLL + lookup_fields: [ device_tag ] + output_fields: [ new_name2 ] + parameters: + path: tags + new_path: newtags +``` + +#### Projection Processor + +| 属性名 | 默认值 | 类型 | 必填 | 描述 | +|---------------|-----|---------------|----|-------------------------| +| name | - | String | Y | Processor名称,唯一标识,用于任务编排 | +| type | - | String | Y | 数据源类型:projection | +| output_fields | - | Array(String) | N | 输出指定字段,默认发送全部字段。 | +| remove_fields | - | Array(String) | N | 删除指定字段,默认为空。 | +| functions | - | List(UDF) | Y | 自定义函数列表 | +| | | | | | + +#### Aggregate Processor + +| 属性名 | 默认值 | 类型 | 必填 | 描述 | 
+|------------------------|-----|------------|----|------------------------------------------------------------------------------------------------| +| name | - | String | Y | Processor名称,唯一标识,用于任务编排 | +| type | - | String | Y | 数据源类型:aggregate | +| group_by_fields | - | Integer | Y | 聚合的维度列 | +| window_type | - | Enum | Y | 时间窗口类型:tumbling_processing_time,tumbling_event_time,sliding_processing_time,sliding_event_time | +| window_size | - | Integer | Y | 窗口的大小,单位秒 | +| window_slide | - | Integer | N | 滑动窗口需要指定滑动步长,单位秒 | +| window_timestamp_field | - | String | N | 窗口开始的时间戳(ms)做为value输出的字段名 | +| mini_batch | - | Boolean | N | 默认为false,是否开启预聚合优化,在按照key进行聚合之前,先在本地进行汇聚,进而降低网络传输数据量 | +| functions | - | List(UDAF) | Y | 自定义函数列表 | + +#### Table Processor + +| 属性名 | 默认值 | 类型 | 必填 | 描述 | +|-----------|-----|------------|----|-------------------------| +| name | - | String | Y | Processor名称,唯一标识,用于任务编排 | +| type | - | String | Y | 数据源类型:table | +| functions | - | List(UDTF) | Y | 自定义函数列表 | + +## 输出(Sinks) + +### Sink通用配置 + +```yaml +sinks: + kafka_sink: + # sink标识 + type: kafka + # sink schema + # schema: + # sink属性配置 + properties: + prop_key1: prop_value1 + prop_key2: prop_value2 + #... +``` + + + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|----------------|-------|---------|---------|------------------| +| **type** | Y | - | String | sink唯一标识 | +| `schema` | N | - | Map | 同source schema | +| properties | Y | - | Object | sink属性配置 | + +### Kafka Sink + +```yaml +sinks: + kafka_sink: + type: kafka + properties: + topic: SESSION-RECORD-JSON + kafka.bootstrap.servers: 192.168.44.12:9092 + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + format: json +``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|------------------------------------|----|--------|----------|--------------------------------------------------------| +| **topic** | Y | - | String | Kafka Topic名称。 | +| **kafka.bootstrap.servers** | Y | - | String | Kafka Broker 地址 | +| **format** | Y | - | String | format,用来序列化消息JSONProtobufCSV... | +| log.failures.only | N | true | Boolean | producer发生error时只打印日志, 否则抛出异常程序停止(重试) | +| rate.limiting.strategy | N | none | String | 限速策略:none:不限速(默认)sliding_window:限速,使用滑动窗口计算速率 | +| rate.limiting.limit.rate | N | 10Mbps | String | 限制的最大速率:单位必须是Mbps、Kbps、bps,例如:10Mbps, 10Kbps, 10240bps | +| rate.limiting.window.size | N | 5 | Integer | 窗口大小,单位秒 | +| rate.limiting.block.duration | N | 5min | Duration | 对首次超出限流数据阻塞,最长阻塞多长时间后超出限流数据全部丢弃 | +| rate.limiting.block.reset.duration | N | 30s | Duration | 超速阻塞后速率恢复正常多长时间后重置超速阻塞状态 | +| Kafka properties | N | - | String | kafka consumer/producer properties配置,key为kafka.+原始key | +| format properties | N | - | String | format properties配置,key为format值.+原始key | + +### ClickHouse Sink + +```yaml +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: 192.168.40.222:9001,192.168.40.223:9001 + table: tsg_galaxy_v3.session_record_local_old + batch.size: 100000 + batch.interval: 30s + connection.user: default + connection.password: galaxy2019 +``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|----------------------------|----|---------|------------|---------------------------------------------------------------| +| **host** | Y | - | String | clickhouse host和tcp port信息。格式:host1:port,host2:port ...。 | +| **table** | Y | - | String | clickhouse table name. 
| +| **batch.size** | N | 100000 | Integer | 最大flush size,超过size会立刻flush。 | +| **batch.byte.size** | N | 200mb | MemorySize | 最大flush buffer字节大小,超过会立刻flush。 | +| **batch.interval** | N | 30s | Duration | 最大flush间隔,超过会立刻flush。 | +| connection.user | Y | - | String | clickhouse 连接 用户名 | +| connection.password | Y | - | String | clickhouse 连接 密码 | +| connection.database | N | default | String | clickhouse 连接 默认数据库 | +| connection.connect_timeout | N | 30 | Integer | 连接超时(单位秒) | +| connection.query_timeout | N | 300 | Integer | 查询超时(单位秒) | +| connection properties | N | - | String | clickhouse jdbc connection properties配置,key为connection.+原始key | + +### Print Sink + +用来测试的sink,把元素输出到标准输出或输出日志。 + +```yaml +sinks: + print_sink: + type: print + properties: + format: json +``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|-------------------|----|--------|--------|----------------------------------------| +| **format** | Y | - | String | format,用来序列化消息 | +| **mode** | N | stdout | Enum | 输出模式,可选值:stdout,log_info,log_warn,null | +| format properties | N | - | String | format properties配置,key为format值.+原始key | + +## Formats + +### JSON + +```yaml +sources: + kafka_source: + type : kafka + properties: + topic: SESSION-RECORD-COMPLETED + kafka.bootstrap.servers: 192.168.44.11:9092 + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.group.id: SESSION-RECORD-COMPLETED-GROUP-GROOT-STREAM-20231021 + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: true +``` + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|---------------------|----|-------|---------|------------------------| +| ignore.parse.errors | N | false | Boolean | json解析时发生错误时忽略,否则抛出异常。 | + +### MessagePack + +```yaml +kafka_source_msgpack: + type : kafka + properties: + topic: msgpack-test + format: msgpack + kafka.bootstrap.servers: 192.168.44.12:9092 + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.group.id: msgpack-test + kafka.auto.offset.reset: latest + +inline_source_msgpack: + type : inline + properties: + data: g6Zsb2dfaWQBqXJlY3ZfdGltZc5mF3f5qWNsaWVudF9pcKsxOTIuMTY4LjAuMQ== + type: base64 + format: msgpack +``` + +- 只需要指定format为msgpack,没有其它的参数。 + +- 支持所有数据类型的解析,包括复杂数据类型struct,array,以及binary。 + +### Protobuf + +```yaml +sources: + inline_source_protobuf: + type : inline + properties: + data: CIin2awGEICAoLC/hYzKAhoEQkFTRSCch8z3wtqEhAQo6o/Xmc0xMMCy15nNMTjWIkDRCEiIp9msBlCIp9msBloIMjE0MjYwMDNg//8DaP//A3JqeyJ0YWdzIjpbeyJ0YWciOiJkYXRhX2NlbnRlciIsInZhbHVlIjoiY2VudGVyLXh4Zy05MTQwIn0seyJ0YWciOiJkZXZpY2VfZ3JvdXAiLCJ2YWx1ZSI6Imdyb3VwLXh4Zy05MTQwIn1dfXoPY2VudGVyLXh4Zy05MTQwggEOZ3JvdXAteHhnLTkxNDCKAQ0xOTIuMTY4LjQwLjgxkAEEmAEBoAEBqAGQwAGyAQdbMSwxLDJd4gEDt+gY4gINMTkyLjU2LjE1MS44MOgCoeYD8gIHV2luZG93c/oCGOe+juWbvS5Vbmtub3duLlVua25vd24uLrIDDTE5Mi41Ni4yMjIuOTO4A/ZwwgMFTGludXjKAxjnvo7lm70uVW5rbm93bi5Vbmtub3duLi6SBAN0Y3CaBBFFVEhFUk5FVC5JUHY0LlRDULAMBLgMBcAM9gHIDJEOoA2AAagN8cr+jgKwDezksIAPwg0RYTI6ZmE6ZGM6NTY6Yzc6YjPKDRE0ODo3Mzo5Nzo5NjozODoyMNINETQ4OjczOjk3Ojk2OjM4OjIw2g0RYTI6ZmE6ZGM6NTY6Yzc6YjM= + type: base64 + format: protobuf + protobuf.descriptor.file.path: ./config/session_record_test.desc + protobuf.message.name: SessionRecord +``` + + + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|----------------------|----|-------|---------|-----------------------------------------------------------------| +| descriptor.file.path | Y | - | String | The Protobuf descriptor file path. 
| +| message.name | Y | - | String | The protobuf MessageName to look for in the descriptor file. | +| ignore.parse.errors | N | false | Boolean | protobuf解析时发生错误时忽略,否则抛出异常。 | +| emit.default.values | N | false | Boolean | protobuf解析时是否设置默认值。不建议配置,严重影响性能。基本数据类型建议使用optional配置来显式处理null值。 | + +protobuf 类型与内置类型对应表: + +| protobuf类型 | 类型(原始对应类型) | 可以转换的类型 | 描述 | +|--------------------------------------|------------|----------------------------------------------|---------------------------------------------------------------------| +| int3,uint32,sint32,fixed32,sfixed32 | int | int, bigint, float, double(序列化时支持string类型转换) | 建议使用int32 其次使用sint32,不建议使用uint32(java读取出来是int类型 第一位代表符号位,可能读取出来是负数) | +| int64,uint64,sint64,fixed64,sfixed64 | bigint | int, bigint, float, double(序列化时支持string类型转换) | 建议使用int64,其次使用sint64 | +| float | float | int, bigint, float, double(序列化时支持string类型转换) | 建议使用double | +| double | double | int, bigint, float, double(序列化时支持string类型转换) | 建议使用double | +| bool | boolean | boolean, int(0为false, 非0为true) | 不建议使用bool,使用int32代替 | +| enum | int | int | 不建议使用enum,使用int32代替 | +| string | string | string(序列化时支持所有类型,调用toString方法) | | +| bytes | binary | binary | | +| message (结构体类型) | struct | struct | | +| repeated type (数组类型) | array | array | | + +protobuf format使用步骤: + +1. 定义proto文件(只支持proto3语法),int double等数值类型有null值时添加optional,建议int double总是添加optional选项。 +2. 生成desc二进制文件(使用23.4版本) + +示例:定义proto文件 + +``` +syntax = "proto3"; + +// [START java_declaration] +// option java_multiple_files = true; +option java_package = "com.geedgenetworks.proto"; +option java_outer_classname = "SessionRecordProtos"; +// [END java_declaration] + +message SessionRecord { + optional int64 recv_time = 1; + optional int64 log_id = 2; + string decoded_as = 3; + optional int64 session_id = 4; + optional int64 start_timestamp_ms = 5; + optional int64 end_timestamp_ms = 6; + optional int32 duration_ms = 7; + optional int32 tcp_handshake_latency_ms = 8; + optional int64 ingestion_time = 9; + optional int64 processing_time = 10; + string device_id = 11; + optional int32 out_link_id = 12; + optional int32 in_link_id = 13; + string device_tag = 14; + string data_center = 15; + string device_group = 16; + string sled_ip = 17; + optional int32 address_type = 18; + optional int32 vsys_id = 19; + optional int32 t_vsys_id = 20; + optional int64 flags = 21; + string flags_identify_info = 22; + repeated int64 security_rule_list = 23; + string security_action = 24; + repeated int64 monitor_rule_list = 25; + repeated int64 shaping_rule_list = 26; + repeated int64 proxy_rule_list = 27; + repeated int64 statistics_rule_list = 28; + repeated int64 sc_rule_list = 29; + repeated int64 sc_rsp_raw = 30; + repeated int64 sc_rsp_decrypted = 31; + string proxy_action = 32; + optional int32 proxy_pinning_status = 33; + optional int32 proxy_intercept_status = 34; + string proxy_passthrough_reason = 35; + optional int32 proxy_client_side_latency_ms = 36; + optional int32 proxy_server_side_latency_ms = 37; + string proxy_client_side_version = 38; + string proxy_server_side_version = 39; + optional int32 proxy_cert_verify = 40; + string proxy_intercept_error = 41; + optional int32 monitor_mirrored_pkts = 42; + optional int32 monitor_mirrored_bytes = 43; + string client_ip = 44; + optional int32 client_port = 45; + string client_os_desc = 46; + string client_geolocation = 47; + optional int64 client_asn = 48; + string subscriber_id = 49; + string imei = 50; + string imsi = 51; + string phone_number = 52; + string apn = 53; + string server_ip = 
54; + optional int32 server_port = 55; + string server_os_desc = 56; + string server_geolocation = 57; + optional int64 server_asn = 58; + string server_fqdn = 59; + string server_domain = 60; + string app_transition = 61; + string app = 62; + string app_debug_info = 63; + string app_content = 64; + repeated int64 fqdn_category_list = 65; + string ip_protocol = 66; + string decoded_path = 67; + optional int32 dns_message_id = 68; + optional int32 dns_qr = 69; + optional int32 dns_opcode = 70; + optional int32 dns_aa = 71; + optional int32 dns_tc = 72; + optional int32 dns_rd = 73; + optional int32 dns_ra = 74; + optional int32 dns_rcode = 75; + optional int32 dns_qdcount = 76; + optional int32 dns_ancount = 77; + optional int32 dns_nscount = 78; + optional int32 dns_arcount = 79; + string dns_qname = 80; + optional int32 dns_qtype = 81; + optional int32 dns_qclass = 82; + string dns_cname = 83; + optional int32 dns_sub = 84; + string dns_rr = 85; + optional int32 dns_response_latency_ms = 86; + string http_url = 87; + string http_host = 88; + string http_request_line = 89; + string http_response_line = 90; + string http_request_body = 91; + string http_response_body = 92; + optional int32 http_proxy_flag = 93; + optional int32 http_sequence = 94; + string http_cookie = 95; + string http_referer = 96; + string http_user_agent = 97; + optional int64 http_request_content_length = 98; + string http_request_content_type = 99; + optional int64 http_response_content_length = 100; + string http_response_content_type = 101; + string http_set_cookie = 102; + string http_version = 103; + optional int32 http_status_code = 104; + optional int32 http_response_latency_ms = 105; + optional int32 http_session_duration_ms = 106; + optional int64 http_action_file_size = 107; + string ssl_version = 108; + string ssl_sni = 109; + string ssl_san = 110; + string ssl_cn = 111; + optional int32 ssl_handshake_latency_ms = 112; + string ssl_ja3_hash = 113; + string ssl_ja3s_hash = 114; + string ssl_cert_issuer = 115; + string ssl_cert_subject = 116; + optional int32 ssl_esni_flag = 117; + optional int32 ssl_ech_flag = 118; + string dtls_cookie = 119; + string dtls_version = 120; + string dtls_sni = 121; + string dtls_san = 122; + string dtls_cn = 123; + optional int32 dtls_handshake_latency_ms = 124; + string dtls_ja3_fingerprint = 125; + string dtls_ja3_hash = 126; + string dtls_cert_issuer = 127; + string dtls_cert_subject = 128; + string mail_protocol_type = 129; + string mail_account = 130; + string mail_from_cmd = 131; + string mail_to_cmd = 132; + string mail_from = 133; + string mail_password = 134; + string mail_to = 135; + string mail_cc = 136; + string mail_bcc = 137; + string mail_subject = 138; + string mail_subject_charset = 139; + string mail_attachment_name = 140; + string mail_attachment_name_charset = 141; + string mail_eml_file = 142; + string ftp_account = 143; + string ftp_url = 144; + string ftp_link_type = 145; + string quic_version = 146; + string quic_sni = 147; + string quic_user_agent = 148; + string rdp_cookie = 149; + string rdp_security_protocol = 150; + string rdp_client_channels = 151; + string rdp_keyboard_layout = 152; + string rdp_client_version = 153; + string rdp_client_name = 154; + string rdp_client_product_id = 155; + string rdp_desktop_width = 156; + string rdp_desktop_height = 157; + string rdp_requested_color_depth = 158; + string rdp_certificate_type = 159; + optional int32 rdp_certificate_count = 160; + optional int32 rdp_certificate_permanent = 161; + string 
rdp_encryption_level = 162; + string rdp_encryption_method = 163; + string ssh_version = 164; + string ssh_auth_success = 165; + string ssh_client_version = 166; + string ssh_server_version = 167; + string ssh_cipher_alg = 168; + string ssh_mac_alg = 169; + string ssh_compression_alg = 170; + string ssh_kex_alg = 171; + string ssh_host_key_alg = 172; + string ssh_host_key = 173; + string ssh_hassh = 174; + string sip_call_id = 175; + string sip_originator_description = 176; + string sip_responder_description = 177; + string sip_user_agent = 178; + string sip_server = 179; + string sip_originator_sdp_connect_ip = 180; + optional int32 sip_originator_sdp_media_port = 181; + string sip_originator_sdp_media_type = 182; + string sip_originator_sdp_content = 183; + string sip_responder_sdp_connect_ip = 184; + optional int32 sip_responder_sdp_media_port = 185; + string sip_responder_sdp_media_type = 186; + string sip_responder_sdp_content = 187; + optional int32 sip_duration_s = 188; + string sip_bye = 189; + optional int32 rtp_payload_type_c2s = 190; + optional int32 rtp_payload_type_s2c = 191; + string rtp_pcap_path = 192; + optional int32 rtp_originator_dir = 193; + string stratum_cryptocurrency = 194; + string stratum_mining_pools = 195; + string stratum_mining_program = 196; + string stratum_mining_subscribe = 197; + optional int64 sent_pkts = 198; + optional int64 received_pkts = 199; + optional int64 sent_bytes = 200; + optional int64 received_bytes = 201; + optional int64 tcp_c2s_ip_fragments = 202; + optional int64 tcp_s2c_ip_fragments = 203; + optional int64 tcp_c2s_lost_bytes = 204; + optional int64 tcp_s2c_lost_bytes = 205; + optional int64 tcp_c2s_o3_pkts = 206; + optional int64 tcp_s2c_o3_pkts = 207; + optional int64 tcp_c2s_rtx_pkts = 208; + optional int64 tcp_s2c_rtx_pkts = 209; + optional int64 tcp_c2s_rtx_bytes = 210; + optional int64 tcp_s2c_rtx_bytes = 211; + optional int32 tcp_rtt_ms = 212; + optional int64 tcp_client_isn = 213; + optional int64 tcp_server_isn = 214; + string packet_capture_file = 215; + string in_src_mac = 216; + string out_src_mac = 217; + string in_dest_mac = 218; + string out_dest_mac = 219; + string tunnels = 220; + optional int32 dup_traffic_flag = 221; + string tunnel_endpoint_a_desc = 222; + string tunnel_endpoint_b_desc = 223; +} +``` + +生成desc二进制文件 + +``` +protoc --descriptor_set_out=session_record_test.desc session_record_test.proto +``` + +### Raw + +Raw format允许读写原始(字节数组)值作为单个列。主要用于不涉及修改message从kakfa到kakfa同步topic场景。只需要指定format为raw,没有其它的参数。 + +```yaml + +sources: + inline_source: + type: inline + properties: + data: 123abc + format: raw + +sinks: + print_sink: + type: print + properties: + format: raw +``` + +# 任务编排 + +```yaml +application: + env: + name: example-inline-to-print + parallelism: 3 + shade.identifier: aes + kms.type: local + pipeline: + object-reuse: true + execution: + restart: + strategy: none + properties: # job级别变量,同名情况下会覆盖全局变量 + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + # RestfulAPI 取需要加密的字段,返回数据类型为Array + projection.encrypt.schema.registry.uri: 127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields + topology: + - name: inline_source + downstream: [decoded_as_split] + - name: decoded_as_split + tags: [http_tag, dns_tag] #需要在分流处理器的rules中进行定义,分流规则按照数组中的顺序对应downstream中的处理器,支持Pipelines,Sinks,Filters + downstream: [ projection_processor, 
aggregate_processor] + - name: projection_processor + downstream: [ print_sink ] + - name: aggregate_processor + downstream: [ print_sink ] + - name: print_sink + downstream: [] +``` + +# 函数定义 + +## 内置UDF + + 函数可读取job配置文件(grootstream_job.yaml),每个函数在处理器管道中(Processor Pipeline )独立运行,互不影响。一个函数包括名称、传递数据(Event)、函数上下文信息(UDF Context) 及执行方法 evaluate(Event)。 + +- Function Name :函数名称,命名全大写单词之间用下划线分割,用于函数注册。 +- Event:处理的事件,数据组织Map<field_name, field_value> event结构。 +- UDF Context 函数执行环境上下文,包括输入数据,配置信息及其它状态信息。 + - filter :过滤表达式;String类型,默认为空,它用于筛选需要经过函数处理的事件,具体过滤方式参考AviatorScript语法。 + - lookup_fields:查找的字段;Array[String]类型,允许指定多个字段,在事件中查找字段名对应的值。 + - output_fields:输出的字段;Array[String]类型,允许指定多个字段,用于将函数执行的结果附加到事件中。如果输出字段与查找字段相匹配,它们将覆盖原有字段的值;如果不匹配,将会在日志中添加一个新字段。 + - parameters:扩展参数;选填,Map<String, Object> + + + +> 函数表达式:FUNCTION_NAME(filter, lookup_fields, output_fields[, parameters]) + +### 标量函数 + + #### ASN Lookup + +查找IP所属AS号。 + +- Parameters + - kb_name=`<string>` // 使用的知识库的名称 ,需要预先在全局配置中进行注册。 + - option = `<string>` + - IP_TO_ASN + +```yaml + - function: ASN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + option: IP_TO_ASN + kb_name: tsg_ip_asn +``` + + #### Base64 Decode + +将 Base64 编码二进制数据解码转换为字符串。 + +Parameters: + +- value_field =<String> +- charset_field=<String> 可选,默认为UTF-8 + +```yaml +- function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset +``` + + #### Base64 Encode + +将 Base64 二进制数据编码转换为字符串。 + +Parameters: + +- value_field =<String> + +```yaml +- function: BASE64_ENCODE_TO_STRING + output_fields: [packet] + parameters: + value_field: packet +``` + + #### Current Unix Timestamp + +获取系统当前时间戳。 + +- Parameters + - precision=seconds | milliseconds + +```yaml +- function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: milliseconds + +``` + + #### Domain + +域名处理函数。 + +Parameters: + +- option = `<string>` + - TOP_LEVEL_DOMAIN 顶级域名 + - FIRST_SIGNIFICANT_SUBDOMAIN 获取二级有效域名 + - FQDN 获取FQDN + +```yaml +- function: DOMAIN + lookup_fields: [ http_host,ssl_sni,dtls_sni,quic_sni ] + output_fields: [ server_domain ] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN +``` + +#### Drop + +满足Filter表达式的日志增加删除标记,下游函数将不再执行,当前Projection Function 不再发送事件到下游。设置Event isDropped标记为true。 + +- 日志格式数据(无嵌套),丢弃符合过滤条件的数据 + +```shell +- function: DROP + filter: event.c2s_byte_num <10 +``` + +- 丢弃object_id为13167 数据 + +```shell +- function: DROP + filter: event.object_id == 13167 + +# Input: {"object_id":13176,"item_id":83989295} +``` + +- metrics格式数据(多级嵌套),丢弃object_id为102且item_id大于等于2的数据,或object_id等于13176且item_id大于83989294的数据 + +```shell +- function: DROP + filter: (event.tags.object_id == 102 && event.tags.item_id >= 2) || (event.tags.object_id ==13176 && event.tags.item_id >= 83989294) + +# Input: {"tags":{"object_id":13176,"item_id":83989295},"fields":{"in_bytes":1765830,"out_bytes":27446,"bytes":1793276},"timestamp_ms":1714443502000} +``` + +#### Encrypt + +对敏感信息进行加密。支持引用动态规则,获取需要加密的字段,选择是否对当前字段进行加密 + +Parameters: + +- identifier = `<string>` 加密算法唯一标识。支持:aes-128-gcm96, aes-256-gcm96, sm4-gcm96 +- default_val= `<string>` 加密失败输出该值,默认将输出原值 + +``` +- function: ENCRYPT + lookup_fields: [ phone_number ] + output_fields: [ phone_number ] + parameters: + identifier: aes-128-gcm96 +``` + +Note : 读取任务变量`projection.encrypt.schema.registry.uri`,返回加密字段,数据类型为Array。 + + #### Eval + +通过值表达式,获取符合条件的值,添加到字段中。同时可以选择保留或删除指定的字段。 + +Parameters: + +- value_expression=`<string>` 
基于表达式设置字段的值,可以是一个常量 + +Example 1: 创建一个字段ingestion_time, 取自 recv_time值 + +``` +- function: EVAL + output_fields: [ ingestion_time ] + parameters: + value_expression: 'recv_time' +``` + +Example 2: 创建一个字段internal_ip, 如果flags&8=8?client_ip : server_ip + +``` +- function: EVAL + output_fields: [ internal_ip ] + parameters: + value_expression: 'flags&8=8? client_ip : server_ip' +``` + + #### Flatten + +扁平化嵌套结构使其成为顶级字段。新字段命名使用每层结构名称作为前缀,中间默认用句点“.”分隔。 + +- Parameters + - prefix= `<string>` //为扁平化的字段名称指定前缀。默认为空。 + - depth=<int> // 扁平化的嵌套级别的最大值. 设置为1,仅扁平化顶级结构。默认设置为5 + - delimiter=<String> 组合父级与子级名称的分隔符。默认为"."。 + - json_string_keys=Array[string] 标识哪些JsonString格式的数据需要扁平化。默认为空。 + +Example 1: 对Metrics的fields,tags 嵌套结构进行扁平化,如果lookup_fields为空则对所有嵌套结构进行扁平化。 + +``` +- function: FLATTEN + lookup_fields: [ tags, fields ] +``` + +Example 2: 会话日志字段encapsulation(JsonString格式)嵌套结构进行扁平化,并增加前缀tunnels,嵌套深度指定3,中间用下划线“."分隔 + +```yaml +- function: FLATTEN + lookup_fields: [ encapsulation ] + parameters: + prefix: tunnels + depth: 3 + delimiter: . + json_string_keys: [ encapsulation] + +# Output: tunnels.encapsulation.ipv4.client_ip: 192.168.4.1 +``` + + #### From Unix Timestamp + +将时间戳转换为日期类型,返回UTC日期时间格式字符串,输入支持10位和13位时间戳。 + +- Parameters + - precision=seconds // yyyy-MM-dd HH:mm:ss + - precision=milliseconds // yyyy-MM-dd HH:mm:ss:SSS + +```yaml +- function: FROM_UNIX_TIMESTAMP + lookup_fields: [recv_time] + output_fields: [recv_time_string] + parameters: + precision: seconds +``` + + #### Generate String Array + +创建字符串数组 + +```yaml +- function: GENERATE_STRING_ARRAY + lookup_fields: [ client_asn,server_asn ] + output_fields: [ asn_list ] +``` + + #### GeoIP Lookup + +查找IP地理位置信息。 + +- Parameters + - kb_name=`<string>` // 使用的知识库的名称 ,需要预先在全局配置中进行注册。 + - option = `<string>` + - IP_TO_COUNTRY 所属国家或地区 + - IP_TO_PROVINCE 所属省/州 + - IP_TO_CITY 所属城市 + - IP_TO_SUBDIVISION_ADDR 如上三级以下信息,包括区、街道等。 + - IP_TO_DETAIL 所属详情,包括如上四级,中间用英文句点分隔 + - IP_TO_LATLNG 所属经纬度,中间用英文逗号分隔 + - IP_TO_PROVIDER 所属服务提供商(ISP) + - IP_TO_JSON 返回所属位置详情,格式为JSON + - IP_TO_OBJECT 返回所属位置详情,格式为Response Object + - geolocation_field_mappingobject_key : field_name + +```yaml +- function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_geolocation ] + parameters: + kb_name: tsg_ip_location + option: IP_TO_DETAIL +``` + +```yaml +- function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_geolocation ] + parameters: + kb_name: tsg_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: client_country_region + PROVINCE: client_super_admin_area + +# 当option为“IP_TO_OBJECT” 时,支持字段映射(geolocation_field_mapping): +# - COUNTRY - 国家或地区 +# - PROVINCE - 省/州 +# - CITY - 城市 +# - LONGITUDE - 精度 +# - LATITUDE - 纬度 +# - ISP - 运营商 +# - ORGANIZATION - 组织 +``` + +#### HMAC + +使用密钥和消息使用哈希算法生成一个固定长度的消息摘要。HMAC(Hash-based Message Authentication Code)是一种基于哈希函数的消息认证码,用于验证数据的完整性和真实性。 + +Parameters: + +- secret_key = `<string>` 用于生成MAC的密钥。 +- algorithm= `<string>` 用于生成MAC的HASH算法。默认是`sha256` +- output_format = `<string>` 输出MAC的格式。默认为`'hex'` 。支持:`base64` | `hex `。 + +``` +- function: HMAC + lookup_fields: [ phone_number ] + output_fields: [ phone_number_hmac ] + parameters: + secret_key: ****** + output_format: base64 +``` + + + + #### JSON Extract + +解析JSON字段,通过表达式抽取json部分内容。 + +- Parameters + - value_expression=`<string>` //基于JsonPath表达式设置字段的值 + +``` +JSON_EXTRACT(null, 'device_tag', 'data_center', parameters) +- parameters: + - value_expression = $.tags[?(@.tag=='data_center')][0].value +``` + + #### Path Combine + +路径合并。 + +- 
Parameters + - path = Array[string] + +```yaml +- function: PATH_COMBINE + lookup_fields: [ packet_capture_file ] + output_fields: [ packet_capture_file ] + parameters: + # 获取grootstream.yaml中properties配置的对应属性hos.path的值 + path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + +# Output: hos_path + bucket_name + packet_capture_file +``` + + #### Rename + +重命名字段。 + +- Parameters + + - parent_fields: Array[string] 指定哪些字段的子字段将进行重命名。如果为空,则仅会对顶级字段进行重命名,不支持对数组结构中的key进行重命名。 + - rename_fields: 指定的字段进行重命名 + - current_field_name : new_field_name + - rename_expression=`<string>` 对字段执行AviatorScript表达式,返回值作为重命名后的字段名,优先级低于rename_fields。 + + + +Example 1: 移除字段名"tags_"前缀 , 重命名字段timestamp_ms为recv_time_ms + +```yaml +- function: RENAME + parameters: + rename_fields: + timestamp_ms: recv_time_ms + rename_expression: key=string.replace_all(key,'tags_',''); return key; +``` + +Example 2: client_ip 重命名为source_ip, 包括隧道encapsulation.ipv4下的字段 + +```yaml +- function: RENAME + parameters: + parent_fields: [encapsulation.ipv4] + rename_fields: + client_ip: source_ip + +# Output: source_ip:192.168.4.1, encapsulation.ipv4.source_ip:192.168.12.12 +``` + +#### Snowflake ID + +基于雪花算法生成唯一ID。 + +Parameters: + +- data_center_id_num = <int> 数据中心id,用与保证生成雪花id的唯一性。 + +````shell +- function: SNOWFLAKE_ID + output_fields: [ log_id ] +```` + + #### String Joiner + +字符串拼接,可以指定分隔符,前缀与后缀。 + +```yaml +- function: STRING_JOINER + lookup_fields: [client_ip, server_ip] + output_fields: [ip_string] + parameters: + delimiter: ',' + prefix: '[' + suffix: ']' + + # Output:ip_string='[client_ip, server_ip]' + +``` + + #### Unix Timestamp Converter + +转换时间戳精度,返回其他精度时间戳 + +- Parameters + - precision=seconds // 获取Unix时间戳并将其精确到秒级 + - precision=milliseconds // 获取Unix时间戳并将其精确到毫秒级 + - precision=minutes // 获取Unix时间戳将其精确到分钟级别,并以秒级格式输出 + - interval = <int>//时长精度,单位取决于precision + +```yaml +- function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ recv_time ] + parameters: + precision: seconds + interval: 300 + +# __timestamp:内置参数,从数据source的摄入时间,以300秒为精度返回时间戳,若precision = minutes,则为以300分钟为精度输出。 + +``` + + #### UUID + +使用UUIDv4标准,生成128位随机UUID。实现方式参考:https://github.com/cowtowncoder/java-uuid-generator + +```yaml +- function: UUID + output_fields: [log_uuid] + + # 3f0f8d7e-d89e-4b0a-9f2e-2eab5c99d062 +``` + + #### UUIDv5 + +是一种基于 **命名空间和名称** 生成的 UUID。与 `UUIDv4` 主要依赖随机数不同,`UUIDv5` 使用 SHA-1 哈希算法将命名空间和名称组合后生成一个确定性的 UUID。这意味着对同一命名空间和相同名称的输入,`UUIDv5` 总是会生成相同的 UUID。 + +- Parameters + - namespace = <Enum> 枚举值,命名空间是一个 UUID,它定义了名称所属的上下文。可指定如下命名空间: + - NAMESPACE_IP: 6ba7b890-9dad-11d1-80b4-00c04fd430c8 + - NAMESPACE_DOMAIN: 6ba7b891-9dad-11d1-80b4-00c04fd430c8 + - NAMESPACE_APP: 6ba7b892-9dad-11d1-80b4-00c04fd430c8 + - NAMESPACE_SUBSCRIBER: 6ba7b893-9dad-11d1-80b4-00c04fd430c8 + +```yaml +- function: UUIDv5 + lookup_fields: [ client_ip, server_ip ] # 基于 client_ip, server_ip的值组成UUIDv5 name 参数值与命名空间结合后,通过哈希生成唯一的 UUID。 + output_fields: [ip_uuid] + parameters: + namespace: NAMESPACE_IP + + # 2ed6657d-e927-568b-95e1-2665a8aea6a2 +``` + + #### UUIDv7 + +通过时间戳和随机数生成唯一UUID,适合需要时间排序的场景,比如数据库索引和日志记录。 + +```yaml +- function: UUIDv7 + output_fields: [log_uuid] # 生成基于时间戳和随机数的 UUID + + # 2ed6657d-e927-568b-95e1-2665a8aea6a2 +``` + +### 聚合函数 + + #### Collect List + +在时间窗口内将指定对象合并为List,不进行去重 + +```yaml +- function: COLLECT_LIST + lookup_fields: [client_ip] + output_fields: [client_ip_list] +# Output:client_ip_list= ['192.168.4.1','192.168.4.1','192.168.4.2'] +``` + + #### Collect Set + +在时间窗口内将指定对象合并为Set,对结果进行去重。 + +```yaml 
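+# 输入示例:client_ip 依次为 '192.168.4.1'、'192.168.4.1'、'192.168.4.2',与上方 COLLECT_LIST 输入相同,但结果去重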
+- function: COLLECT_SET + lookup_fields: [client_ip] + output_fields: [client_ip_set] + +# Output:client_ip_set= ['192.168.4.1','192.168.4.2'] +``` + + #### First Value + +返回时间窗口内第一个出现的不为空的value。 + +```yaml +- function: FIRST_VALUE + lookup_fields: [ received_bytes ] + output_fields: [ received_bytes_first ] +``` + + #### Last Value + +返回时间窗口内最后一个出现的不为空的value。 + +```yaml +- function: LAST_VALUE + lookup_fields: [ received_bytes ] + output_fields: [ received_bytes_last ] +``` + + #### Long Count + +在时间窗口内统计Event条数。 + +```yaml +- function: LONG_COUNT + lookup_fields: [ log_id ] + output_fields: [ sessions ] +``` + + #### Mean + +在时间窗口内对指定的数值对象求平均值。 + +Parameters + +- precision=<int> 返回的double类型结果精度,不配置则返回实际计算结果 + +```yaml +- function: MEAN + lookup_fields: [ received_bytes ] + output_fields: [ received_bytes_mean ] + parameters: + precision: 2 +``` + + #### Number Sum + +在时间窗口内对指定数字类型字段进行求和:支持 int,long,double,float类型。 + +```yaml +- function: NUMBER_SUM + lookup_fields: [received_bytes, sent_bytes] + output_fields: [received_bytes_sum] +``` + +```yaml +- function: NUMBER_SUM + lookup_fields: [sent_bytes] + +``` + + #### HLLD + +构建HLLD Sketch,输入列可以为常规类型列或HLLD Sketch列。 + +Parameters: + +- input_type(string):输入列类型。可选值:regular(输入列为常规类型列,代表单个元素), sketch(输入列为sketch类型列,sketch类型)。默认值:sketch。 +- precision(int):HLL精度。默认值:12。 +- output_format(string):输出类型格式。可选值:base64(base64字符串), binary(byte[])。默认值:base64。 + +```yaml +- function: HLLD + lookup_fields: [ ip_hlld ] + output_fields: [ ip_hlld ] + parameters: + input_type: sketch + +- function: HLLD + lookup_fields: [ ip ] + output_fields: [ ip_hlld ] + parameters: + input_type: regular +``` + + #### APPROX_COUNT_DISTINCT_HLLD + +计算近似distinct count,输入列可以为常规类型列或HLLD Sketch列。 + +Parameters: + +- input_type(string):输入列类型。可选值:regular(输入列为常规类型列,代表单个元素), sketch(输入列为sketch类型列,sketch类型)。默认值:sketch。 +- precision(int):HLL精度。默认值:12。 + +```yaml +- function: APPROX_COUNT_DISTINCT_HLLD + lookup_fields: [ ip_hlld ] + output_fields: [ ip_count ] + parameters: + input_type: sketch + +- function: APPROX_COUNT_DISTINCT_HLLD + lookup_fields: [ ip ] + output_fields: [ ip_count ] + parameters: + input_type: regular +``` + + #### HDR_HISTOGRAM + +构建HdrHistogram Sketch,输入列可以为常规类型列或HdrHistogram Sketch列。 + +Parameters: + +Parameters: + +- input_type(string):输入列类型。可选值:regular(输入列为常规类型列,代表单个元素), sketch(输入列为sketch类型列,sketch类型)。默认值:sketch。 +- lowestDiscernibleValue(int):除0外最小值,默认1 +- highestTrackableValue(int):直方图可以记录的最大值,默认2 +- numberOfSignificantValueDigits(int):指定数据值的精度,默认1;[1-5] 较大的值更精确,但会需要更多内存。 +- autoResize(boolean):自动调整highestTrackableValue,默认true +- output_format(string):输出类型格式。可选值:base64(base64字符串), binary(byte[])。默认值:base64。 + +```yaml + - function: HDR_HISTOGRAM + lookup_fields: [latency_ms_histogram] + output_fields: [latency_ms_histogram] + parameters: + input_type: sketch + + - function: HDR_HISTOGRAM + lookup_fields: [latency_ms] + output_fields: [latency_ms_histogram] + parameters: + input_type: regular +``` + + #### APPROX_QUANTILE_HDR + +计算近似分位数,输入列可以为常规类型列或HdrHistogram Sketch列。 + +Parameters: + +- input_type(string):输入列类型。可选值:regular(输入列为常规类型列,代表单个元素), sketch(输入列为sketch类型列,sketch类型)。默认值:sketch。 +- lowestDiscernibleValue(int):除0外最小值,默认1 +- highestTrackableValue(int):直方图可以记录的最大值,默认2 +- numberOfSignificantValueDigits(int):指定数据值的精度,默认1;[1-5] 较大的值更精确,但会需要更多内存。 +- autoResize(boolean):自动调整highestTrackableValue,默认true +- probability(double):分位数百分比,范围0-1,默认0.5 + +```yaml + + - function: APPROX_QUANTILE_HDR + lookup_fields: [latency_ms] + output_fields: [latency_ms_p95] + 
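+    # input_type 为 regular:lookup 字段是原始数值列(非 sketch),直接用于近似分位数计算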
parameters: + input_type: regular + probability: 0.95 + + + - function: APPROX_QUANTILE_HDR + lookup_fields: [latency_ms_histogram] + output_fields: [latency_ms_p95] + parameters: + input_type: sketch + probability: 0.95 +``` + + #### APPROX_QUANTILES_HDR + +计算近似分位数,输入列可以为常规类型列或HdrHistogram Sketch列。 + +Parameters: + +- input_type(string):输入列类型。可选值:regular(输入列为常规类型列,代表单个元素), sketch(输入列为sketch类型列,sketch类型)。默认值:sketch。 +- lowestDiscernibleValue(int):除0外最小值,默认1 +- highestTrackableValue(int):直方图可以记录的最大值,默认2 +- numberOfSignificantValueDigits(int):指定数据值的精度,默认1;[1-5] 较大的值更精确,但会需要更多内存。 +- autoResize(boolean):自动调整highestTrackableValue,默认true +- probabilities(double[]):分位数百分比数组,范围0-1,默认null,必须的属性。 + +```yaml +- function: APPROX_QUANTILES_HDR + lookup_fields: [latency_ms_HDR] + output_fields: [latency_ms_quantiles] + parameters: + input_type: sketch + probabilities: [0.5, 0.95, 0.99] + + +- function: APPROX_QUANTILES_HDR + lookup_fields: [latency_ms] + output_fields: [latency_ms_quantiles] + parameters: + input_type: regular + probabilities: [0.5, 0.95, 0.99] + +``` + +### 表格函数 + + #### Unroll + +展开函数用于处理一个数组类型字段 ,或配置一个用于分割字符串类型字段的表达式 , 并将该字段展开为单独的事件。支持处理 array或string类型字段。 + +Parameters: + +- regex= string//用于将字符串分割为数组的正则表达式,如“,”按照逗号分割字符串,如果字段为数组类型则无需配置 + +```yaml +functions: + - function: UNROLL + lookup_fields: [ monitor_rule_list ] + output_fields: [ monitor_rule ] + + # Input: Event { client_ip=‘192.168.1.1’,monitor_rule_list=[954779,930791]} + # Output: + #Event1: {client_ip=‘192.168.1.1’,monitor_rule=954779} + #Event2: {client_ip=‘192.168.1.1’,monitor_rule=930791} +``` + + #### Json Unroll + +JSON 展开函数接收 JSON 对象字符串字段,将其中的对象数组展开为字符串类型单独事件,同时继承顶级字段。 + +Parameters: + +- path= string//要展开的数组的路径,基于JsonPath表达式,不配置默认展开顶层数组 +- new_path= string//新元素的路径,基于JsonPath表达式,不配置默认覆盖原path + +```yaml +- function: JSON_UNROLL + lookup_fields: [ encapsulation] + output_fields: [ encapsulation ] + parameters: + path: tags + new_path: new_tag +# Input: Event { client_ip=‘192.168.1.1’,device_tag=‘{"tags":[{"tag":"data_center","value":"center-xxg-tsgx-1"}, {"tag":"device_group","value":"group-xxg-tsgx-2"}]}’} +# Output: + #Event1:{client_ip=‘192.168.1.1’,device_tag='{"new_tag":{"tag":"data_center","value":"center-xxg-tsgx-1"}’}' + #Event2:{client_ip=‘192.168.1.1’,device_tag='{"new_tag":{"tag":"data_center","value":"center-xxg-tsgx-2"}’}' +``` + +```yaml +- function: JSON_UNROLL + lookup_fields: [ encapsulation] + output_fields: [ encapsulation ] + +#Input: Event { client_ip=‘192.168.1.1’,encapsulation=‘[{"tunnels_schema_type":"GRE"},{"tunnels_schema_type":"IPv4","client_ip":"12.1.1.1","server_ip":"14.1.1.1"}]’} +#Output: + #Event1:{client_ip=‘192.168.1.1’,encapsulation='{"tunnels_schema_type":"GRE"}'} + #Event2:{client_ip=‘192.168.1.1’,encapsulation='{"tunnels_schema_type":"IPv4","client_ip":"12.1.1.1","server_ip":"14.1.1.1"}'} +``` + + #### Path Unroll + + 将文件路径逐层展开,逐层输出路径和文件(可选)。 + +Parameters: + +- separator= 路径分隔符(只能是单个字符),默认'/'。 + +```yaml +# 将一个应用层协议按层级进行拆分,应用层协议由协议解析路径和应用组成。 +- function: PATH_UNROLL + lookup_fields: [ decoded_path, app] + output_fields: [ protocol_stack_id, app_name ] + parameters: + separator: "." 
+ +# Input: {"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"port_443"} +# Output: + #Event1: {"protocol_stack_id":"ETHERNET"} + #Event2: {"protocol_stack_id":"ETHERNET.IPv4"} + #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"} + #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"} + #Event5: {"app_name":"port_443","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl.port_443"} + +# Input: {"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"ssl"} +# Output: + #Event1: {"protocol_stack_id":"ETHERNET"} + #Event2: {"protocol_stack_id":"ETHERNET.IPv4"} + #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"} + #Event4: {"app_name":"ssl","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"} + +# Input: {"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"ssl.port_444"} +# Output: + #Event1: {"protocol_stack_id":"ETHERNET"} + #Event2: {"protocol_stack_id":"ETHERNET.IPv4"} + #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"} + #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"} + #Event5: {"app_name":"ssl.port_444","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl.ssl.port_444"} + +#只有路径参数的场景(或者上例中文件字段值为null). +- function: PATH_UNROLL + lookup_fields: [ decoded_path] + output_fields: [ protocol_stack_id] + parameters: + separator: "." + +# Input: {"decoded_path":"ETHERNET.IPv4.TCP.ssl"} +# Output: + #Event1: {"protocol_stack_id":"ETHERNET"} + #Event2: {"protocol_stack_id":"ETHERNET.IPv4"} + #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"} + #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"} +``` + +## CN扩展UDF + +[CN函数库](https://docs.geedge.net/pages/viewpage.action?pageId=129087866) + +用户自定义插件(IN Progress) + +| 名称 | 描述 | 类型 | 必填 | 约束 | +|----------------------|---------|---------------|----|---------| +| user_define_process | | | | | +| function_name | 调用的方法名称 | String | Y | | +| class_name | 主类名 | String | Y | | +| jar_name | jar包名 | String | Y | | +| config_field | 私有属性配置 | JSON[Array] | N | 可自定义多属性 | +| input_schema | 输入字段 | json_String | Y | | +| output_schema | 输出字段 | json_String | Y | | +| | | | | | +| user_define_function | | | | | +| name | 名称 | String | Y | | +| description | 描述 | String | N | | +| type | 类型 | String | Y | udfudaf | +| class_name | 主类名 | String | Y | | +| jar_name | jar包名 | String | Y | | + +# 实现原则 + +- 包命名: com.geedgenetworks. 
[模块名].XXX +- 统一依赖管理:第三方类库的依赖声明在项目的顶层 POM 文件(也称为 Project POM)中,各个子模块继承这些依赖,确保整个项目共享相同的依赖。 +- 模块之间依赖:在每个模块的 POM 文件中定义依赖其他模块的关系。 +- 每个模块按其职责命名 groot-[功能名称],例如: + - groot-common 公共模块,包含可复用功能、工具类或库,供其它模块引用。 + - groot-core 核心模块,包含与业务逻辑紧密相关的核心功能、类、接口或服务。 + - groot-bootstrap 启动模块,包含一些必要的初始化代码、配置解析或资源加载等,它属于应用程序起点,负责将一个流处理任务各个部分组装起来,使其正确运行。 + - groot-connectors 连接器模块 + - connecor-kafka 子模块,包含Source和Sink 功能 + - connector-ipfix-collector 子模块,Source 功能 + - connecotr-clickhouse 子模块,Sink 功能 + - MockDataConnector(Source) 用于产生样例数据,用于测试、开发或演示目的的场景 + - groot-formats format模块 + - format-json 子模块,提供json format + - groot-tests 测试模块,包含多个模块,用于任务的集成和功能测试 (非单元测试) +- 对于不受检查异常(RuntimeException)在groot-common模块定义全局的异常处理类GrootRuntimeException,基于该自定义的异常抛出并附带更清晰的错误信息,用于限定问题的范围。其他各个模块按需实现Exception用于增加更多上下文异常提示信息。 +- 自定义插件管理:Connectors(Source 和 Destination) 和 UDF 函数; + - UDF(用户自定义函数)—— 用于数据清洗、处理和格式转换。按实现方式可分为内置UDFs和用户扩展UDFs。 + - UDF接口包括Function Name、传递数据(Event)、配置参数(context) 及执行方法 Evaluate(Event) + - 通过配置文件(udfs)管理平台已注册的函数列表 + - 任务启动时包含两个步骤:验证所引用的函数是否在注册列表中;按照引用的顺序对函数进行实例化。 + - 与通用工具类的关系:UDF 调用通用工具类的方法,以实现Evaluate的功能。 + - 提供open 和 close 方法,用以对象初始化,处理连接器(如数据库连接、文件句柄等)相关的资源的打开和关闭。而open方法一次性初始化的方法,在 UDF 对象创建时执行,用于初始化对象级别的资源和状态。 +- Event 内置字段(Internal Fields) 命名以双下划线开头,仅用于数据流处理,不发送到SINK 模块。 + - __timestamp : 数据摄入时间(Ingestion Time)。当Source无法抽取时,使用当前时间(Unix epoch格式),一般用于标识“数据的摄入时间”。例如 Kafka Source 抽取头部_time属性。 + - __inputId: 数据来源,事件的产生源头或来源的标识符或名称。用于事件追踪和管理,以确定事件是由哪个系统、应用程序、设备或实体产生的。例如Kafka Source 记录topic 名称。 + +# 相关问题 + +- 知识库更新为什么不基于Flink 广播流? + - 广播流适用于将配置或规则低吞吐事件流广播到下游所有Task中,不适用广播知识库大文件配置(GB级别)。 + - 采用广播流动态广播知识库元数据方式,若更新知识库,当基于每个Task(线程)分别存储,占用内存较大;如果基于进程级(静态方法/变量)共享,可能会发生线程阻塞或死锁问题。 +- 自定义函数如何提交到平台? +- Pack 在平台里定位是什么? 如何扩展? +- 数据分流方案? + - 使用Flink 侧输出流(side_output),对事件标记tag实现。 +- Aggregate Processor 函数如何定义?怎么指定dimension、Metrics ? + - 支持基础滑动,滚动窗口聚合计算。Dimension 基于group_by_fields 指定,Metrics 通过自定义UDAF实现。
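+
+针对上面 Aggregate Processor 的问题,下面给出一个最小示例(处理器名与字段名仅作示意):维度由 group_by_fields 指定,指标由 UDAF 的 output_fields 输出,窗口类型与大小按前文 Aggregate Processor 的配置项填写。
+
+```yaml
+processing_pipelines:
+  traffic_aggregate_processor:
+    type: aggregate
+    group_by_fields: [ client_ip, server_ip ]   # Dimension
+    window_type: tumbling_processing_time
+    window_size: 60
+    functions:                                  # Metrics(UDAF)
+      - function: NUMBER_SUM
+        lookup_fields: [ sent_bytes ]
+        output_fields: [ sent_bytes_sum ]
+      - function: LONG_COUNT
+        lookup_fields: [ log_id ]
+        output_fields: [ sessions ]
+```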
\ No newline at end of file diff --git a/docs/images/groot_stream_architecture.jpg b/docs/images/groot_stream_architecture.jpg Binary files differindex d8f1d4b..28b553b 100644 --- a/docs/images/groot_stream_architecture.jpg +++ b/docs/images/groot_stream_architecture.jpg diff --git a/docs/processor/aggregate-processor.md b/docs/processor/aggregate-processor.md index 5ab0ae0..afc26f6 100644 --- a/docs/processor/aggregate-processor.md +++ b/docs/processor/aggregate-processor.md @@ -10,17 +10,18 @@ Within the pipeline, events are processed by each Function in order, top‑>down ## Options Note:Default will output internal fields `__window_start_timestamp` and `__window_end_timestamp` if not set output_fields. -| name | type | required | default value | -|--------------------------|--------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| type | String | Yes | The type of the processor, now only support `com.geedgenetworks.core.processor.aggregate.AggregateProcessor` | -| output_fields | Array | No | Array of String. The list of fields that need to be kept. Fields not in the list will be removed. | -| remove_fields | Array | No | Array of String. The list of fields that need to be removed. | -| group_by_fields | Array | yes | Array of String. The list of fields that need to be grouped. | -| window_type | String | yes | The type of window, now only support `tumbling_processing_time`, `tumbling_event_time`, `sliding_processing_time`, `sliding_event_time`. if window_type is `tumbling/sliding_event_time,` you need to set watermark. | -| window_size | Long | yes | The duration of the window in seconds. | -| window_slide | Long | yes | The duration of the window slide in seconds. | -| window_timestamp_field | String | No | Set the output timestamp field name, with the unit in seconds. It is mapped to the internal field __window_start_timestamp. | -| functions | Array | No | Array of Object. The list of functions that need to be applied to the data. | +| name | type | required | default value | +|------------------------|-----------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| type | String | Yes | The type of the processor, now only support `com.geedgenetworks.core.processor.aggregate.AggregateProcessor` | +| output_fields | Array | No | Array of String. The list of fields that need to be kept. Fields not in the list will be removed. | +| remove_fields | Array | No | Array of String. The list of fields that need to be removed. | +| group_by_fields | Array | yes | Array of String. The list of fields that need to be grouped. | +| window_type | String | yes | The type of window, now only support `tumbling_processing_time`, `tumbling_event_time`, `sliding_processing_time`, `sliding_event_time`. if window_type is `tumbling/sliding_event_time,` you need to set watermark. | +| window_size | Long | yes | The duration of the window in seconds. | +| window_slide | Long | yes | The duration of the window slide in seconds. | +| window_timestamp_field | String | No | Set the output timestamp field name, with the unit in seconds. It is mapped to the internal field __window_start_timestamp. 
| +| mini_batch | Boolean | No | Specifies whether to enable local aggregate optimization. The default value is false. This can significantly reduce the state overhead and get a better throughput. | +| functions | Array | No | Array of Object. The list of functions that need to be applied to the data. | ## Usage Example diff --git a/docs/processor/split-processor.md b/docs/processor/split-processor.md new file mode 100644 index 0000000..e1a1163 --- /dev/null +++ b/docs/processor/split-processor.md @@ -0,0 +1,49 @@ +# Split Processor + +> Split the output of a data processing pipeline into multiple streams based on certain conditions. + +## Description + +Using the flink side Outputs send data from a stream to multiple downstream consumers. This is useful when you want to separate or filter certain elements of a stream without disrupting the main processing flow. For example, side outputs can be used for error handling, conditional routing, or extracting specific subsets of the data. + +## Options + +| name | type | required | default value | +|-------------------|--------|----------|--------------------------------------------------------------------------------------------| +| type | String | Yes | The type of the processor, now only support ` com.geedgenetworks.core.split.SplitOperator` | +| rules | Array | Yes | Array of Object. Defining rules for labeling Side Output Tag | +| [rule.]tag | String | Yes | The tag name of the side output | +| [rule.]expression | String | Yes | The expression to evaluate the event. | + +## Usage Example + +This example uses a split processor to split the data into two streams based on the value of the `decoded_as` field. + +```yaml +splits: + decoded_as_split: + type: split + rules: + - tag: http_tag + expression: event.decoded_as == 'HTTP' + - tag: dns_tag + expression: event.decoded_as == 'DNS' + + +topology: + - name: inline_source + downstream: [decoded_as_split] + - name: decoded_as_split + tags: [http_tag, dns_tag] + downstream: [ projection_processor, aggregate_processor] + - name: projection_processor + downstream: [ print_sink ] + - name: aggregate_processor + downstream: [ print_sink ] + - name: print_sink + downstream: [] +``` + + + + diff --git a/docs/processor/udaf.md b/docs/processor/udaf.md index dd1dd70..66d6ad5 100644 --- a/docs/processor/udaf.md +++ b/docs/processor/udaf.md @@ -41,7 +41,7 @@ COLLECT_LIST is used to collect the value of the field in the group of events. - lookup_fields: required. Now only support one field. - output_fields: optional. If not set, the output field name is `lookup_field_name`. -### Example +Example: ```yaml - function: COLLECT_LIST @@ -59,7 +59,7 @@ COLLECT_SET is used to collect the unique value of the field in the group of eve - lookup_fields: required. Now only support one field. - output_fields: optional. If not set, the output field name is `lookup_field_name`. -### Example +Example ```yaml - function: COLLECT_SET @@ -76,7 +76,7 @@ FIRST_VALUE is used to get the first value of the field in the group of events. - lookup_fields: required. Now only support one field. - output_fields: optional. If not set, the output field name is `lookup_field_name`. -### Example +Example ```yaml - function: FIRST_VALUE @@ -92,7 +92,7 @@ LAST_VALUE is used to get the last value of the field in the group of events. - lookup_fields: required. Now only support one field. - output_fields: optional. If not set, the output field name is `lookup_field_name`. 
-### Example +Example ```yaml - function: LAST_VALUE @@ -109,7 +109,7 @@ LONG_COUNT is used to count the number of events in the group of events. - lookup_fields: optional. - output_fields: required. -### Example +Example ```yaml - function: LONG_COUNT @@ -127,7 +127,7 @@ MEAN is used to calculate the mean value of the field in the group of events. Th - parameters: optional. - precision: `<Integer>` required. The precision of the mean value. Default is 2. -### Example +Example ```yaml - function: MEAN @@ -144,7 +144,7 @@ NUMBER_SUM is used to sum the value of the field in the group of events. The loo - lookup_fields: required. Now only support one field. - output_fields: optional. If not set, the output field name is `lookup_field_name`. -### Example +Example ```yaml - function: NUMBER_SUM @@ -164,7 +164,8 @@ hlld is a high-performance C server which is used to expose HyperLogLog sets and - precision: `<Integer>` optional. The precision of the hlld value. Default is 12. - output_format: `<String>` optional. The output format can be either `base64(encoded string)` or `binary(byte[])`. The default is `base64`. -### Example +Example + Merge multiple string field into a HyperLogLog data structure. ```yaml - function: HLLD @@ -194,8 +195,8 @@ Approx Count Distinct HLLD is used to count the approximate number of distinct v - input_type: `<String>` optional. Refer to `HLLD` function. - precision: `<Integer>` optional. Refer to `HLLD` function. -### Example - +Example + ```yaml - function: APPROX_COUNT_DISTINCT_HLLD lookup_fields: [client_ip] @@ -228,8 +229,8 @@ A High Dynamic Range (HDR) Histogram. More details can be found in [HDR Histogra - autoResize: `<Boolean>` optional. If true, the highestTrackableValue will auto-resize. Default is true. - output_format: `<String>` optional. The output format can be either `base64(encoded string)` or `binary(byte[])`. The default is `base64`. -### Example - +Example + ```yaml - function: HDR_HISTOGRAM lookup_fields: [latency_ms] @@ -264,8 +265,8 @@ Approx Quantile HDR is used to calculate the approximate quantile value of the f - autoResize: `<Boolean>` optional. Refer to `HDR_HISTOGRAM` function. - probability: `<Double>` optional. The probability of the quantile. Default is 0.5. -### Example - +Example + ```yaml - function: APPROX_QUANTILE_HDR lookup_fields: [latency_ms] @@ -301,8 +302,8 @@ Approx Quantiles HDR is used to calculate the approximate quantile values of the - autoResize: `<Boolean>` optional. Refer to `HDR_HISTOGRAM` function. - probabilities: `<Array<Double>>` required. The list of probabilities of the quantiles. Range is 0 to 1. -### Example - +Example + ```yaml - function: APPROX_QUANTILES_HDR lookup_fields: [latency_ms] diff --git a/docs/processor/udf.md b/docs/processor/udf.md index 170d86f..e480275 100644 --- a/docs/processor/udf.md +++ b/docs/processor/udf.md @@ -96,18 +96,19 @@ Base64 encode function is commonly used to encode the binary data to base64 stri ```BASE64_ENCODE_TO_STRING(filter, output_fields[, parameters])``` - filter: optional -- lookup_fields: not required +- lookup_fields: required - output_fields: required - parameters: required - - value_field: `<String>` required. + - input_type: `<String>` required. Enum: `string`, `byte_array`. The input type of the value field. 
Example: ```yaml - function: BASE64_ENCODE_TO_STRING + lookup_fields: [packet] output_fields: [packet] parameters: - value_field: packet + input_type: string ``` ### Current Unix Timestamp @@ -141,7 +142,7 @@ Domain function is used to extract the domain from the url. - parameters: required - option: `<String>` required. Enum: `TOP_LEVEL_DOMAIN`, `FIRST_SIGNIFICANT_SUBDOMAIN`. -#### Option +**Option** - `TOP_LEVEL_DOMAIN` is used to extract the top level domain from the url. For example, `www.abc.com` will be extracted to `com`. - `FIRST_SIGNIFICANT_SUBDOMAIN` is used to extract the first significant subdomain from the url. For example, `www.abc.com` will be extracted to `abc.com`. @@ -184,34 +185,55 @@ Eval function is used to adds or removes fields from events by evaluating an val - parameters: required - value_expression: `<String>` required. Enter a value expression to set the field’s value – this can be a constant. -Example 1: -Add a field `ingestion_time` with value `recv_time`: +Example 1, add a field `eval_constant_string` with string value `fixed_value`: +```yaml + +- function: EVAL + output_fields: [eval_constant_string] + parameters: + value_expression: "'fixed_value'" +``` + +Example 2, add a field `eval_constant_integer` with integer value `123`: +```yaml +- function: EVAL + output_fields: [eval_constant_integer] + parameters: + value_expression: "123" +``` +Example 3: add a field `ingestion_time` with the value of `recv_time` field. ```yaml - function: EVAL output_fields: [ingestion_time] parameters: - value_expression: recv_time + value_expression: recv_time # or "recv_time" ``` -Example 2: +Example 4: add a field `internal_ip` with the expression of conditional operator. If the value of `direction` is `69`, the value of `internal_ip` will be `client_ip`, otherwise the value of `internal_ip` will be `server_ip`. - ```yaml - function: EVAL output_fields: [internal_ip] parameters: - value_expression: 'direction=69 ? client_ip : server_ip' + value_expression: "direction=69 ? client_ip : server_ip" +``` +Use the bitwise operator to determine the value of the `direction` field. +```yaml + - function: EVAL + output_fields: [ direction ] + parameters: + value_expression: "(flags & 24576) == 24576 ? 'double' : ((flags & 8192) == 8192 ? 'c2s' : ((flags & 16384) == 16384 ? 's2c' : 'unknown'))" ``` - ### Flatten -Flatten the fields of nested structure to the top level. The new fields name are named using the field name prefixed with the names of the struct fields to reach it, separated by dots as default. +Flatten the fields of nested structure to the top level. The new fields name are named using the field name prefixed with the names of the struct fields to reach it, separated by dots as default. The original fields will be removed. ```FLATTEN(filter, lookup_fields, output_fields[, parameters])``` + - filter: optional - lookup_fields: optional -- output_fields: not required +- output_fields: not required. - parameters: optional - prefix: `<String>` optional. Prefix string for flattened field names. Default is empty. - depth: `<Integer>` optional. Number representing the nested levels to consider for flattening. Minimum 1. Default is `5`. @@ -255,13 +277,14 @@ Output: From unix timestamp function is used to convert the unix timestamp to date time string. The default time zone is UTC+0. 
```FROM_UNIX_TIMESTAMP(filter, lookup_fields, output_fields[, parameters])``` + - filter: optional - lookup_fields: required - output_fields: required - parameters: optional - precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`. -#### Precision +**Precision** - `milliseconds` is used to convert the unix timestamp to milliseconds date time string. For example, `1619712000` will be converted to `2021-04-30 00:00:00.000`. - `seconds` is used to convert the unix timestamp to seconds date time string. For example, `1619712000` will be converted to `2021-04-30 00:00:00`. @@ -314,7 +337,7 @@ GeoIP lookup function is used to lookup the geoip information by ip address. You - ISP: `<String>` optional. - ORGANIZATION: `<String>` optional. -#### Option +**Option** - `IP_TO_COUNTRY` is used to lookup the country or region information by ip address. - `IP_TO_PROVINCE` is used to lookup the province or state information by ip address. @@ -326,7 +349,7 @@ GeoIP lookup function is used to lookup the geoip information by ip address. You - `IP_TO_JSON` is used to lookup the above information by ip address. The result is a json string. - `IP_TO_OBJECT` is used to lookup the above information by ip address. The result is a `LocationResponse` object. -#### GeoLocation Field Mapping +**GeoLocation Field Mapping** - `COUNTRY` is used to map the country information to the event field. - `PROVINCE` is used to map the province information to the event field. @@ -413,8 +436,8 @@ Rename function is used to rename or reformat(e.g. by replacing character unders - parameters: required - parent_fields: `<Array>` optional. Specify fields whose children will inherit the Rename fields and Rename expression operations. - rename_fields: `Map<String, String>` required. The key is the original field name, and the value is the new field name. - - current_field_name: `<String>` required. The original field name. - - new_field_name: `<String>` required. The new field name. + - current_field_name: `<String>` required. The original field name. + - new_field_name: `<String>` required. The new field name. - rename_expression: `<String>` optional. AviatorScript expression whose returned value will be used to rename fields. ``` @@ -427,9 +450,9 @@ Remove the prefix "tags_" from the field names and rename the field "timestamp_m ```yaml - function: RENAME -- parameters: + parameters: rename_fields: - - timestamp_ms: recv_time_ms + timestamp_ms: recv_time_ms rename_expression: key=string.replace_all(key,'tags_',''); return key; ``` @@ -440,10 +463,10 @@ Rename the field `client_ip` to `source_ip`, including the fields under the `enc ```yaml - function: RENAME -- parameters: + parameters: parent_fields: [encapsulation.ipv4] rename_fields: - - client_ip: source_ip + client_ip: source_ip ``` @@ -509,7 +532,7 @@ Unix timestamp converter function is used to convert the unix timestamp precisio - parameters: required - precision: `<String>` required. Enum: `milliseconds`, `seconds`, `minutes`. The minutes precision is used to generate Unix timestamp, round it to the minute level, and output it in seconds format. - Example: -_`__timestamp` Internal field, from source ingestion time or current unix timestamp. + `__timestamp` Internal field, from source ingestion time or current unix timestamp. 
```yaml - function: UNIX_TIMESTAMP_CONVERTER @@ -518,4 +541,67 @@ _`__timestamp` Internal field, from source ingestion time or current unix timest parameters: precision: seconds ``` +### UUID +Generate a version 4 (random) UUID in accordance with [RFC-9562](https://datatracker.ietf.org/doc/rfc9562/). + +```UUID(output_fields)``` +- filter: not required +- lookup_fields: not required +- output_fields: required +- parameters: not required + +Example: + +```yaml +- function: UUID + output_fields: [uuid] +``` +Result: such as 3f0f8d7e-d89e-4b0a-9f2e-2eab5c99d062. + +### UUIDv5 + +Generate a version 5 (namespaced) UUID in accordance with RFC-9562 for the given name and namespace. If namespace is not a valid UUID, this function will fail. +Suitable for consistent identifiers across different systems. One of IP, DOMAIN, APP, or SUBSCRIBER to use a predefined namespace. +- NAMESPACE_IP: `6ba7b890-9dad-11d1-80b4-00c04fd430c8` +- NAMESPACE_DOMAIN: `6ba7b891-9dad-11d1-80b4-00c04fd430c8` +- NAMESPACE_APP: `6ba7b892-9dad-11d1-80b4-00c04fd430c8` +- NAMESPACE_SUBSCRIBER: `6ba7b893-9dad-11d1-80b4-00c04fd430c8` + +```UUIDV5(lookup_fields, output_fields[, parameters])``` +- filter: not required +- lookup_fields: required +- output_fields: required +- parameters: required + - namespace: `<String>` required. The UUID namespace. + +Example: + +```yaml +- function: UUIDv5 + lookup_fields: [ client_ip, server_ip ] # Based on the client_ip and server_ip value as Name with separator "_". + output_fields: [ip_uuid] + parameters: + namespace: NAMESPACE_IP +``` + +Result: such as 2ed6657d-e927-568b-95e1-2665a8aea6a2. + +### UUIDv7 + +Generate a version 7 (Unix-timestamp + random based variant) UUID in accordance with RFC-9562. Suitable for scenarios that require time ordering, such as database indexing and logging. + +```UUIDV7(output_fields)``` +- filter: not required +- lookup_fields: not required +- output_fields: required +- parameters: not required + +Example: + +```yaml +- function: UUIDv7 + output_fields: [log_uuid] + +``` +Result: such as 2ed6657d-e927-568b-95e1-2665a8aea6a2.
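+
+### Combined Usage
+
+A minimal, illustrative sketch (the pipeline name and field names are placeholders, not part of any shipped configuration) showing how several of the scalar functions above can be chained inside a single projection processor; functions are applied in order, top-down, and each one adds its output fields to the event before the next runs:
+
+```yaml
+processing_pipelines:
+  example_projection_processor:
+    type: projection
+    output_fields: []
+    functions:
+      - function: CURRENT_UNIX_TIMESTAMP
+        output_fields: [ processing_time ]
+        parameters:
+          precision: milliseconds
+      - function: FROM_UNIX_TIMESTAMP
+        lookup_fields: [ processing_time ]
+        output_fields: [ processing_time_string ]
+        parameters:
+          precision: milliseconds
+      - function: UUIDv7
+        output_fields: [ log_uuid ]
+```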
\ No newline at end of file diff --git a/docs/processor/udtf.md b/docs/processor/udtf.md index a6e8444..65a7840 100644 --- a/docs/processor/udtf.md +++ b/docs/processor/udtf.md @@ -29,8 +29,8 @@ The Unroll Function handles an array field—or an expression evaluating to an a - parameters: optional - regex: `<String>` optional. If lookup_fields is a string, the regex parameter is used to split the string into an array. The default value is a comma. -#### Example - +Example + ```yaml functions: - function: UNROLL @@ -50,8 +50,8 @@ The JSON Unroll Function handles a JSON object, unrolls/explodes an array of obj - path: `<String>` optional. Path to array to unroll, default is the root of the JSON object. - new_path: `<String>` optional. Rename path to new_path, default is the same as path. -#### Example - +Example + ```yaml functions: - function: JSON_UNROLL @@ -62,5 +62,53 @@ functions: - new_path: tag ``` +### Path Unroll + +The PATH_UNROLL function processes a given file path, breaking it down into individual steps and transforming each step into a separate event while retaining top-level fields. At the final level, it outputs both the full file path and the file name. + +```PATH_UNROLL(filter, lookup_fields, output_fields[, parameters])``` + +- filter: optional +- lookup_fields: required +- output_fields: required +- parameters: optional + - separator: <String> optional. The delimiter used to split the path. Default is `/`. + +Example Usage: + +```yaml +- function: PATH_UNROLL + lookup_fields: [ decoded_path, app] + output_fields: [ protocol_stack_id, app_name ] + parameters: + separator: "." +``` +Input: + +```json +{"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"wechat"} +``` +When the input is processed, the following events are generated: +``` + #Event1: {"protocol_stack_id":"ETHERNET"} + #Event2: {"protocol_stack_id":"ETHERNET.IPv4"} + #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"} + #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"} + #Event5: {"app_name":"wechat","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl.wechat"} +``` + +If decoded_path contains app value of `ETHERNET.IPv4.TCP.ssl`, the output will be as follows: +```json +{"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"ssl"} +``` +In this case, the output will be: +``` + #Event1: {"protocol_stack_id":"ETHERNET"} + #Event2: {"protocol_stack_id":"ETHERNET.IPv4"} + #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"} + #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl", "app_name":"ssl"} +``` + + |
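+
+When only a path field is configured (or the second lookup field is null), each emitted event carries just the accumulated path prefix and no final file-name event is produced. A sketch of this single-field variant:
+
+```yaml
+- function: PATH_UNROLL
+  lookup_fields: [ decoded_path ]
+  output_fields: [ protocol_stack_id ]
+  parameters:
+    separator: "."
+
+# Input:  {"decoded_path":"ETHERNET.IPv4.TCP.ssl"}
+# Output:
+#   Event1: {"protocol_stack_id":"ETHERNET"}
+#   Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
+#   Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
+#   Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"}
+```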
