From ebe09ebbe46965fce40e2ae002b052a082d85e0f Mon Sep 17 00:00:00 2001 From: lifengchao Date: Sun, 7 Apr 2024 15:46:11 +0800 Subject: [improve] [connector-kafka] properties删除过滤空值的逻辑 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/KafkaConnectorOptionsUtil.java | 70 ++++++++++------------ 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java index 6960610..48734ea 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java @@ -1,37 +1,33 @@ -package com.geedgenetworks.connectors.kafka; - -import org.apache.commons.lang3.StringUtils; - -import java.util.Map; -import java.util.Properties; - -public class KafkaConnectorOptionsUtil { - - public static final String PROPERTIES_PREFIX = "kafka."; - - public static Properties getKafkaProperties(Map tableOptions) { - final Properties kafkaProperties = new Properties(); - - if (hasKafkaClientProperties(tableOptions)) { - tableOptions.keySet().stream() - .filter(key -> key.startsWith(PROPERTIES_PREFIX)) - .forEach( - key -> { - final String value = tableOptions.get(key); - final String subKey = key.substring((PROPERTIES_PREFIX).length()); - if(!StringUtils.isBlank(value)){ - kafkaProperties.put(subKey, value); - } - }); - } - return kafkaProperties; - } - - /** - * Decides if the table options contains Kafka client properties that start with prefix - * 'properties'. - */ - private static boolean hasKafkaClientProperties(Map tableOptions) { - return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX)); - } -} +package com.geedgenetworks.connectors.kafka; + +import java.util.Map; +import java.util.Properties; + +public class KafkaConnectorOptionsUtil { + + public static final String PROPERTIES_PREFIX = "kafka."; + + public static Properties getKafkaProperties(Map tableOptions) { + final Properties kafkaProperties = new Properties(); + + if (hasKafkaClientProperties(tableOptions)) { + tableOptions.keySet().stream() + .filter(key -> key.startsWith(PROPERTIES_PREFIX)) + .forEach( + key -> { + final String value = tableOptions.get(key); + final String subKey = key.substring((PROPERTIES_PREFIX).length()); + kafkaProperties.put(subKey, value); + }); + } + return kafkaProperties; + } + + /** + * Decides if the table options contains Kafka client properties that start with prefix + * 'properties'. + */ + private static boolean hasKafkaClientProperties(Map tableOptions) { + return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX)); + } +} -- cgit v1.2.3