author    lifengchao <[email protected]>  2024-02-22 14:34:40 +0800
committer lifengchao <[email protected]>  2024-02-22 14:34:40 +0800
commit    bfb76be76da30fcd7ec189c671cdb26a15713863
tree      82259a6dd0c84cace2bda63a5a6daa55bc86201e
parent    45d68199735d0c0d3ac460c1f594c22fbced5250
[feature][connector-kafka] Add a log.failures.only option to the Kafka sink to control whether send failures throw an exception or are only logged
-rw-r--r--  docs/connector/formats/json.md  2
-rw-r--r--  docs/connector/formats/protobuf.md  2
-rw-r--r--  docs/connector/sink/kafka.md  15
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java  60
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java  7
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java  159
-rw-r--r--  groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java  5
-rw-r--r--  groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java  8
8 files changed, 133 insertions, 125 deletions
diff --git a/docs/connector/formats/json.md b/docs/connector/formats/json.md
index 8be2959..073c888 100644
--- a/docs/connector/formats/json.md
+++ b/docs/connector/formats/json.md
@@ -12,7 +12,7 @@ Event serialization and deserialization format.
| Name | Type | Required | Default | Description |
|---------------------------|----------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| format | String | Yes | - | Specify what format to use, here should be 'json'. |
-| json.ignore.parse.errors | Boolean | No | true | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
+| json.ignore.parse.errors | Boolean | No | false | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
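For illustration, here is a minimal sketch of the skip-vs-fail contract that `json.ignore.parse.errors` describes, assuming a Jackson-based decoder; the project's actual decoder lives in groot-formats/format-json and may differ in detail.

```java
// Illustrative only: the skip-vs-fail contract of json.ignore.parse.errors.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

public class IgnoreParseErrorsSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final boolean ignoreParseErrors; // json.ignore.parse.errors, now false by default

    public IgnoreParseErrorsSketch(boolean ignoreParseErrors) {
        this.ignoreParseErrors = ignoreParseErrors;
    }

    public JsonNode deserialize(byte[] message) throws IOException {
        try {
            return MAPPER.readTree(message);
        } catch (IOException e) {
            if (ignoreParseErrors) {
                // skip the record; unparsable fields/rows resolve to null downstream
                return null;
            }
            throw new IOException("Failed to deserialize JSON record.", e);
        }
    }
}
```

The documentation fix above brings the stated default in line with the `defaultValue(false)` declared in JsonFormatOptions further down in this commit.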
# How to use
## Inline uses example
diff --git a/docs/connector/formats/protobuf.md b/docs/connector/formats/protobuf.md
index 2efbeff..9d685be 100644
--- a/docs/connector/formats/protobuf.md
+++ b/docs/connector/formats/protobuf.md
@@ -15,7 +15,7 @@ It is very popular in Streaming Data Pipeline. Now support protobuf format in so
| format | String | Yes | - | Specify what format to use, here should be 'protobuf'. |
| protobuf.descriptor.file.path | String | Yes | - | The descriptor file path. |
| protobuf.message.name | String | Yes | - | The protobuf messageName to look for in descriptor file. |
-| protobuf.ignore.parse.errors | Boolean | No | true | Protobuf ignore parse errors, otherwise will throw exception. |
+| protobuf.ignore.parse.errors | Boolean | No | false | Protobuf ignore parse errors, otherwise will throw exception. |
| protobuf.emit.default.values | Boolean | No | false | If true, default values will be emitted for missing fields. It is not recommended, because it will cause performance degradation. About basic data type, it is suggested to use `optional` instead of `required`. |
## Data Type Mapping
diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md
index 04dae2f..56331bc 100644
--- a/docs/connector/sink/kafka.md
+++ b/docs/connector/sink/kafka.md
@@ -11,13 +11,14 @@ In order to use the Kafka connector, the following dependencies are required. Th
Kafka sink custom properties. If a property belongs to the Kafka Producer Config, you can set it with the `kafka.` prefix.
-| Name | Type | Required | Default | Description |
-|-------------------------|--------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topic | String | Yes | - | Topic name is required. It used to write data to kafka. |
-| kafka.bootstrap.servers | String | Yes | - | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form `host1:port1,host2:port2,...`. |
-| format | String | No | json | Data format. The default value is `json`. The Optional values are `json`, `protobuf`. |
-| [format].config | | No | - | Data format properties. Please refer to [Format Options](../formats) for details. |
-| kafka.config | | No | - | Kafka producer properties. Please refer to [Kafka Producer Config](https://kafka.apache.org/documentation/#producerconfigs) for details. |
+| Name                    | Type    | Required | Default | Description                                                                                                                                                                |
+|-------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| topic                   | String  | Yes      | -       | Topic name is required. It is used to write data to Kafka.                                                                                                                |
+| kafka.bootstrap.servers | String  | Yes      | -       | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form `host1:port1,host2:port2,...`.             |
+| log.failures.only       | Boolean | No       | true    | Whether the producer should only log send failures instead of failing. If set to true, exceptions are only logged; if set to false, exceptions will eventually be thrown. |
+| format                  | String  | No       | json    | Data format. The default value is `json`. Optional values are `json` and `protobuf`.                                                                                      |
+| [format].config         |         | No       | -       | Data format properties. Please refer to [Format Options](../formats) for details.                                                                                         |
+| kafka.config            |         | No       | -       | Kafka producer properties. Please refer to [Kafka Producer Config](https://kafka.apache.org/documentation/#producerconfigs) for details.                                  |
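As background for the `kafka.` prefix rule described above, here is a hedged sketch of how prefixed options are typically stripped and forwarded to the producer. The real logic lives in KafkaConnectorOptionsUtil.getKafkaProperties, which this diff does not show, so the details are assumptions.

```java
// Assumed behavior of getKafkaProperties: forward `kafka.`-prefixed options
// to the Kafka producer with the prefix stripped; connector-level options
// such as log.failures.only are not forwarded.
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaPrefixSketch {
    static final String PROPERTIES_PREFIX = "kafka.";

    static Properties getKafkaProperties(Map<String, String> options) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (e.getKey().startsWith(PROPERTIES_PREFIX)) {
                // e.g. kafka.bootstrap.servers -> bootstrap.servers
                props.put(e.getKey().substring(PROPERTIES_PREFIX.length()), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("kafka.bootstrap.servers", "host1:9092,host2:9092");
        options.put("kafka.acks", "all");
        options.put("log.failures.only", "false"); // connector option, not forwarded
        System.out.println(getKafkaProperties(options));
    }
}
```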
## Example
This example reads data from the inline test source and writes it to the Kafka topic `SESSION-RECORD-TEST`.
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
index b265362..b827b42 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
@@ -1,27 +1,33 @@
-package com.geedgenetworks.connectors.kafka;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-import java.util.List;
-
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
-
-public class KafkaConnectorOptions {
-    public static final ConfigOption<List<String>> TOPIC =
-            ConfigOptions.key("topic")
-                    .stringType()
-                    .asList()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Topic names from which the table is read.");
-
-    public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS =
-            ConfigOptions.key( PROPERTIES_PREFIX + "bootstrap.servers")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Required Kafka server connection string");
-
-
-
-}
+package com.geedgenetworks.connectors.kafka;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.util.List;
+
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+
+public class KafkaConnectorOptions {
+    public static final ConfigOption<List<String>> TOPIC =
+            ConfigOptions.key("topic")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("Topic names from which the table is read.");
+
+    public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
+            ConfigOptions.key("log.failures.only")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Optional flag for whether the producer should fail on errors, or only log them;\n"
+                            + "if set to true, exceptions are only logged; if set to false, exceptions will eventually be thrown. True by default.");
+
+    public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS =
+            ConfigOptions.key(PROPERTIES_PREFIX + "bootstrap.servers")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Required Kafka server connection string");
+
+
+
+}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index d300650..a50c1e4 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -18,17 +18,20 @@ public class KafkaSinkProvider implements SinkProvider {
    private final String topic;
    private final Properties properties;
+    private final boolean logFailuresOnly;
    public KafkaSinkProvider(
            StructType dataType,
            EncodingFormat valueEncodingFormat,
            String topic,
-            Properties properties
+            Properties properties,
+            boolean logFailuresOnly
    ) {
        this.dataType = dataType;
        this.valueSerialization = valueEncodingFormat.createRuntimeEncoder(dataType);
        this.topic = topic;
        this.properties = properties;
+        this.logFailuresOnly = logFailuresOnly;
    }
    @Override
@@ -39,7 +42,7 @@ public class KafkaSinkProvider implements SinkProvider {
                properties,
                Optional.empty()
        );
-        kafkaProducer.setLogFailuresOnly(true);
+        kafkaProducer.setLogFailuresOnly(logFailuresOnly);
        return dataStream.addSink(kafkaProducer);
    }
}
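The behavior toggled here comes from Flink's FlinkKafkaProducer, whose setLogFailuresOnly is the real API this diff calls. A simplified sketch of the underlying callback pattern, not the actual Flink source:

```java
// Simplified sketch of what setLogFailuresOnly controls in a Flink-style producer.
import org.apache.kafka.clients.producer.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailureHandlingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(FailureHandlingSketch.class);

    private final boolean logFailuresOnly;
    private volatile Exception asyncException;

    public FailureHandlingSketch(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    public Callback sendCallback() {
        return (metadata, exception) -> {
            if (exception == null) {
                return;
            }
            if (logFailuresOnly) {
                // log.failures.only = true: log the failure and keep producing
                LOG.error("Error while sending record to Kafka", exception);
            } else {
                // log.failures.only = false: remember the failure so a later
                // send/flush/checkpoint can rethrow it and fail the job
                asyncException = exception;
            }
        };
    }

    public void checkErroneous() throws Exception {
        if (asyncException != null) {
            throw asyncException;
        }
    }
}
```

With the old hard-coded `setLogFailuresOnly(true)`, a failed send could be silently dropped; exposing the flag lets pipelines opt into fail-fast delivery.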
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index 68d7f43..b26c161 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -1,79 +1,80 @@
-package com.geedgenetworks.connectors.kafka;
-
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.*;
-import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.TOPIC;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.getKafkaProperties;
-
-public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
-    public static final String IDENTIFIER = "kafka";
-    @Override
-    public String factoryIdentifier() {
-        return IDENTIFIER;
-    }
-
-    @Override
-    public SourceProvider getSourceProvider(Context context) {
-        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-        // obtain the valueDecodingFormat
-        DecodingFormat valueDecodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
-
-        helper.validateExcept(PROPERTIES_PREFIX); // validate options, excluding prefixed properties
-
-        StructType physicalDataType = context.getPhysicalDataType(); // column types
-        ReadableConfig config = context.getConfiguration();
-
-        List<String> topics = config.get(TOPIC);
-        final Properties properties = getKafkaProperties(context.getOptions());
-
-        return new KafkaSourceProvider(physicalDataType, valueDecodingFormat, topics, properties);
-    }
-
-    @Override
-    public SinkProvider getSinkProvider(Context context) {
-        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-        // obtain the valueEncodingFormat
-        EncodingFormat valueEncodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT);
-
-        helper.validateExcept(PROPERTIES_PREFIX); // validate options, excluding prefixed properties
-
-        StructType dataType = context.getSchema();
-        ReadableConfig config = context.getConfiguration();
-
-        String topic = config.get(TOPIC).get(0);
-        final Properties properties = getKafkaProperties(context.getOptions());
-
-        return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties);
-    }
-
-    @Override
-    public Set<ConfigOption<?>> requiredOptions() {
-        final Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(TOPIC);
-        options.add(FactoryUtil.FORMAT);
-        options.add(PROPS_BOOTSTRAP_SERVERS);
-        return options;
-    }
-
-    @Override
-    public Set<ConfigOption<?>> optionalOptions() {
-        final Set<ConfigOption<?>> options = new HashSet<>();
-        return options;
-    }
-
-}
+package com.geedgenetworks.connectors.kafka;
+
+import com.geedgenetworks.core.connector.format.DecodingFormat;
+import com.geedgenetworks.core.connector.format.EncodingFormat;
+import com.geedgenetworks.core.connector.sink.SinkProvider;
+import com.geedgenetworks.core.connector.source.SourceProvider;
+import com.geedgenetworks.core.factories.*;
+import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.*;
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.getKafkaProperties;
+
+public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
+    public static final String IDENTIFIER = "kafka";
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public SourceProvider getSourceProvider(Context context) {
+        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        // obtain the valueDecodingFormat
+        DecodingFormat valueDecodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
+
+        helper.validateExcept(PROPERTIES_PREFIX); // validate options, excluding prefixed properties
+
+        StructType physicalDataType = context.getPhysicalDataType(); // column types
+        ReadableConfig config = context.getConfiguration();
+
+        List<String> topics = config.get(TOPIC);
+        final Properties properties = getKafkaProperties(context.getOptions());
+
+        return new KafkaSourceProvider(physicalDataType, valueDecodingFormat, topics, properties);
+    }
+
+    @Override
+    public SinkProvider getSinkProvider(Context context) {
+        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        // obtain the valueEncodingFormat
+        EncodingFormat valueEncodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT);
+
+        helper.validateExcept(PROPERTIES_PREFIX); // validate options, excluding prefixed properties
+
+        StructType dataType = context.getSchema();
+        ReadableConfig config = context.getConfiguration();
+
+        String topic = config.get(TOPIC).get(0);
+        boolean logFailuresOnly = config.get(LOG_FAILURES_ONLY);
+        final Properties properties = getKafkaProperties(context.getOptions());
+
+        return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, logFailuresOnly);
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(TOPIC);
+        options.add(FactoryUtil.FORMAT);
+        options.add(PROPS_BOOTSTRAP_SERVERS);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(LOG_FAILURES_ONLY);
+        return options;
+    }
+
+}
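A hypothetical usage sketch of the new option as declared above; the Flink Configuration calls are real API, but the scenario itself is illustrative.

```java
// Hypothetical snippet; assumed to live in the same package as KafkaConnectorOptions.
import org.apache.flink.configuration.Configuration;

public class LogFailuresOnlyDemo {
    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Unset: falls back to the declared default of true (log and continue).
        System.out.println(config.get(KafkaConnectorOptions.LOG_FAILURES_ONLY)); // true

        // Opt in to fail-fast delivery: send failures will eventually be rethrown.
        config.set(KafkaConnectorOptions.LOG_FAILURES_ONLY, false);
        System.out.println(config.get(KafkaConnectorOptions.LOG_FAILURES_ONLY)); // false
    }
}
```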
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java
index fa7a486..76908a1 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java
@@ -8,8 +8,7 @@ public class JsonFormatOptions {
            ConfigOptions.key("ignore.parse.errors")
                    .booleanType()
                    .defaultValue(false)
-                    .withDescription(
-                            "Optional flag to skip fields and rows with parse errors instead of failing;\n"
-                                    + "fields are set to null in case of errors, true by default.");
+                    .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                            + "fields are set to null in case of errors, false by default.");
}
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java
index b29091a..bd9c64a 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java
@@ -20,16 +20,14 @@ public class ProtobufOptions {
            ConfigOptions.key("ignore.parse.errors")
                    .booleanType()
                    .defaultValue(false)
-                    .withDescription(
-                            "Optional flag to skip fields and rows with parse errors instead of failing;\n"
-                                    + "fields are set to null in case of errors, true by default.");
+                    .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                            + "fields are set to null in case of errors, false by default.");
    public static final ConfigOption<Boolean> EMIT_DEFAULT_VALUES =
            ConfigOptions.key("emit.default.values")
                    .booleanType()
                    .defaultValue(false)
-                    .withDescription(
-                            "Optional flag for whether to render fields with zero values when deserializing Protobuf to struct.\n"
+                    .withDescription("Optional flag for whether to render fields with zero values when deserializing Protobuf to struct.\n"
                            + "When a field is empty in the serialized Protobuf, this library will deserialize them as null by default. However, this flag can control whether to render the type-specific zero value.\n"
                            + "This operates similarly to includingDefaultValues in protobuf-java-util's JsonFormat, or emitDefaults in golang/protobuf's jsonpb.");
}
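Since the description above explicitly points at protobuf-java-util's JsonFormat, here is a short sketch of that analogous knob using the real JsonFormat API; mapping it onto this project's struct decoding is an assumption.

```java
// includingDefaultValueFields() is the JsonFormat analogue of emit.default.values:
// when enabled, type-specific zero values (0, "", false, empty lists) are rendered
// instead of being omitted.
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;

public class EmitDefaultsSketch {
    public static String print(Message message, boolean emitDefaultValues)
            throws InvalidProtocolBufferException {
        JsonFormat.Printer printer = JsonFormat.printer();
        if (emitDefaultValues) {
            printer = printer.includingDefaultValueFields();
        }
        return printer.print(message);
    }
}
```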