author     lifengchao <[email protected]>  2023-12-29 11:18:01 +0800
committer  lifengchao <[email protected]>  2023-12-29 11:18:01 +0800
commit     49c48740354f71d0dcc5ebf302153362836ab69c (patch)
tree       e304688cffe4025cdfefa8bc80e43fd5f15058fc /groot-connectors
parent     879697c2f727e4bae7d66fbf42e06ba83e03cd9f (diff)
[improve] [connector-kafka] Remove unreferenced code
Diffstat (limited to 'groot-connectors')
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java                 |  3
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/KafkaUtils.java                     | 92
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/TimestampDeserializationSchema.java | 46
3 files changed, 0 insertions, 141 deletions
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
index 4a98148..c3c19c3 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
@@ -40,9 +40,6 @@ public class KafkaSourceProvider implements SourceProvider {
properties
);
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
- kafkaConsumer.setStartFromGroupOffsets();
-
return env.addSource(kafkaConsumer);
}
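Reviewer note (not part of the commit): the two removed calls restate FlinkKafkaConsumer defaults, so dropping them does not change runtime behavior. Offsets are committed on checkpoints by default whenever checkpointing is enabled, and consumption starts from committed group offsets by default. FlinkKafkaConsumer is also deprecated in recent Flink releases; below is a hedged sketch of the equivalent setup on the newer KafkaSource API (Flink 1.14+). Broker address, topic, and group id are placeholders, not values from this repository.

// Illustrative sketch only, not part of this commit: the same source wired
// through Flink's newer KafkaSource API, which replaces the deprecated
// FlinkKafkaConsumer. Connection values below are placeholders.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaSourceSketch {
    public static DataStream<String> build(StreamExecutionEnvironment env) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("example-topic")              // placeholder
                .setGroupId("example-group")             // placeholder
                // Start from committed group offsets, like the removed
                // setStartFromGroupOffsets(); fall back to earliest if none exist.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        // KafkaSource commits offsets on checkpoints by default when a group id
        // is set, matching the removed setCommitOffsetsOnCheckpoints(true).
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
    }
}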
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/KafkaUtils.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/KafkaUtils.java
deleted file mode 100644
index ea5f8ba..0000000
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/KafkaUtils.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package com.geedgenetworks.connectors.kafka.util;
-
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-public class KafkaUtils {
-
- public static Properties getKafkaSinkProperty(Map<String, Object> kafkaconfig) {
- Properties properties = new Properties();
- for (Map.Entry<String, Object> entry : kafkaconfig.entrySet()) {
-
- if (entry.getKey().contains("kafka.")
- && !"".equals(entry.getValue())
- && entry.getValue() != null) {
- String key = entry.getKey().replace("kafka.", "");
- properties.put(key, entry.getValue());
- }
- }
-
- return properties;
- }
-
- public static Properties getKafkaSourceProperty(Map<String, Object> kafkaconfig) {
- Properties properties = new Properties();
-
- for (Map.Entry<String, Object> entry : kafkaconfig.entrySet()) {
-
- if (entry.getKey().contains("kafka.")
- && !"".equals(entry.getValue())
- && entry.getValue() != null) {
- String key = entry.getKey().replace("kafka.", "");
- properties.put(key, entry.getValue());
- }
- }
- return properties;
- }
-
- public static FlinkKafkaConsumer<String> getKafkaConsumer(Map<String, Object> kafkaconfig) {
- FlinkKafkaConsumer<String> kafkaConsumer =
- new FlinkKafkaConsumer<>(
- (String) kafkaconfig.get("topics"),
- new SimpleStringSchema(),
- getKafkaSourceProperty(kafkaconfig));
-
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
- kafkaConsumer.setStartFromGroupOffsets();
-
- return kafkaConsumer;
- }
-
- public static FlinkKafkaConsumer<Tuple2<String, Long>> timestampDeserializationConsumer(
- Map<String, Object> kafkaconfig) {
- FlinkKafkaConsumer<Tuple2<String, Long>> kafkaConsumer =
- new FlinkKafkaConsumer<>(
- (List<String>) kafkaconfig.get("topics"),
- new TimestampDeserializationSchema(),
- getKafkaSourceProperty(kafkaconfig));
-
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
- kafkaConsumer.setStartFromGroupOffsets();
-
- return kafkaConsumer;
- }
-
- public static FlinkKafkaConsumer<String> getKafkaConsumerLists(
- List<String> topics, Map<String, Object> kafkaconfig) {
- FlinkKafkaConsumer<String> kafkaConsumer =
- new FlinkKafkaConsumer<>(
- topics, new SimpleStringSchema(), getKafkaSourceProperty(kafkaconfig));
-
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
- kafkaConsumer.setStartFromGroupOffsets();
-
- return kafkaConsumer;
- }
-
- public static SinkFunction<String> getKafkaSink(Map<String, Object> kafkaconfig) {
- return new FlinkKafkaProducer<String>(
- (String) kafkaconfig.get("topic"),
- new SimpleStringSchema(),
- getKafkaSinkProperty(kafkaconfig),
- Optional.empty());
- }
-}
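Reviewer note (not part of the commit): beyond being unreferenced, the deleted property helpers were fragile. entry.getKey().contains("kafka.") matches the substring anywhere in the key, and replace("kafka.", "") strips every occurrence, so a key such as my.kafka.prop would be mangled rather than skipped. A hedged sketch of a stricter prefix filter follows, had the helper been kept; the class and method names are hypothetical.

// Illustrative sketch, not in this commit: a stricter version of the deleted
// property filter that anchors the "kafka." prefix at the start of the key
// instead of matching it anywhere. Names are hypothetical.
import java.util.Map;
import java.util.Properties;

public final class KafkaPropertySketch {
    private static final String PREFIX = "kafka.";

    public static Properties fromPrefixedConfig(Map<String, Object> config) {
        Properties properties = new Properties();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            Object value = entry.getValue();
            // Check for null before calling equals(), and require the prefix
            // at the start of the key, not merely somewhere inside it.
            if (value != null && !"".equals(value) && entry.getKey().startsWith(PREFIX)) {
                properties.put(entry.getKey().substring(PREFIX.length()), value);
            }
        }
        return properties;
    }
}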
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/TimestampDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/TimestampDeserializationSchema.java
deleted file mode 100644
index 46e6388..0000000
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/util/TimestampDeserializationSchema.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.geedgenetworks.connectors.kafka.util;
-
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-
-/**
- * @author qidaijie
- * @version 2022/3/89:42
- */
-public class TimestampDeserializationSchema implements KafkaDeserializationSchema {
- private static final Log logger = LogFactory.get();
-
- @Override
- public TypeInformation getProducedType() {
- return TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
- }
-
- @Override
- public boolean isEndOfStream(Object nextElement) {
- return false;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Tuple2<String, Long> deserialize(ConsumerRecord record) throws Exception {
- if (record != null) {
- try {
-
- long timestamp = record.timestamp() / 1000;
- String value = new String((byte[]) record.value(), "UTF8");
- return new Tuple2<>(value, timestamp);
- } catch (RuntimeException e) {
- logger.error(
- "KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
- e.printStackTrace();
- }
- }
- return new Tuple2<>(null, null);
- }
-}
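Reviewer note (not part of the commit): the deleted deserializer implemented the raw KafkaDeserializationSchema type, named the charset with the string "UTF8", and swallowed failures by returning a (null, null) tuple, which can poison downstream operators. For reference, a hedged sketch of the same idea with the generic parameter declared and the failure surfaced instead of swallowed; the class name is hypothetical.

// Illustrative sketch, not in this commit: the deleted schema rewritten with
// a declared type parameter, StandardCharsets instead of a charset-name
// string, and no silent (null, null) fallback on failure.
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TimestampSchemaSketch implements KafkaDeserializationSchema<Tuple2<String, Long>> {

    @Override
    public TypeInformation<Tuple2<String, Long>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, Long> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, Long> deserialize(ConsumerRecord<byte[], byte[]> record) {
        // Kafka record timestamps are epoch milliseconds; the original code
        // truncated them to seconds, which this sketch preserves.
        long timestampSeconds = record.timestamp() / 1000;
        String value = record.value() == null
                ? null
                : new String(record.value(), StandardCharsets.UTF_8);
        return new Tuple2<>(value, timestampSeconds);
    }
}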