diff options
| author | lifengchao <[email protected]> | 2024-03-13 10:55:17 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-03-13 10:55:17 +0800 |
| commit | 420dc8afbc8721b343bf29af238a8578043e34a2 (patch) | |
| tree | 55f6184c76721ab3d668a7e35307be0a2dd96cee | |
| parent | 4e2a9108198a32d9ea491765dd8ddd1d1c163877 (diff) | |
[feature][connector] connector schema文档更新
11 files changed, 216 insertions, 153 deletions
diff --git a/config/template/grootstream_job_debug.yaml b/config/template/grootstream_job_debug.yaml index 1c8c5e9..a1a287d 100644 --- a/config/template/grootstream_job_debug.yaml +++ b/config/template/grootstream_job_debug.yaml @@ -1,7 +1,11 @@ sources: kafka_source: type : kafka - # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. + # source table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output. +# schema: +# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>" +# local_file: "schema/test_schema.json" +# url: "http://127.0.0.1/schema.json" properties: # [object] Source Properties topic: SESSION-RECORD kafka.bootstrap.servers: 192.168.44.11:9092 @@ -25,7 +29,6 @@ sources: inline_source: type : inline - fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: data: '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1","server_ip":"120.233.20.242","common_schema_type":"BASE"}' format: json @@ -42,9 +45,6 @@ sources: ipfix_source: type: ipfix -# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. -# - name: log_id -# type: bigint properties: port.range: 12345-12347 max.packet.size: 65535 @@ -220,6 +220,11 @@ postprocessing_pipelines: sinks: kafka_sink_a: type: kafka + # sink table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output. +# schema: +# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>" +# local_file: "schema/test_schema.json" +# url: "http://127.0.0.1/schema.json" properties: topic: SESSION-RECORD-JSON kafka.bootstrap.servers: 192.168.44.12:9092 diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml index 0cb39ad..2606d56 100644 --- a/config/template/grootstream_job_template.yaml +++ b/config/template/grootstream_job_template.yaml @@ -22,27 +22,28 @@ sources: # [object] Define connector source inline_source: # [object] Inline source connector name, must be unique. It used to define the source node of the job topology. type: inline - fields: # [array of object] Schema field projection, support read data only from specified fields. - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: # # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. @@ -64,9 +65,6 @@ sources: # [object] Define connector source ipfix_source: # [object] IPFIX source connector name, must be unique. It used to define the source node of the job topology. type: ipfix -# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. -# - name: log_id -# type: bigint properties: port.range: 12345-12347 max.packet.size: 65535 @@ -254,6 +252,11 @@ postprocessing_pipelines: # [object] Define Processors for postprocessing pipeli sinks: # [object] Define connector sink kafka_sink_a: # [object] Kafka sink connector name, must be unique. It used to define the sink node of the job topology. type: kafka + # sink table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output. +# schema: +# fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>" +# local_file: "schema/test_schema.json" +# url: "http://127.0.0.1/schema.json" properties: topic: SESSION-RECORD-A kafka.bootstrap.servers: 127.0.0.1:9092 diff --git a/docs/connector/connector.md b/docs/connector/connector.md index 6df1e23..6bcc878 100644 --- a/docs/connector/connector.md +++ b/docs/connector/connector.md @@ -7,23 +7,68 @@ Source Connector contains some common core features, and each source connector s sources: ${source_name}: type: ${source_connector_type} - fields: - - name: ${field_name} - type: ${field_type} + # source table schema, config through fields or local_file or url + schema: + fields: + - name: ${field_name} + type: ${field_type} + # local_file: "schema path" + # url: "schema http url" properties: ${prop_key}: ${prop_value} ``` -| Name | Type | Required | Default | Description | -|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------------| -| type | String | Yes | - | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. | -| fields | Array of `Field` | No | - | The structure of the data, including field names and field types. | -| properties | Map of String | Yes | - | The source connector customize properties, more details see the [Source](source) documentation. | +| Name | Type | Required | Default | Description | +|-------------------------------------|------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------------| +| type | String | Yes | - | The type of the source connector. The `SourceTableFactory` will use this value as identifier to create source connector. | +| schema | Map | No | - | The source table schema, config through fields or local_file or url. | +| properties | Map of String | Yes | - | The source connector customize properties, more details see the [Source](source) documentation. | ## Schema Field Projection The source connector supports reading only specified fields from the data source. For example `KafkaSource` will read all content from topic and then use `fields` to filter unnecessary columns. The Schema Structure refer to [Schema Structure](../user-guide.md#schema-structure). +## Schema Config +Schema can config through fields or local_file or url. + +### fields +```yaml +schema: + # by array + fields: + - name: ${field_name} + type: ${field_type} +``` + +```yaml +schema: + # by sql + fields: "struct<field_name:field_type, ...>" + # can also without outer struct<> + # fields: "field_name:field_type, ..." +``` + +### local_file + +```yaml +schema: + # by array + fields: + local_file: "schema path" +``` + +### url +Retrieve updated schema from URL for cycle, support dynamic schema. Not all connector support dynamic schema. + +The connectors that currently support dynamic schema include: clickHouse sink. + +```yaml +schema: + # by array + fields: + url: "schema http url" +``` + # Sink Connector Sink Connector contains some common core features, and each sink connector supports them to varying degrees. @@ -33,12 +78,18 @@ Sink Connector contains some common core features, and each sink connector suppo sinks: ${sink_name}: type: ${sink_connector_type} + # sink table schema, config through fields or local_file or url. if not set schema, all fields(Map<String, Object>) will be output. + schema: + fields: "struct<field_name:field_type, ...>" + # local_file: "schema path" + # url: "schema url" properties: ${prop_key}: ${prop_value} ``` -| Name | Type | Required | Default | Description | -|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------| -| type | String | Yes | - | The type of the sink connector. The `SinkTableFactory` will use this value as identifier to create sink connector. | -| properties | Map of String | Yes | - | The sink connector customize properties, more details see the [Sink](sink) documentation. | +| Name | Type | Required | Default | Description | +|-------------------------------------|--------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------| +| type | String | Yes | - | The type of the sink connector. The `SinkTableFactory` will use this value as identifier to create sink connector. | +| schema | Map | No | - | The sink table schema, config through fields or local_file or url. | +| properties | Map of String | Yes | - | The sink connector customize properties, more details see the [Sink](sink) documentation. | diff --git a/docs/connector/sink/clickhouse.md b/docs/connector/sink/clickhouse.md index d794767..ac37d24 100644 --- a/docs/connector/sink/clickhouse.md +++ b/docs/connector/sink/clickhouse.md @@ -50,27 +50,28 @@ This example read data of inline test source and write to ClickHouse table `test sources: # [object] Define connector source inline_source: type: inline - fields: # [array of object] Schema field projection, support read data only from specified fields. - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: # # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md index 6793b21..92976d8 100644 --- a/docs/connector/sink/kafka.md +++ b/docs/connector/sink/kafka.md @@ -26,27 +26,28 @@ This example read data of inline test source and write to kafka topic `SESSION-R sources: # [object] Define connector source inline_source: type: inline - fields: # [array of object] Schema field projection, support read data only from specified fields. - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: # # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml index 5a8fcb0..b328f01 100644 --- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml +++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml @@ -1,7 +1,6 @@ sources: kafka_source: type: kafka - # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: # [object] Source Properties topic: SESSION-RECORD kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094 diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml index 7c448f6..61f4d9e 100644 --- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml +++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml @@ -1,7 +1,6 @@ sources: kafka_source: type: kafka - # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: # [object] Source Properties topic: SESSION-RECORD kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094 diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml index 370b7a8..829741d 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_clickhouse.yaml @@ -1,27 +1,28 @@ sources: # [object] Define connector source inline_source: type: inline - fields: # [array of object] Schema field projection, support read data only from specified fields. - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: # # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml index a70f588..a5c5ece 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml @@ -1,27 +1,28 @@ sources: # [object] Define connector source inline_source: type: inline - fields: # [array of object] Schema field projection, support read data only from specified fields. - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: # # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml index d42c05a..cfd3917 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml @@ -1,27 +1,28 @@ sources: inline_source: type: inline - fields: - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' format: json diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml index d3c46b7..e883bce 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/kafka_to_print.yaml @@ -1,11 +1,12 @@ sources: kafka_source: type : kafka - fields: # [array of object] Schema field projection, support read data only from specified fields. - - name: client_ip - type: string - - name: server_ip - type: string + schema: + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: client_ip + type: string + - name: server_ip + type: string properties: # [object] Kafka source properties topic: SESSION-RECORD kafka.bootstrap.servers: 192.168.44.11:9092 |
