summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java70
1 files changed, 33 insertions, 37 deletions
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java
index 6960610..48734ea 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java
@@ -1,37 +1,33 @@
-package com.geedgenetworks.connectors.kafka;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaConnectorOptionsUtil {
-
- public static final String PROPERTIES_PREFIX = "kafka.";
-
- public static Properties getKafkaProperties(Map<String, String> tableOptions) {
- final Properties kafkaProperties = new Properties();
-
- if (hasKafkaClientProperties(tableOptions)) {
- tableOptions.keySet().stream()
- .filter(key -> key.startsWith(PROPERTIES_PREFIX))
- .forEach(
- key -> {
- final String value = tableOptions.get(key);
- final String subKey = key.substring((PROPERTIES_PREFIX).length());
- if(!StringUtils.isBlank(value)){
- kafkaProperties.put(subKey, value);
- }
- });
- }
- return kafkaProperties;
- }
-
- /**
- * Decides if the table options contains Kafka client properties that start with prefix
- * 'properties'.
- */
- private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
- return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
- }
-}
+package com.geedgenetworks.connectors.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaConnectorOptionsUtil {
+
+ public static final String PROPERTIES_PREFIX = "kafka.";
+
+ public static Properties getKafkaProperties(Map<String, String> tableOptions) {
+ final Properties kafkaProperties = new Properties();
+
+ if (hasKafkaClientProperties(tableOptions)) {
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+ .forEach(
+ key -> {
+ final String value = tableOptions.get(key);
+ final String subKey = key.substring((PROPERTIES_PREFIX).length());
+ kafkaProperties.put(subKey, value);
+ });
+ }
+ return kafkaProperties;
+ }
+
+ /**
+ * Decides if the table options contains Kafka client properties that start with prefix
+ * 'properties'.
+ */
+ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
+ return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
+ }
+}