author    doufenghu <[email protected]>  2024-02-22 19:42:36 +0800
committer doufenghu <[email protected]>  2024-02-22 19:42:36 +0800
commit    e0e35051f46744beba606bd5aea179a39e54665d (patch)
tree      3c74ff81eff1405f9fa0eb9de90cc03ab3a00147
parent    8473d8e895066eacbb09fa7a0d8c3dde72d236bc (diff)
[Improve][Example] Add default properties for job template.
 config/template/grootstream_job_template.yaml                                      | 13 +++++++++++--
 docs/connector/formats/json.md                                                     |  8 ++++----
 groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml | 27 +++++++++++++++++++++++++--
 3 files changed, 40 insertions(+), 8 deletions(-)
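
In short, the patch pairs every format: json occurrence with an explicit json.ignore.parse.errors default, adds log.failures.only to the Kafka sinks, and introduces a dedicated failure sink in the end-to-end example. A minimal sketch of the resulting sink block (sink name, topic, and broker are placeholders; the log.failures.only comment is an assumption, since the patch itself does not document that flag):

sinks:
  kafka_sink:                           # placeholder sink name
    type: kafka
    properties:
      topic: EXAMPLE-TOPIC              # placeholder topic
      kafka.bootstrap.servers: 127.0.0.1:9092
      format: json
      json.ignore.parse.errors: false   # default: record parse errors instead of ignoring them
      log.failures.only: true           # assumption: log produce failures rather than failing the job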
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index 1be75da..0cb39ad 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -17,6 +17,8 @@ sources: # [object] Define connector source
kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001 # [string] Kafka Group ID for Consumer
kafka.auto.offset.reset: latest # [string] Kafka Auto Offset Reset, default is latest
format: json # [string] Data Format for Source, e.g. json, protobuf, etc.
+ json.ignore.parse.errors: false # [boolean] Whether to ignore parse errors. Defaults to false, which records parse errors; set to true to ignore them.
+
inline_source: # [object] Inline source connector name, must be unique. It is used to define the source node of the job topology.
type: inline
@@ -263,12 +265,14 @@ sinks: # [object] Define connector sink
kafka.max.request.size: 10485760
kafka.compression.type: snappy
format: json
+ json.ignore.parse.errors: false
+ log.failures.only: true
kafka_sink_b:
type: kafka
properties:
topic: SESSION-RECORD-B
- kafka.bootstrap.servers: 127.0.0.1:9092
+ kafka.bootstrap.servers: 127.0.0.1:9094
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
@@ -276,7 +280,12 @@ sinks: # [object] Define connector sink
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
format: json
+ json.ignore.parse.errors: false
+ log.failures.only: true
clickhouse_sink: # [object] ClickHouse sink connector name, must be unique. It is used to define the sink node of the job topology.
type: clickhouse
@@ -304,7 +313,7 @@ application: # [object] Application Configuration
env: # [object] Define job runtime environment variables
name: inline-to-print-job # [string] Job Name
parallelism: 3 # [number] Job-Level Parallelism
- shade.identifier: default # [string] Shade Identifier, Using to encrypt and decrypt sensitive configuration. if set default, it will not encrypt and decrypt sensitive configuration.
+ shade.identifier: default # [string] Shade Identifier, used to encrypt and decrypt sensitive configuration. Supported values: default, aes, base64. If set to default, sensitive configuration is not encrypted or decrypted.
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph.
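
The widened shade.identifier comment names three modes: default, aes, base64. A hypothetical sketch of a non-default mode (this patch does not show the encryption workflow; the encoded JAAS value below is an illustrative placeholder, assuming non-default modes expect sensitive values supplied pre-encrypted):

application:
  env:
    shade.identifier: base64            # assumption: sensitive values are supplied base64-encoded
sinks:
  kafka_sink_a:
    type: kafka
    properties:
      # hypothetical ciphertext standing in for the plain JAAS string from the template
      kafka.sasl.jaas.config: b3JnLmFwYWNoZS5rYWZrYS4uLg==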
diff --git a/docs/connector/formats/json.md b/docs/connector/formats/json.md
index 073c888..a87afd0 100644
--- a/docs/connector/formats/json.md
+++ b/docs/connector/formats/json.md
@@ -9,10 +9,10 @@ Event serialization and deserialization format.
## Format Options
-| Name | Type | Required | Default | Description |
-|---------------------------|----------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| format | String | Yes | - | Specify what format to use, here should be 'json'. |
-| json.ignore.parse.errors | Boolean | No | false | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
+| Name | Type | Required | Default | Description |
+|---------------------------|----------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| format                    | String   | Yes      | -       | Specify what format to use; here it should be 'json'.                                                                                                |
+| json.ignore.parse.errors  | Boolean  | No       | false   | Whether to skip parse errors or fail. If set to true, parse errors are ignored; if set to false, a parse error throws an exception.                  |
# How to use
## Inline uses example
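
For reference, the option table pairs with a source definition along these lines (a sketch assembled from the template diff above; topic, group id, and broker are placeholders):

sources:
  kafka_source:                         # placeholder source name
    type: kafka
    properties:
      topic: EXAMPLE-TOPIC              # placeholder topic
      kafka.bootstrap.servers: 127.0.0.1:9092
      kafka.group.id: EXAMPLE-GROUP     # placeholder group id
      format: json                      # required: selects this format
      json.ignore.parse.errors: true    # skip malformed events instead of throwing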
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
index 44355c7..a70f588 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
@@ -31,6 +31,24 @@ sources: # [object] Define connector source
json.ignore.parse.errors: false
sinks:
+ failure_connector_kafka:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD-TEST
+ kafka.bootstrap.servers: 192.168.44.121:9094
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ format: json
+ log.failures.only: true
+
connector_kafka:
type: kafka
properties:
@@ -47,6 +65,7 @@ sinks:
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
format: json
+ log.failures.only: true
application: # [object] Define job configuration
env:
@@ -57,6 +76,10 @@ application: # [object] Define job configuration
object-reuse: true
topology:
- name: inline_source
- downstream: [connector_kafka]
+ downstream: [failure_connector_kafka]
+ - name: inline_source
+ downstream: [ connector_kafka ]
- name: connector_kafka
- downstream: []
\ No newline at end of file
+ downstream: []
+ - name: failure_connector_kafka
+ downstream: [ ]
\ No newline at end of file
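
Assembled from the hunk above, the example's topology after this patch reads as follows; inline_source fans out to both sinks through two list entries, exactly as committed:

topology:
  - name: inline_source
    downstream: [failure_connector_kafka]
  - name: inline_source
    downstream: [ connector_kafka ]
  - name: connector_kafka
    downstream: []
  - name: failure_connector_kafka
    downstream: [ ]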