| author | doufenghu <[email protected]> | 2024-01-23 18:16:02 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-01-23 18:16:02 +0800 |
| commit | d7d7164b6f7b3b61273780803ff200cebabafcfc (patch) | |
| tree | ed0e78c041d16a9737470715f2a716eb05f00aea /docs | |
| parent | 4a1fbba350ab79275b6cf976501900f83dbfe9d7 (diff) | |
[Improve][docs] Add connector.md, Kafka.md and improve user-guide.md description.
Diffstat (limited to 'docs')
| -rw-r--r-- | docs/connector/connector.md | 44 |
| -rw-r--r-- | docs/connector/source/Kafka.md | 61 |
| -rw-r--r-- | docs/user-guide.md | 58 |
3 files changed, 140 insertions, 23 deletions
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
new file mode 100644
index 0000000..6df1e23
--- /dev/null
+++ b/docs/connector/connector.md
@@ -0,0 +1,44 @@
# Source Connector
Source connectors share a set of common core features, and each source connector supports them to varying degrees.

## Common Source Options

```yaml
sources:
  ${source_name}:
    type: ${source_connector_type}
    fields:
      - name: ${field_name}
        type: ${field_type}
    properties:
      ${prop_key}: ${prop_value}
```

| Name       | Type             | Required | Default | Description |
|------------|------------------|----------|---------|-------------|
| type       | String           | Yes      | -       | The type of the source connector. The `SourceTableFactory` uses this value as the identifier to create the source connector. |
| fields     | Array of `Field` | No       | -       | The structure of the data, including field names and field types. |
| properties | Map of String    | Yes      | -       | The source connector's custom properties; for more details, see the [Source](source) documentation. |

## Schema Field Projection
The source connector supports reading only specified fields from the data source. For example, `KafkaSource` reads the full content of the topic and then uses `fields` to filter out unnecessary columns.
For the schema structure, refer to [Schema Structure](../user-guide.md#schema-structure).

# Sink Connector
Sink connectors share a set of common core features, and each sink connector supports them to varying degrees.

## Common Sink Options

```yaml
sinks:
  ${sink_name}:
    type: ${sink_connector_type}
    properties:
      ${prop_key}: ${prop_value}
```

| Name       | Type          | Required | Default | Description |
|------------|---------------|----------|---------|-------------|
| type       | String        | Yes      | -       | The type of the sink connector. The `SinkTableFactory` uses this value as the identifier to create the sink connector. |
| properties | Map of String | Yes      | -       | The sink connector's custom properties; for more details, see the [Sink](sink) documentation. |

diff --git a/docs/connector/source/Kafka.md b/docs/connector/source/Kafka.md
index e69de29..49ea262 100644
--- a/docs/connector/source/Kafka.md
+++ b/docs/connector/source/Kafka.md
@@ -0,0 +1,61 @@
# Kafka
> Kafka source connector

## Description
Source connector for Apache Kafka.

## Source Options
In order to use the Kafka connector, the following dependencies are required. They can be downloaded from the Nexus Maven repository.

| Datasource      | Supported Versions | Maven |
|-----------------|--------------------|-------|
| connector-kafka | Universal          | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/connector-kafka/) |

The Kafka source accepts custom properties. If a property belongs to the Kafka consumer config, set it with the `kafka.` prefix.
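For illustration, assuming prefixed entries are forwarded to the Kafka consumer with the `kafka.` prefix removed, a consumer option such as `enable.auto.commit` could be set like this (the topic and broker address are placeholders):

```yaml
properties:
  topic: example-topic                    # placeholder topic name
  kafka.bootstrap.servers: broker1:9092   # placeholder broker list
  kafka.enable.auto.commit: false         # assumed to reach the consumer as enable.auto.commit
```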
| Name                      | Type   | Required | Default | Description |
|---------------------------|--------|----------|---------|-------------|
| topic                     | String | Yes      | -       | Topic name(s). A topic list is also supported by separating topic names with commas, e.g. `topic1,topic2`. |
| kafka.bootstrap.servers   | String | Yes      | -       | A list of host/port pairs used for establishing the initial connection to the Kafka cluster, in the form `host1:port1,host2:port2,...`. |
| format                    | String | No       | json    | Data format. The default value is `json`. Optional values are `json` and `protobuf`. |
| Format Properties         |        | No       | -       | Data format properties. Please refer to [Format Options](../formats) for details. |
| Kafka Consumer Properties |        | No       | -       | Kafka consumer properties. Please refer to [Kafka Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) for details. |

## Example
This example reads data from the Kafka topic `SESSION-RECORD` and prints it to the console.
```yaml
sources:
  kafka_source:
    type: kafka
    fields: # [array of object] Schema field projection; read only the specified fields.
      - name: client_ip
        type: string
      - name: server_ip
        type: string
    properties: # [object] Kafka source properties
      topic: SESSION-RECORD
      kafka.bootstrap.servers: 192.168.44.11:9092
      kafka.session.timeout.ms: 60000
      kafka.max.poll.records: 3000
      kafka.max.partition.fetch.bytes: 31457280
      kafka.group.id: GROOT-STREAM-example-KAFKA-TO-PRINT
      kafka.auto.offset.reset: latest
      format: json

sinks: # [object] Define connector sinks
  print_sink:
    type: print
    properties:
      mode: log_info
      format: json

application: # [object] Define job configuration
  env:
    name: groot-stream-job-kafka-to-print
    parallelism: 1
    pipeline:
      object-reuse: true
  topology:
    - name: kafka_source
      downstream: [print_sink]
    - name: print_sink
      downstream: []
```

diff --git a/docs/user-guide.md b/docs/user-guide.md
index aa36093..5b06a52 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -8,7 +8,7 @@ The main format of the config template file is `yaml`, for more details of this
 sources:
   inline_source:
     type: inline
-    fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+    fields:
       - name: log_id
         type: bigint
       - name: recv_time
@@ -22,7 +22,7 @@ sources:
       - name: decoded_as
         type: string
     properties:
-      data: '{"log_id": 1, "recv_time":"111","fqdn_string":"baidu.com", "client_ip":"192.168.0.1","server_ip":"120.233.20.242","decoded_as":"BASE"}'
+      data: '{"log_id": 1, "recv_time":"111","fqdn_string":"baidu.com", "client_ip":"192.168.0.1","server_ip":"120.233.20.242","decoded_as":"BASE", "dup_traffic_flag":1}'
       format: json
       json.ignore.parse.errors: false
@@ -33,7 +33,7 @@ filters:
     expression: event.decoded_as == 'BASE'

 preprocessing_pipelines:
-  transform_processor:
+  preprocessor:
     type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
     functions:
       - function: EVAL
@@ -42,13 +42,11 @@ preprocessing_pipelines:
         value_expression: fqdn_string

 processing_pipelines:
-  session_record_processor: # [object] Processing Pipeline
+  processor:
     type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
     remove_fields: [log_id]
-    output_fields:
-      properties:
-        key: value
-    functions: # [array of object] Function List
+    output_fields: []
+    functions:
       - function: DROP
         lookup_fields: []
         output_fields: []
@@ -56,6 +54,11 @@ processing_pipelines:
       - function: SNOWFLAKE_ID
         lookup_fields: []
         output_fields: [ log_id ]
+
+postprocessing_pipelines:
+  postprocessor:
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields: [dup_traffic_flag]

 sinks:
   print_sink:
@@ -63,21 +66,26 @@ sinks:
     properties:
       format: json

-application: # [object] Application Configuration
-  env: # [object] Environment Variables
-    name: inline-to-console-job # [string] Job Name
-    parallelism: 1 # [number] Job-Level Parallelism
-  topology: # [array of object] Node List. It will be used build data flow for job dag graph.
-    - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
-      #parallelism: 1 # [number] Operator-Level Parallelism.
-      downstream: [filter]
-    - name: filter
-      downstream: [transform_processor]
-    - name: transform_processor
-      downstream: [session_record_processor]
-    - name: session_record_processor
-      downstream: [print_sink]
+application:
+  env:
+    name: inline-to-print-job
+    parallelism: 3
+    pipeline:
+      object-reuse: true
+  topology:
+    - name: inline_source
+      parallelism: 1
+      downstream: [http_filter]
+    - name: http_filter
+      downstream: [preprocessor]
+    - name: preprocessor
+      downstream: [processor]
+    - name: processor
+      downstream: [postprocessor]
+    - name: postprocessor
+      downstream: [ print_sink ]
     - name: print_sink
+      parallelism: 1
       downstream: []
@@ -114,13 +122,17 @@ fields:
 ## Sources
-Source is used to define where GrootStream needs to ingest data. Multiple sources can be defined in a job. The supported sources are listed in [Source Connectors](connector/source). Each source has its own specific parameters to define how to fetch data, and GrootStream also extracts the properties that each source will use, such as the `topic` and `kafka.bootstrap.servers` of the `Kafka` source.
+Source is used to define where GrootStream needs to ingest data. Multiple sources can be defined in a job. The supported sources are listed in [Source Connectors](connector/source). Each source has its own specific parameters to define how to fetch data, and GrootStream also extracts the properties that each source will use, such as the `topic` and `kafka.bootstrap.servers` of the `Kafka` source.
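As a minimal sketch of that pattern, a Kafka source carrying just the properties called out above could be declared like this (the source name, topic and broker address are illustrative placeholders):

```yaml
sources:
  my_kafka_source:              # illustrative source name
    type: kafka
    properties:
      topic: example-topic                    # placeholder topic
      kafka.bootstrap.servers: broker1:9092   # placeholder broker list
```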

 ## Filters
+A filter operator defines the conditions for filtering data. Multiple filters can be defined in a job. The supported filters are listed in [Filters](filter). Each filter has its own specific parameters that define how to filter data, and GrootStream also extracts the properties that each filter uses, such as the `expression` of the `Aviator` filter.
+Based on the filter expression, an event is passed downstream if the expression evaluates to true; otherwise it is dropped.

 ## Processing Pipelines
+Processing pipelines define the event processing logic of the job. By functionality, processors can be categorized into stateless and stateful; by processing order, into pre-processing, processing and post-processing pipelines. Each processor can assemble `UDFs` (user-defined functions) into a pipeline. Details of each processor are listed in [Processor](processor).

 ## Sinks
+Sink is used to define where GrootStream needs to output data. Multiple sinks can be defined in a job. The supported sinks are listed in [Sink Connectors](connector/sink). Each sink has its own specific parameters to define how to output data, and GrootStream also extracts the properties that each sink will use, such as the `topic` and `kafka.bootstrap.servers` of the `Kafka` sink.

 ## Application
 Used to define some common parameters of the job and the topology of the job, such as the name of the job, the parallelism of the job, etc. The following configuration parameters are supported.