| field | value | date |
|---|---|---|
| author | doufenghu <[email protected]> | 2024-02-22 19:42:36 +0800 |
| committer | doufenghu <[email protected]> | 2024-02-22 19:42:36 +0800 |
| commit | e0e35051f46744beba606bd5aea179a39e54665d (patch) | |
| tree | 3c74ff81eff1405f9fa0eb9de90cc03ab3a00147 | |
| parent | 8473d8e895066eacbb09fa7a0d8c3dde72d236bc (diff) | |
[Improve][Example] Add default properties for job template.
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | config/template/grootstream_job_template.yaml | 13 |
| -rw-r--r-- | docs/connector/formats/json.md | 8 |
| -rw-r--r-- | groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml | 27 |

3 files changed, 40 insertions, 8 deletions
```diff
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index 1be75da..0cb39ad 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -17,6 +17,8 @@ sources: # [object] Define connector source
       kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001 # [string] Kafka Group ID for Consumer
       kafka.auto.offset.reset: latest # [string] Kafka Auto Offset Reset, default is latest
       format: json # [string] Data Format for Source. eg. json, protobuf, etc.
+      json.ignore.parse.errors: false # [boolean] Flag to ignore parse errors, default will record the parse errors. If set true, it will ignore the parse errors.
+
 
   inline_source: # [object] Inline source connector name, must be unique. It used to define the source node of the job topology.
     type: inline
@@ -263,12 +265,14 @@ sinks: # [object] Define connector sink
       kafka.max.request.size: 10485760
       kafka.compression.type: snappy
       format: json
+      json.ignore.parse.errors: false
+      log.failures.only: true
 
   kafka_sink_b:
     type: kafka
     properties:
       topic: SESSION-RECORD-B
-      kafka.bootstrap.servers: 127.0.0.1:9092
+      kafka.bootstrap.servers: 127.0.0.1:9094
       kafka.retries: 0
       kafka.linger.ms: 10
       kafka.request.timeout.ms: 30000
@@ -276,7 +280,12 @@ sinks: # [object] Define connector sink
       kafka.buffer.memory: 134217728
       kafka.max.request.size: 10485760
       kafka.compression.type: snappy
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
       format: json
+      json.ignore.parse.errors: false
+      log.failures.only: true
 
   clickhouse_sink: # [object] ClickHouse sink connector name, must be unique. It used to define the sink node of the job topology.
     type: clickhouse
@@ -304,7 +313,7 @@ application: # [object] Application Configuration
   env: # [object] Define job runtime environment variables
     name: inline-to-print-job # [string] Job Name
     parallelism: 3 # [number] Job-Level Parallelism
-    shade.identifier: default # [string] Shade Identifier, Using to encrypt and decrypt sensitive configuration. if set default, it will not encrypt and decrypt sensitive configuration.
+    shade.identifier: default # [string] Shade Identifier, Using to encrypt and decrypt sensitive configuration. Support enum: default, aes, base64. if set default, it will not encrypt and decrypt sensitive configuration.
   pipeline:
     object-reuse: true # [boolean] Object Reuse, default is false
   topology: # [array of object] Node List. It will be used build data flow for job dag graph.
diff --git a/docs/connector/formats/json.md b/docs/connector/formats/json.md
index 073c888..a87afd0 100644
--- a/docs/connector/formats/json.md
+++ b/docs/connector/formats/json.md
@@ -9,10 +9,10 @@ Event serialization and deserialization format.
 
 ## Format Options
 
-| Name                      | Type     | Required | Default | Description                                                                                          |
-|---------------------------|----------|----------|---------|------------------------------------------------------------------------------------------------------|
-| format                    | String   | Yes      | -       | Specify what format to use, here should be 'json'.                                                    |
-| json.ignore.parse.errors  | Boolean  | No       | false   | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.  |
+| Name                      | Type     | Required | Default | Description                                                                                                                                          |
+|---------------------------|----------|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------|
+| format                    | String   | Yes      | -       | Specify what format to use, here should be 'json'.                                                                                                    |
+| json.ignore.parse.errors  | Boolean  | No       | false   | Skip the parsing error or throw an exception. If set to true, the parsing error will be ignored. If set to false, the parsing error will be thrown.  |
 
 # How to use
 ## Inline uses example
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
index 44355c7..a70f588 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
@@ -31,6 +31,24 @@ sources: # [object] Define connector source
       json.ignore.parse.errors: false
 
 sinks:
+  failure_connector_kafka:
+    type: kafka
+    properties:
+      topic: SESSION-RECORD-TEST
+      kafka.bootstrap.servers: 192.168.44.121:9094
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+      format: json
+      log.failures.only: true
+
   connector_kafka:
     type: kafka
     properties:
@@ -47,6 +65,7 @@ sinks:
       kafka.sasl.mechanism: PLAIN
       kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
       format: json
+      log.failures.only: true
 
 application: # [object] Define job configuration
   env:
@@ -57,6 +76,10 @@ application: # [object] Define job configuration
     object-reuse: true
   topology:
     - name: inline_source
-      downstream: [connector_kafka]
+      downstream: [failure_connector_kafka]
+    - name: inline_source
+      downstream: [ connector_kafka ]
     - name: connector_kafka
-      downstream: []
\ No newline at end of file
+      downstream: []
+    - name: failure_connector_kafka
+      downstream: [ ]
\ No newline at end of file
```
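For orientation, here is a condensed sketch of the job shape the end-to-end example ends up with after this commit: the inline source fans out to two Kafka sinks, each with `log.failures.only: true` set. Topic names, broker addresses, and the job name below are placeholders, and the comment on `log.failures.only` is an assumption (the diff adds the property without documenting it; it resembles Flink's producer `logFailuresOnly` switch):

```yaml
sources:
  inline_source:                        # emits the example's inline test records
    type: inline
    properties:
      format: json
      json.ignore.parse.errors: false   # surface malformed JSON instead of silently skipping it

sinks:
  connector_kafka:                      # primary sink
    type: kafka
    properties:
      topic: PRIMARY-TOPIC              # placeholder
      kafka.bootstrap.servers: broker-a:9092   # placeholder
      format: json
      log.failures.only: true           # assumption: log produce failures rather than failing the job
  failure_connector_kafka:              # second sink fed with the same stream
    type: kafka
    properties:
      topic: FAILURE-TOPIC              # placeholder
      kafka.bootstrap.servers: broker-b:9094   # placeholder
      format: json
      log.failures.only: true

application:
  env:
    name: inline-to-kafka-job           # placeholder
    parallelism: 1
  topology:                             # listing inline_source twice fans its output to both sinks
    - name: inline_source
      downstream: [failure_connector_kafka]
    - name: inline_source
      downstream: [connector_kafka]
    - name: connector_kafka
      downstream: []
    - name: failure_connector_kafka
      downstream: []
```

Routing the same stream into a second, independently configured sink means a delivery problem on one cluster only surfaces in the logs instead of failing the whole job, which appears to be the point of pairing `log.failures.only: true` with the extra `failure_connector_kafka` node.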