diff options
| author | lifengchao <[email protected]> | 2023-12-29 11:18:01 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2023-12-29 11:18:01 +0800 |
| commit | 49c48740354f71d0dcc5ebf302153362836ab69c (patch) | |
| tree | e304688cffe4025cdfefa8bc80e43fd5f15058fc /groot-connectors | |
| parent | 879697c2f727e4bae7d66fbf42e06ba83e03cd9f (diff) | |
[improve] [connector-kafka] 删除不引用的代码
Diffstat (limited to 'groot-connectors')
3 files changed, 0 insertions, 141 deletions
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java index 4a98148..c3c19c3 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java @@ -40,9 +40,6 @@ public class KafkaSourceProvider implements SourceProvider { properties ); - kafkaConsumer.setCommitOffsetsOnCheckpoints(true); - kafkaConsumer.setStartFromGroupOffsets(); - return env.addSource(kafkaConsumer); } diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/KafkaUtils.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/KafkaUtils.java deleted file mode 100644 index ea5f8ba..0000000 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/KafkaUtils.java +++ /dev/null @@ -1,92 +0,0 @@ -package com.geedgenetworks.connectors.kafka.util; - -import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -public class KafkaUtils { - - public static Properties getKafkaSinkProperty(Map<String, Object> kafkaconfig) { - Properties properties = new Properties(); - for (Map.Entry<String, Object> entry : kafkaconfig.entrySet()) { - - if (entry.getKey().contains("kafka.") - && !"".equals(entry.getValue()) - && entry.getValue() != null) { - String key = entry.getKey().replace("kafka.", ""); - properties.put(key, entry.getValue()); - } - } - - return properties; - } - - public static Properties getKafkaSourceProperty(Map<String, Object> kafkaconfig) { - Properties properties = new Properties(); - - for (Map.Entry<String, Object> entry : kafkaconfig.entrySet()) { - - if (entry.getKey().contains("kafka.") - && !"".equals(entry.getValue()) - && entry.getValue() != null) { - String key = entry.getKey().replace("kafka.", ""); - properties.put(key, entry.getValue()); - } - } - return properties; - } - - public static FlinkKafkaConsumer<String> getKafkaConsumer(Map<String, Object> kafkaconfig) { - FlinkKafkaConsumer<String> kafkaConsumer = - new FlinkKafkaConsumer<>( - (String) kafkaconfig.get("topics"), - new SimpleStringSchema(), - getKafkaSourceProperty(kafkaconfig)); - - kafkaConsumer.setCommitOffsetsOnCheckpoints(true); - kafkaConsumer.setStartFromGroupOffsets(); - - return kafkaConsumer; - } - - public static FlinkKafkaConsumer<Tuple2<String, Long>> timestampDeserializationConsumer( - Map<String, Object> kafkaconfig) { - FlinkKafkaConsumer<Tuple2<String, Long>> kafkaConsumer = - new FlinkKafkaConsumer<>( - (List<String>) kafkaconfig.get("topics"), - new TimestampDeserializationSchema(), - getKafkaSourceProperty(kafkaconfig)); - - kafkaConsumer.setCommitOffsetsOnCheckpoints(true); - kafkaConsumer.setStartFromGroupOffsets(); - - return kafkaConsumer; - } - - public static FlinkKafkaConsumer<String> getKafkaConsumerLists( - List<String> topics, Map<String, Object> kafkaconfig) { - FlinkKafkaConsumer<String> kafkaConsumer = - new FlinkKafkaConsumer<>( - topics, new SimpleStringSchema(), getKafkaSourceProperty(kafkaconfig)); - - kafkaConsumer.setCommitOffsetsOnCheckpoints(true); - kafkaConsumer.setStartFromGroupOffsets(); - - return kafkaConsumer; - } - - public static SinkFunction<String> getKafkaSink(Map<String, Object> kafkaconfig) { - return new FlinkKafkaProducer<String>( - (String) kafkaconfig.get("topic"), - new SimpleStringSchema(), - getKafkaSinkProperty(kafkaconfig), - Optional.empty()); - } -} diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/TimestampDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/TimestampDeserializationSchema.java deleted file mode 100644 index 46e6388..0000000 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/TimestampDeserializationSchema.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.geedgenetworks.connectors.kafka.util; - -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; - -/** - * @author qidaijie - * @version 2022/3/89:42 - */ -public class TimestampDeserializationSchema implements KafkaDeserializationSchema { - private static final Log logger = LogFactory.get(); - - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}); - } - - @Override - public boolean isEndOfStream(Object nextElement) { - return false; - } - - @Override - @SuppressWarnings("unchecked") - public Tuple2<String, Long> deserialize(ConsumerRecord record) throws Exception { - if (record != null) { - try { - - long timestamp = record.timestamp() / 1000; - String value = new String((byte[]) record.value(), "UTF8"); - return new Tuple2<>(value, timestamp); - } catch (RuntimeException e) { - logger.error( - "KafkaConsumer Deserialize failed,The exception is : " + e.getMessage()); - e.printStackTrace(); - } - } - return new Tuple2<>(null, null); - } -} |
