| author | lifengchao <[email protected]> | 2024-02-22 14:34:40 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-02-22 14:34:40 +0800 |
| commit | bfb76be76da30fcd7ec189c671cdb26a15713863 (patch) | |
| tree | 82259a6dd0c84cace2bda63a5a6daa55bc86201e | |
| parent | 45d68199735d0c0d3ac460c1f594c22fbced5250 (diff) | |
[feature][connector-kafka] Add a log.failures.only option to the Kafka sink to control whether send failures are only logged or thrown as exceptions
8 files changed, 133 insertions, 125 deletions
diff --git a/docs/connector/formats/json.md b/docs/connector/formats/json.md
index 8be2959..073c888 100644
--- a/docs/connector/formats/json.md
+++ b/docs/connector/formats/json.md
@@ -12,7 +12,7 @@ Event serialization and deserialization format.
 | Name                     | Type    | Required | Default | Description |
 |--------------------------|---------|----------|---------|-------------|
 | format                   | String  | Yes      | -       | Specify what format to use, here should be 'json'. |
-| json.ignore.parse.errors | Boolean | No       | true    | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
+| json.ignore.parse.errors | Boolean | No       | false   | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
 
 # How to use
 ## Inline uses example
diff --git a/docs/connector/formats/protobuf.md b/docs/connector/formats/protobuf.md
index 2efbeff..9d685be 100644
--- a/docs/connector/formats/protobuf.md
+++ b/docs/connector/formats/protobuf.md
@@ -15,7 +15,7 @@ It is very popular in Streaming Data Pipeline. Now support protobuf format in so
 | format                        | String  | Yes      | -       | Specify what format to use, here should be 'protobuf'. |
 | protobuf.descriptor.file.path | String  | Yes      | -       | The descriptor file path. |
 | protobuf.message.name         | String  | Yes      | -       | The protobuf messageName to look for in descriptor file. |
-| protobuf.ignore.parse.errors  | Boolean | No       | true    | Protobuf ignore parse errors, otherwise will throw exception. |
+| protobuf.ignore.parse.errors  | Boolean | No       | false   | Protobuf ignore parse errors, otherwise will throw exception. |
 | protobuf.emit.default.values  | Boolean | No       | false   | If true, default values will be emitted for missing fields. It is not recommended, because it will cause performance degradation. About basic data type, it is suggested to use `optional` instead of `required`. |
 
 ## Data Type Mapping
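The two docs fixes above align the documented default of `ignore.parse.errors` with the actual default of `false` declared in the option classes (see the format-json and format-protobuf hunks at the end of this diff). As a hedged illustration of the contract, not code from this repository, a decoder honoring the flag behaves roughly like this Jackson-based sketch:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;

class IgnoreParseErrorsDemo {
    // Decode a batch of JSON lines; skip bad rows only when ignoreParseErrors is true.
    static List<JsonNode> decodeAll(List<String> lines, boolean ignoreParseErrors)
            throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        List<JsonNode> rows = new ArrayList<>();
        for (String line : lines) {
            try {
                rows.add(mapper.readTree(line));
            } catch (JsonProcessingException e) {
                if (!ignoreParseErrors) {
                    throw e; // the documented default (false): fail on the first bad row
                }
                // ignore.parse.errors=true: drop the unparseable row and continue
            }
        }
        return rows;
    }
}
```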
diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md
index 04dae2f..56331bc 100644
--- a/docs/connector/sink/kafka.md
+++ b/docs/connector/sink/kafka.md
@@ -11,13 +11,14 @@ In order to use the Kafka connector, the following dependencies are required. Th
 Kafka sink custom properties. if properties belongs to Kafka Producer Config, you can use `kafka.` prefix to set.
 
-| Name                    | Type   | Required | Default | Description |
-|-------------------------|--------|----------|---------|-------------|
-| topic                   | String | Yes      | -       | Topic name is required. It used to write data to kafka. |
-| kafka.bootstrap.servers | String | Yes      | -       | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form `host1:port1,host2:port2,...`. |
-| format                  | String | No       | json    | Data format. The default value is `json`. The Optional values are `json`, `protobuf`. |
-| [format].config         |        | No       | -       | Data format properties. Please refer to [Format Options](../formats) for details. |
-| kafka.config            |        | No       | -       | Kafka producer properties. Please refer to [Kafka Producer Config](https://kafka.apache.org/documentation/#producerconfigs) for details. |
+| Name                    | Type    | Required | Default | Description |
+|-------------------------|---------|----------|---------|-------------|
+| topic                   | String  | Yes      | -       | Topic name is required. It used to write data to kafka. |
+| kafka.bootstrap.servers | String  | Yes      | -       | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form `host1:port1,host2:port2,...`. |
+| log.failures.only       | Boolean | No       | true    | Whether the producer should fail on errors, or only log them; If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown. |
+| format                  | String  | No       | json    | Data format. The default value is `json`. The Optional values are `json`, `protobuf`. |
+| [format].config         |         | No       | -       | Data format properties. Please refer to [Format Options](../formats) for details. |
+| kafka.config            |         | No       | -       | Kafka producer properties. Please refer to [Kafka Producer Config](https://kafka.apache.org/documentation/#producerconfigs) for details. |
 
 ## Example
 This example read data of inline test source and write to kafka topic `SESSION-RECORD-TEST`.
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
index b265362..b827b42 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
@@ -1,27 +1,33 @@
-package com.geedgenetworks.connectors.kafka;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-import java.util.List;
-
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
-
-public class KafkaConnectorOptions {
-    public static final ConfigOption<List<String>> TOPIC =
-            ConfigOptions.key("topic")
-                    .stringType()
-                    .asList()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Topic names from which the table is read.");
-
-    public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS =
-            ConfigOptions.key( PROPERTIES_PREFIX + "bootstrap.servers")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Required Kafka server connection string");
-
-
-
-}
+package com.geedgenetworks.connectors.kafka;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.util.List;
+
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+
+public class KafkaConnectorOptions {
+    public static final ConfigOption<List<String>> TOPIC =
+            ConfigOptions.key("topic")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("Topic names from which the table is read.");
+
+    public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
+            ConfigOptions.key("log.failures.only")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Optional flag to whether the producer should fail on errors, or only log them;\n"
+                            + "If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default.");
+
+    public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS =
+            ConfigOptions.key( PROPERTIES_PREFIX + "bootstrap.servers")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Required Kafka server connection string");
+
+
+
+}
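A minimal sketch of how the new `LOG_FAILURES_ONLY` option resolves against a plain Flink `Configuration` (the repository's own `ReadableConfig` wiring appears in `KafkaTableFactory` below); when the key is absent, `get()` falls back to the declared default of `true`:

```java
import org.apache.flink.configuration.Configuration;

import com.geedgenetworks.connectors.kafka.KafkaConnectorOptions;

public class LogFailuresOnlyResolution {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Key not set: get() returns the declared default, i.e. true.
        System.out.println(config.get(KafkaConnectorOptions.LOG_FAILURES_ONLY)); // true

        // Opt in to fail-fast delivery by setting the key explicitly.
        config.set(KafkaConnectorOptions.LOG_FAILURES_ONLY, false);
        System.out.println(config.get(KafkaConnectorOptions.LOG_FAILURES_ONLY)); // false
    }
}
```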
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index d300650..a50c1e4 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -18,17 +18,20 @@ public class KafkaSinkProvider implements SinkProvider {
     private final String topic;
     private final Properties properties;
+    private final boolean logFailuresOnly;
 
     public KafkaSinkProvider(
            StructType dataType,
            EncodingFormat valueEncodingFormat,
            String topic,
-           Properties properties
+           Properties properties,
+           boolean logFailuresOnly
     ) {
        this.dataType = dataType;
        this.valueSerialization = valueEncodingFormat.createRuntimeEncoder(dataType);
        this.topic = topic;
        this.properties = properties;
+       this.logFailuresOnly = logFailuresOnly;
     }
 
     @Override
@@ -39,7 +42,7 @@ public class KafkaSinkProvider implements SinkProvider {
                properties,
                Optional.empty()
         );
-        kafkaProducer.setLogFailuresOnly(true);
+        kafkaProducer.setLogFailuresOnly(logFailuresOnly);
         return dataStream.addSink(kafkaProducer);
     }
 }
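`setLogFailuresOnly` is the switch exposed by Flink's `FlinkKafkaProducer`. The sketch below is a hedged approximation of the pattern behind it (class and field names are illustrative, not Flink internals): the producer's async send callback either logs the failure or stores it so the sink can rethrow it later.

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingOrFailingCallback implements Callback {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingOrFailingCallback.class);

    private final boolean logFailuresOnly;
    private volatile Exception asyncException; // rethrown by the sink on a later invoke/checkpoint

    LoggingOrFailingCallback(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            return; // record was acknowledged, nothing to do
        }
        if (logFailuresOnly) {
            // log.failures.only=true: record the error and keep producing
            LOG.error("Error while sending record to Kafka", exception);
        } else if (asyncException == null) {
            // log.failures.only=false: remember the first failure so it can be thrown
            asyncException = exception;
        }
    }
}
```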
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index 68d7f43..b26c161 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -1,79 +1,80 @@
-package com.geedgenetworks.connectors.kafka;
-
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.*;
-import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.TOPIC;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.getKafkaProperties;
-
-public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
-    public static final String IDENTIFIER = "kafka";
-    @Override
-    public String factoryIdentifier() {
-        return IDENTIFIER;
-    }
-
-    @Override
-    public SourceProvider getSourceProvider(Context context) {
-        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-        // Get the valueDecodingFormat
-        DecodingFormat valueDecodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
-
-        helper.validateExcept(PROPERTIES_PREFIX); // Validate options, excluding the prefixed properties
-
-        StructType physicalDataType = context.getPhysicalDataType(); // Column types
-        ReadableConfig config = context.getConfiguration();
-
-        List<String> topics = config.get(TOPIC);
-        final Properties properties = getKafkaProperties(context.getOptions());
-
-        return new KafkaSourceProvider(physicalDataType, valueDecodingFormat, topics, properties);
-    }
-
-    @Override
-    public SinkProvider getSinkProvider(Context context) {
-        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-        // Get the valueEncodingFormat
-        EncodingFormat valueEncodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT);
-
-        helper.validateExcept(PROPERTIES_PREFIX); // Validate options, excluding the prefixed properties
-
-        StructType dataType = context.getSchema();
-        ReadableConfig config = context.getConfiguration();
-
-        String topic = config.get(TOPIC).get(0);
-        final Properties properties = getKafkaProperties(context.getOptions());
-
-        return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties);
-    }
-
-    @Override
-    public Set<ConfigOption<?>> requiredOptions() {
-        final Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(TOPIC);
-        options.add(FactoryUtil.FORMAT);
-        options.add(PROPS_BOOTSTRAP_SERVERS);
-        return options;
-    }
-
-    @Override
-    public Set<ConfigOption<?>> optionalOptions() {
-        final Set<ConfigOption<?>> options = new HashSet<>();
-        return options;
-    }
-
-}
+package com.geedgenetworks.connectors.kafka;
+
+import com.geedgenetworks.core.connector.format.DecodingFormat;
+import com.geedgenetworks.core.connector.format.EncodingFormat;
+import com.geedgenetworks.core.connector.sink.SinkProvider;
+import com.geedgenetworks.core.connector.source.SourceProvider;
+import com.geedgenetworks.core.factories.*;
+import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.*;
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.getKafkaProperties;
+
+public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
+    public static final String IDENTIFIER = "kafka";
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public SourceProvider getSourceProvider(Context context) {
+        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        // Get the valueDecodingFormat
+        DecodingFormat valueDecodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
+
+        helper.validateExcept(PROPERTIES_PREFIX); // Validate options, excluding the prefixed properties
+
+        StructType physicalDataType = context.getPhysicalDataType(); // Column types
+        ReadableConfig config = context.getConfiguration();
+
+        List<String> topics = config.get(TOPIC);
+        final Properties properties = getKafkaProperties(context.getOptions());
+
+        return new KafkaSourceProvider(physicalDataType, valueDecodingFormat, topics, properties);
+    }
+
+    @Override
+    public SinkProvider getSinkProvider(Context context) {
+        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        // Get the valueEncodingFormat
+        EncodingFormat valueEncodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT);
+
+        helper.validateExcept(PROPERTIES_PREFIX); // Validate options, excluding the prefixed properties
+
+        StructType dataType = context.getSchema();
+        ReadableConfig config = context.getConfiguration();
+
+        String topic = config.get(TOPIC).get(0);
+        boolean logFailuresOnly = config.get(LOG_FAILURES_ONLY);
+        final Properties properties = getKafkaProperties(context.getOptions());
+
+        return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, logFailuresOnly);
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(TOPIC);
+        options.add(FactoryUtil.FORMAT);
+        options.add(PROPS_BOOTSTRAP_SERVERS);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(LOG_FAILURES_ONLY);
+        return options;
+    }
+
+}
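`getKafkaProperties` and `PROPERTIES_PREFIX` live in `KafkaConnectorOptionsUtil`, which this commit does not touch. Judging from `kafka.bootstrap.servers` mapping onto the producer's `bootstrap.servers`, it presumably copies every `kafka.`-prefixed option into a `Properties` object with the prefix stripped; a hedged sketch of that assumed behaviour:

```java
import java.util.Map;
import java.util.Properties;

class KafkaPropertiesSketch {
    static final String PROPERTIES_PREFIX = "kafka.";

    // Assumed behaviour of KafkaConnectorOptionsUtil.getKafkaProperties:
    // keep only kafka.* options and strip the prefix for the producer.
    static Properties getKafkaProperties(Map<String, String> options) {
        Properties props = new Properties();
        options.forEach((key, value) -> {
            if (key.startsWith(PROPERTIES_PREFIX)) {
                props.put(key.substring(PROPERTIES_PREFIX.length()), value);
            }
        });
        return props;
    }
}
```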
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java
index fa7a486..76908a1 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java
@@ -8,8 +8,7 @@ public class JsonFormatOptions {
             ConfigOptions.key("ignore.parse.errors")
                     .booleanType()
                     .defaultValue(false)
-                    .withDescription(
-                            "Optional flag to skip fields and rows with parse errors instead of failing;\n"
-                                    + "fields are set to null in case of errors, true by default.");
+                    .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                            + "fields are set to null in case of errors, false by default.");
 
 }
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java
index b29091a..bd9c64a 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java
@@ -20,16 +20,14 @@ public class ProtobufOptions {
             ConfigOptions.key("ignore.parse.errors")
                     .booleanType()
                     .defaultValue(false)
-                    .withDescription(
-                            "Optional flag to skip fields and rows with parse errors instead of failing;\n"
-                                    + "fields are set to null in case of errors, true by default.");
+                    .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                            + "fields are set to null in case of errors, false by default.");
 
     public static final ConfigOption<Boolean> EMIT_DEFAULT_VALUES =
             ConfigOptions.key("emit.default.values")
                     .booleanType()
                     .defaultValue(false)
-                    .withDescription(
-                            "Optional flag for whether to render fields with zero values when deserializing Protobuf to struct.\n"
+                    .withDescription("Optional flag for whether to render fields with zero values when deserializing Protobuf to struct.\n"
                             + "When a field is empty in the serialized Protobuf, this library will deserialize them as null by default. However, this flag can control whether to render the type-specific zero value.\n"
                             + "This operates similarly to includingDefaultValues in protobuf-java-util's JsonFormat, or emitDefaults in golang/protobuf's jsonpb.");
 }
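The `emit.default.values` description cites `includingDefaultValues` in protobuf-java-util's `JsonFormat` as the analogous behaviour. A minimal sketch of that analogue (using protobuf-java-util directly, not this repository's serializer):

```java
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;

class EmitDefaultValuesAnalogue {
    // When emitDefaults is true, zero/empty fields are printed instead of omitted.
    static String toJson(MessageOrBuilder message, boolean emitDefaults)
            throws InvalidProtocolBufferException {
        JsonFormat.Printer printer = JsonFormat.printer();
        if (emitDefaults) {
            printer = printer.includingDefaultValueFields(); // render type-specific zero values
        }
        return printer.print(message);
    }
}
```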