summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-03-13 10:55:17 +0800
committerlifengchao <[email protected]>2024-03-13 10:55:17 +0800
commit420dc8afbc8721b343bf29af238a8578043e34a2 (patch)
tree55f6184c76721ab3d668a7e35307be0a2dd96cee
parent4e2a9108198a32d9ea491765dd8ddd1d1c163877 (diff)
[feature][connector] connector schema文档更新
-rw-r--r--config/template/grootstream_job_debug.yaml15
-rw-r--r--config/template/grootstream_job_template.yaml51
-rw-r--r--docs/connector/connector.md75
-rw-r--r--docs/connector/sink/clickhouse.md43
-rw-r--r--docs/connector/sink/kafka.md43
-rw-r--r--groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml1
-rw-r--r--groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml1
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml43
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml43
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml43
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml11
11 files changed, 216 insertions, 153 deletions
diff --git a/config/template/grootstream_job_debug.yaml b/config/template/grootstream_job_debug.yaml
index 1c8c5e9..a1a287d 100644
--- a/config/template/grootstream_job_debug.yaml
+++ b/config/template/grootstream_job_debug.yaml
@@ -1,7 +1,11 @@
sources:
kafka_source:
type : kafka
- # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ # source table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output.
+# schema:
+# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>"
+# local_file: "schema/test_schema.json"
+# url: "http://127.0.0.1/schema.json"
properties: # [object] Source Properties
topic: SESSION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9092
@@ -25,7 +29,6 @@ sources:
inline_source:
type : inline
- fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
data: '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1","server_ip":"120.233.20.242","common_schema_type":"BASE"}'
format: json
@@ -42,9 +45,6 @@ sources:
ipfix_source:
type: ipfix
-# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
-# - name: log_id
-# type: bigint
properties:
port.range: 12345-12347
max.packet.size: 65535
@@ -220,6 +220,11 @@ postprocessing_pipelines:
sinks:
kafka_sink_a:
type: kafka
+ # sink table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output.
+# schema:
+# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>"
+# local_file: "schema/test_schema.json"
+# url: "http://127.0.0.1/schema.json"
properties:
topic: SESSION-RECORD-JSON
kafka.bootstrap.servers: 192.168.44.12:9092
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index 0cb39ad..2606d56 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -22,27 +22,28 @@ sources: # [object] Define connector source
inline_source: # [object] Inline source connector name, must be unique. It used to define the source node of the job topology.
type: inline
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
#
# [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
@@ -64,9 +65,6 @@ sources: # [object] Define connector source
ipfix_source: # [object] IPFIX source connector name, must be unique. It used to define the source node of the job topology.
type: ipfix
-# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
-# - name: log_id
-# type: bigint
properties:
port.range: 12345-12347
max.packet.size: 65535
@@ -254,6 +252,11 @@ postprocessing_pipelines: # [object] Define Processors for postprocessing pipeli
sinks: # [object] Define connector sink
kafka_sink_a: # [object] Kafka sink connector name, must be unique. It used to define the sink node of the job topology.
type: kafka
+ # sink table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output.
+# schema:
+# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>"
+# local_file: "schema/test_schema.json"
+# url: "http://127.0.0.1/schema.json"
properties:
topic: SESSION-RECORD-A
kafka.bootstrap.servers: 127.0.0.1:9092
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 6df1e23..6bcc878 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -7,23 +7,68 @@ Source Connector contains some common core features, and each source connector s
sources:
${source_name}:
type: ${source_connector_type}
- fields:
- - name: ${field_name}
- type: ${field_type}
+    # source table schema, configured through fields, local_file, or url
+ schema:
+ fields:
+ - name: ${field_name}
+ type: ${field_type}
+ # local_file: "schema path"
+ # url: "schema http url"
properties:
${prop_key}: ${prop_value}
```
-| Name | Type | Required | Default | Description |
-|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------------|
-| type | String | Yes | - | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. |
-| fields | Array of `Field` | No | - | The structure of the data, including field names and field types. |
-| properties | Map of String | Yes | - | The source connector customize properties, more details see the [Source](source) documentation. |
+| Name | Type | Required | Default | Description |
+|-------------------------------------|------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------------|
+| type | String | Yes | - | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. |
+| schema                              | Map                                                              | No       | -                        | The source table schema, configured through fields, local_file, or url.                                                  |
+| properties | Map of String | Yes | - | The source connector customize properties, more details see the [Source](source) documentation. |
## Schema Field Projection
The source connector supports reading only specified fields from the data source. For example `KafkaSource` will read all content from topic and then use `fields` to filter unnecessary columns.
The Schema Structure refer to [Schema Structure](../user-guide.md#schema-structure).
+## Schema Config
+Schema can be configured through fields, local_file, or url.
+
+### fields
+```yaml
+schema:
+ # by array
+ fields:
+ - name: ${field_name}
+ type: ${field_type}
+```
+
+```yaml
+schema:
+ # by sql
+ fields: "struct<field_name:field_type, ...>"
+ # can also without outer struct<>
+ # fields: "field_name:field_type, ..."
+```
+
+### local_file
+
+```yaml
+schema:
+  # by local file path
+ local_file: "schema path"
+```
+
+### url
+Periodically retrieve the updated schema from the URL, which supports dynamic schemas. Not all connectors support dynamic schemas.
+
+The connectors that currently support dynamic schema include: ClickHouse sink.
+
+```yaml
+schema:
+  # by http url
+ url: "schema http url"
+```
+
# Sink Connector
Sink Connector contains some common core features, and each sink connector supports them to varying degrees.
@@ -33,12 +78,18 @@ Sink Connector contains some common core features, and each sink connector suppo
sinks:
${sink_name}:
type: ${sink_connector_type}
+    # sink table schema, configured through fields, local_file, or url. If no schema is set, all fields (Map<String, Object>) will be output.
+ schema:
+ fields: "struct<field_name:field_type, ...>"
+ # local_file: "schema path"
+ # url: "schema url"
properties:
${prop_key}: ${prop_value}
```
-| Name | Type | Required | Default | Description |
-|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------|
-| type | String | Yes | - | The type of the sink connector. The `SinkTableFactory` will use this value as identifier to create sink connector. |
-| properties | Map of String | Yes | - | The sink connector customize properties, more details see the [Sink](sink) documentation. |
+| Name | Type | Required | Default | Description |
+|-------------------------------------|--------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------|
+| type | String | Yes | - | The type of the sink connector. The `SinkTableFactory` will use this value as identifier to create sink connector. |
+| schema                              | Map                                                                | No       | -                        | The sink table schema, configured through fields, local_file, or url.                                                |
+| properties | Map of String | Yes | - | The sink connector customize properties, more details see the [Sink](sink) documentation. |
diff --git a/docs/connector/sink/clickhouse.md b/docs/connector/sink/clickhouse.md
index d794767..ac37d24 100644
--- a/docs/connector/sink/clickhouse.md
+++ b/docs/connector/sink/clickhouse.md
@@ -50,27 +50,28 @@ This example read data of inline test source and write to ClickHouse table `test
sources: # [object] Define connector source
inline_source:
type: inline
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
#
# [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md
index 6793b21..92976d8 100644
--- a/docs/connector/sink/kafka.md
+++ b/docs/connector/sink/kafka.md
@@ -26,27 +26,28 @@ This example read data of inline test source and write to kafka topic `SESSION-R
sources: # [object] Define connector source
inline_source:
type: inline
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
#
# [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
index 5a8fcb0..b328f01 100644
--- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
@@ -1,7 +1,6 @@
sources:
kafka_source:
type: kafka
- # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties: # [object] Source Properties
topic: SESSION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
index 7c448f6..61f4d9e 100644
--- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
@@ -1,7 +1,6 @@
sources:
kafka_source:
type: kafka
- # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties: # [object] Source Properties
topic: SESSION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml
index 370b7a8..829741d 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml
@@ -1,27 +1,28 @@
sources: # [object] Define connector source
inline_source:
type: inline
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
#
# [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
index a70f588..a5c5ece 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
@@ -1,27 +1,28 @@
sources: # [object] Define connector source
inline_source:
type: inline
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
#
# [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index d42c05a..cfd3917 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -1,27 +1,28 @@
sources:
inline_source:
type: inline
- fields:
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields:
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
format: json
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
index d3c46b7..e883bce 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml
@@ -1,11 +1,12 @@
sources:
kafka_source:
type : kafka
- fields: # [array of object] Schema field projection, support read data only from specified fields.
- - name: client_ip
- type: string
- - name: server_ip
- type: string
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
properties: # [object] Kafka source properties
topic: SESSION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9092