diff options
| -rw-r--r-- | groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java | 70 |
1 files changed, 33 insertions, 37 deletions
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java index 6960610..48734ea 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java @@ -1,37 +1,33 @@ -package com.geedgenetworks.connectors.kafka; - -import org.apache.commons.lang3.StringUtils; - -import java.util.Map; -import java.util.Properties; - -public class KafkaConnectorOptionsUtil { - - public static final String PROPERTIES_PREFIX = "kafka."; - - public static Properties getKafkaProperties(Map<String, String> tableOptions) { - final Properties kafkaProperties = new Properties(); - - if (hasKafkaClientProperties(tableOptions)) { - tableOptions.keySet().stream() - .filter(key -> key.startsWith(PROPERTIES_PREFIX)) - .forEach( - key -> { - final String value = tableOptions.get(key); - final String subKey = key.substring((PROPERTIES_PREFIX).length()); - if(!StringUtils.isBlank(value)){ - kafkaProperties.put(subKey, value); - } - }); - } - return kafkaProperties; - } - - /** - * Decides if the table options contains Kafka client properties that start with prefix - * 'properties'. - */ - private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) { - return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX)); - } -} +package com.geedgenetworks.connectors.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaConnectorOptionsUtil {
+
+ public static final String PROPERTIES_PREFIX = "kafka.";
+
+ public static Properties getKafkaProperties(Map<String, String> tableOptions) {
+ final Properties kafkaProperties = new Properties();
+
+ if (hasKafkaClientProperties(tableOptions)) {
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+ .forEach(
+ key -> {
+ final String value = tableOptions.get(key);
+ final String subKey = key.substring((PROPERTIES_PREFIX).length());
+ kafkaProperties.put(subKey, value);
+ });
+ }
+ return kafkaProperties;
+ }
+
+ /**
+ * Decides if the table options contains Kafka client properties that start with prefix
+ * 'properties'.
+ */
+ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
+ return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
+ }
+}
|
