Diffstat (limited to 'docs/connector')
| -rw-r--r-- | docs/connector/config-encryption-decryption.md | 8 |
| -rw-r--r-- | docs/connector/connector.md | 27 |
| -rw-r--r-- | docs/connector/formats/csv.md | 73 |
| -rw-r--r-- | docs/connector/formats/raw.md | 2 |
| -rw-r--r-- | docs/connector/sink/kafka.md | 27 |
| -rw-r--r-- | docs/connector/sink/starrocks.md | 83 |
| -rw-r--r-- | docs/connector/source/kafka.md | 7 |
7 files changed, 199 insertions, 28 deletions
diff --git a/docs/connector/config-encryption-decryption.md b/docs/connector/config-encryption-decryption.md
index c2b05f6..91ca80e 100644
--- a/docs/connector/config-encryption-decryption.md
+++ b/docs/connector/config-encryption-decryption.md
@@ -127,13 +127,13 @@ Next, I'll show how to quickly use groot-stream's own `aes` encryption:
 
 ## How to implement user-defined encryption and decryption
 
-1. Create a new class and implement interface `ConfigShade`, this interface has the following methods:
+1. Create a new class and implement interface `CryptoShade`, this interface has the following methods:
 
    ```java
-   public interface ConfigShade {
+   public interface CryptoShade {
 
       /**
        * The unique identifier of the current interface, used it to select the correct {@link
-       * ConfigShade}
+       * CryptoShade}
        */
       String getIdentifier();
@@ -157,6 +157,6 @@ Next, I'll show how to quickly use groot-stream's own `aes` encryption:
       }
    }
    ```
-2. Add `com.geedgenetworks.common.config.ConfigShade` in `resources/META-INF/services`
+2. Add `com.geedgenetworks.common.crypto.CryptoShade` in `resources/META-INF/services`
 3. Change the option `shade.identifier` to the value that you defined in `ConfigShade#getIdentifier`of you config file.
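For readers following the renamed interface, a minimal sketch of a user-defined shade might look like the block below. Only `getIdentifier()` is visible in the patch above; the `encrypt`/`decrypt` method names and the `Base64CryptoShade` class name are assumptions for illustration, not the project's actual API.

```java
package com.example.shade;

import com.geedgenetworks.common.crypto.CryptoShade;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical implementation: Base64 stands in for a real cipher so the
// SPI wiring can be shown without key management.
public class Base64CryptoShade implements CryptoShade {

    // Value referenced by the `shade.identifier` option in the config file.
    @Override
    public String getIdentifier() {
        return "base64";
    }

    // Assumed decryption hook: decode a Base64-encoded option value.
    public String decrypt(String content) {
        return new String(Base64.getDecoder().decode(content), StandardCharsets.UTF_8);
    }

    // Assumed encryption hook: encode a plain option value.
    public String encrypt(String content) {
        return Base64.getEncoder().encodeToString(content.getBytes(StandardCharsets.UTF_8));
    }
}
```

Registration would then follow step 2 above: a `resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade` file containing `com.example.shade.Base64CryptoShade`, and `shade.identifier: base64` in the job config.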
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 766b73e..93d64b0 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -1,3 +1,12 @@
+# Table of Contents
+- [Source Connector](#source-connector)
+  - [Common Source Options](#common-source-options)
+  - [Schema Field Projection](#schema-field-projection)
+  - [Schema Config](#schema-config)
+  - [Mock Data Type](#mock-data-type)
+- [Sink Connector](#sink-connector)
+  - [Common Sink Options](#common-sink-options)
+
 # Source Connector
 
 Source Connector contains some common core features, and each source connector supports them to varying degrees.
@@ -62,13 +71,12 @@ schema:
 
 To retrieve the schema from a local file using its absolute path.
 
 > Ensures that the file path is accessible to all nodes in your Flink cluster.
->
-> ```yaml
-> schema:
->   # by array
->   fields:
->     local_file: "/path/to/schema.json"
-> ```
+
+ ```yaml
+schema:
+  # Note: Only support avro schema format
+  local_file: "/path/to/schema.json"
+```
 
 ### URL
 
@@ -76,9 +84,8 @@
 Some connectors support periodically fetching and updating the schema from a URL
 
 ```yaml
 schema:
-  # by array
-  fields:
-  url: "https://localhost:8080/schema.json"
+  # Note: Only support avro schema format
+  url: "https://localhost:8080/schema.json"
 ```
 
 ## Mock Data Type
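Since both `local_file` and `url` now only accept an Avro schema, it may help to picture what such a `schema.json` contains. The record name, namespace, and field list below are illustrative only, reusing field names from the CSV example later in this diff:

```json
{
  "type": "record",
  "name": "Event",
  "namespace": "com.geedgenetworks.example",
  "fields": [
    { "name": "log_id", "type": "long" },
    { "name": "recv_time", "type": "long" },
    { "name": "client_ip", "type": "string" }
  ]
}
```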
diff --git a/docs/connector/formats/csv.md b/docs/connector/formats/csv.md
new file mode 100644
index 0000000..ca8d10b
--- /dev/null
+++ b/docs/connector/formats/csv.md
@@ -0,0 +1,73 @@
+# CSV
+
+> Format CSV
+>
+> ## Description
+>
+> The CSV format allows to read and write CSV data based on an CSV schema. Currently, the CSV schema is derived from table schema.
+> **The CSV format must config schema for source/sink**.
+
+| Name       | Supported Versions | Maven |
+|------------|--------------------|-------|
+| Format CSV | Universal          | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/format-csv/) |
+
+## Format Options
+
+| Name | Type | Required | Default | Description |
+|-----------------------------|---------|----------|---------|-------------|
+| format | String | Yes | (none) | Specify what format to use, here should be 'csv'. |
+| csv.field.delimiter | String | No | , | Field delimiter character (',' by default), must be single character. You can use backslash to specify special characters, e.g. '\t' represents the tab character. |
+| csv.disable.quote.character | Boolean | No | false | Disabled quote character for enclosing field values (false by default). If true, option 'csv.quote.character' can not be set. |
+| csv.quote.character | String | No | " | Quote character for enclosing field values (" by default). |
+| csv.allow.comments | Boolean | No | false | Ignore comment lines that start with '#' (disabled by default). If enabled, make sure to also ignore parse errors to allow empty rows. |
+| csv.ignore.parse.errors | Boolean | No | false | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
+| csv.array.element.delimiter | String | No | ; | Array element delimiter string for separating array and row element values (';' by default). |
+| csv.escape.character | String | No | (none) | Escape character for escaping values (disabled by default). |
+| csv.null.literal | String | No | (none) | Null literal string that is interpreted as a null value (disabled by default). |
+
+# How to use
+
+## Inline uses example
+
+data:
+
+```json
+{
+  "log_id": 1,
+  "recv_time": 1712827485,
+  "client_ip": "192.168.0.1"
+}
+```
+
+```yaml
+sources:
+  inline_source:
+    type: inline
+    schema:
+      fields: "log_id:int, recv_time:bigint, client_ip:string"
+    properties:
+      data: "1,1712827485,192.168.0.1"
+      format: csv
+
+sinks:
+  print_sink:
+    type: print
+    schema:
+      fields: "log_id:int, recv_time:bigint, client_ip:string"
+    properties:
+      format: csv
+
+application:
+  env:
+    name: example-inline-to-print
+    parallelism: 3
+    pipeline:
+      object-reuse: true
+  topology:
+    - name: inline_source
+      downstream: [print_sink]
+    - name: print_sink
+      downstream: []
+```
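The new page's example sticks to the defaults; a variant exercising a couple of the options from the table (tab delimiter, tolerant parsing) could look like the sketch below. How the inline source interprets `\t` inside the `data` string is an assumption:

```yaml
sources:
  inline_source:
    type: inline
    schema:
      fields: "log_id:int, recv_time:bigint, client_ip:string"
    properties:
      data: "1\t1712827485\t192.168.0.1"
      format: csv
      # '\t' selects the tab character as the single-character field delimiter
      csv.field.delimiter: "\t"
      # malformed fields become null instead of failing the job
      csv.ignore.parse.errors: true
```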
diff --git a/docs/connector/formats/raw.md b/docs/connector/formats/raw.md
index 853ac79..06ea8bc 100644
--- a/docs/connector/formats/raw.md
+++ b/docs/connector/formats/raw.md
@@ -4,7 +4,7 @@
 >
 > ## Description
 >
-> The Raw format allows to read and write raw (byte based) values as a single column.
+> The Raw format allows to read and write raw (byte based) values as a single column, the column name is raw default, it can also be explicitly defined as other name.
 
 | Name       | Supported Versions | Maven |
 |------------|--------------------|---------------------------------------------------------------------------------------------------------------------------|
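One way to read the amended sentence: with the raw format the single schema field defaults to `raw`, and giving that field another name in the schema renames the column. The source name, topic, and servers below are invented, and the schema/format interaction shown is an assumption:

```yaml
sources:
  kafka_source:
    type: kafka
    schema:
      # one column only; without an explicit name the column would be `raw`
      fields: "payload:string"
    properties:
      topic: RAW-EVENTS
      kafka.bootstrap.servers: "localhost:9092"
      format: raw
```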
diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md
index 716a179..78b7f34 100644
--- a/docs/connector/sink/kafka.md
+++ b/docs/connector/sink/kafka.md
@@ -20,19 +20,20 @@ In order to use the Kafka connector, the following dependencies are required. Th
 
 Kafka sink custom properties. if properties belongs to Kafka Producer Config, you can use `kafka.` prefix to set.
 
-| Name | Type | Required | Default | Description |
-|------------------------------------|---------|----------|---------|-------------|
-| topic | String | Yes | (none) | Topic name is required. It used to write data to kafka. |
-| kafka.bootstrap.servers | String | Yes | (none) | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form `host1:port1,host2:port2,...`. |
-| log.failures.only | Boolean | No | true | Defines whether the producer should fail on errors, or only log them. If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown and cause the streaming program to fail (and enter recovery). |
-| format | String | No | json | Data format. The default value is `json`. The Optional values are `json`, `protobuf`. |
-| [format].config | / | No | (none) | Data format properties. Please refer to [Format Options](../formats) for details. |
-| rate.limiting.strategy | String | No | (none) | The rate limiting strategy to use. The Optional values are `none`, `sliding_window`. |
-| rate.limiting.window.size | Integer | No | 5 | The window size of the rate limiting. For example, limit rate less than 10Mbps in 5 seconds time interval. |
-| rate.limiting.limit.rate | String | No | 10Mbps | A maximum rate of traffic that can be transmitted over a network or between networks. The units of the bytes rate are Mbps, Kbps,and bps. For example, 10Mbps, 100Kbps, 1000bps. |
-| rate.limiting.block.duration | String | No | 5min | If the rate limit is exceeded, the data will be blocked for the specified duration. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. |
-| rate.limiting.block.reset.duration | String | No | 30s | The time interval for resetting the rate limit. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. |
-| kafka.config | / | No | (none) | Kafka producer properties. Please refer to [Kafka Producer Config](https://kafka.apache.org/documentation/#producerconfigs) for details. |
+| Name | Type | Required | Default | Description |
+|-------------------------------------|---------|----------|---------|-------------|
+| topic | String | Yes | (none) | Topic name is required. It used to write data to kafka. |
+| kafka.bootstrap.servers | String | Yes | (none) | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form `host1:port1,host2:port2,...`. |
+| log.failures.only | Boolean | No | true | Defines whether the producer should fail on errors, or only log them. If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown and cause the streaming program to fail (and enter recovery). |
+| format | String | No | json | Data format. The default value is `json`. The Optional values are `json`, `protobuf`. |
+| [format].config | Map | No | (none) | Data format properties. Please refer to [Format Options](../formats) for details. |
+| headers.config | Map | No | (none) | Kafka record headers info. exp: 'headers.key: value' will put key and value into record headers. |
+| rate.limiting.strategy | String | No | (none) | The rate limiting strategy to use. The Optional values are `none`, `sliding_window`. |
+| rate.limiting.window.size | Integer | No | 5 | The window size of the rate limiting. For example, limit rate less than 10Mbps in 5 seconds time interval. |
+| rate.limiting.limit.rate | String | No | 10Mbps | A maximum rate of traffic that can be transmitted over a network or between networks. The units of the bytes rate are Mbps, Kbps,and bps. For example, 10Mbps, 100Kbps, 1000bps. |
+| rate.limiting.block.duration | String | No | 5min | If the rate limit is exceeded, the data will be blocked for the specified duration. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. |
+| rate.limiting.block.reset.duration | String | No | 30s | The time interval for resetting the rate limit. The units of the duration are seconds, minutes, and hours. For example, 10s, 1m, 1h. |
+| kafka.config | Map | No | (none) | Kafka producer properties. Please refer to [Kafka Producer Config](https://kafka.apache.org/documentation/#producerconfigs) for details. |
 
 ## Example
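Of the new rows, `headers.config` is the least self-explanatory. Following the description ('headers.key: value' is written into the record headers), a sink using it might look like the sketch below; the topic, bootstrap servers, and header keys are invented:

```yaml
sinks:
  kafka_sink:
    type: kafka
    properties:
      topic: SESSION-RECORD-OUT
      kafka.bootstrap.servers: "192.168.40.10:9092"
      format: json
      # each `headers.<key>: <value>` pair becomes a Kafka record header
      headers.source: groot-stream
      headers.env: production
```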
diff --git a/docs/connector/sink/starrocks.md b/docs/connector/sink/starrocks.md
new file mode 100644
index 0000000..f07e432
--- /dev/null
+++ b/docs/connector/sink/starrocks.md
@@ -0,0 +1,83 @@
+# Starrocks
+
+> Starrocks sink connector
+>
+> ## Description
+>
+> Sink connector for Starrocks, know more in https://docs.starrocks.io/zh/docs/loading/Flink-connector-starrocks/.
+
+## Sink Options
+
+Starrocks sink custom properties. If properties belongs to Starrocks Flink Connector Config, you can use `connection.` prefix to set.
+
+| Name | Type | Required | Default | Description |
+|---------------------|---------|----------|---------|-------------|
+| log.failures.only | Boolean | No | true | Optional flag to whether the sink should fail on errors, or only log them; If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default. |
+| connection.jdbc-url | String | Yes | (none) | The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>.. |
+| connection.load-url | String | Yes | (none) | The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>.. |
+| connection.config | Map | No | (none) | Starrocks Flink Connector Options, know more in https://docs.starrocks.io/docs/loading/Flink-connector-starrocks/#options. |
+
+## Example
+
+This example read data of inline test source and write to Starrocks table `test`.
+
+```yaml
+sources: # [object] Define connector source
+  inline_source:
+    type: inline
+    schema:
+      fields: # [array of object] Schema field projection, support read data only from specified fields.
+        - name: log_id
+          type: bigint
+        - name: recv_time
+          type: bigint
+        - name: server_fqdn
+          type: string
+        - name: server_domain
+          type: string
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+        - name: server_asn
+          type: string
+        - name: decoded_as
+          type: string
+        - name: device_group
+          type: string
+        - name: device_tag
+          type: string
+    properties:
+      #
+      # [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
+      #
+      data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+      format: json
+      json.ignore.parse.errors: false
+
+sinks:
+  starrocks_sink:
+    type: starrocks
+    properties:
+      "log.failures.only": false
+      "connection.jdbc-url": "jdbc:mysql://192.168.40.222:9030"
+      "connection.load-url": "192.168.40.222:8030"
+      "connection.database-name": "test"
+      "connection.table-name": "test"
+      "connection.username": "root"
+      "connection.password": ""
+      "connection.sink.buffer-flush.interval-ms": "30000"
+
+application: # [object] Define job configuration
+  env:
+    name: groot-stream-job-inline-to-starrocks
+    parallelism: 3
+    pipeline:
+      object-reuse: true
+  topology:
+    - name: inline_source
+      downstream: [ starrocks_sink ]
+    - name: starrocks_sink
+      downstream: [ ]
+```
diff --git a/docs/connector/source/kafka.md b/docs/connector/source/kafka.md
index 07dff22..680d1c1 100644
--- a/docs/connector/source/kafka.md
+++ b/docs/connector/source/kafka.md
@@ -24,6 +24,13 @@ Kafka source custom properties. if properties belongs to Kafka Consumer Config,
 | [format].config | Map | No | (none) | Data format properties. Please refer to [Format Options](../formats) for details. |
 | kafka.config | Map | No | (none) | Kafka consumer properties. Please refer to [Kafka Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) for details. |
 
+## Internal Fields
+
+| Name | Type | Description |
+|-------------|---------------------|-------------------------------------|
+| __timestamp | Long | The timestamp of this kafka record. |
+| __headers | Map[String, String] | The headers of this kafka record. |
+
 ## Example
 
 This example read data of kafka topic `SESSION-RECORD` and print to console.
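How the new internal fields surface in a job is not spelled out in the table. One plausible reading, assuming they ride along with the payload and can be projected like ordinary columns, is sketched below; the field list, types, and bootstrap servers are guesses:

```yaml
sources:
  kafka_source:
    type: kafka
    schema:
      # assumption: __timestamp can be projected by name alongside payload fields
      fields: "log_id:bigint, client_ip:string, __timestamp:bigint"
    properties:
      topic: SESSION-RECORD
      kafka.bootstrap.servers: "192.168.40.10:9092"
      format: json
```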