package com.zdjizhi.utils.kafka;

import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Map;
import java.util.Properties;

/**
 * Factory for the Flink Kafka consumers used by the flow-write job.
 *
 * @author qidaijie
 * @Package com.zdjizhi.utils.kafka
 * @Description: Builds FlinkKafkaConsumer instances with shared consumer configuration.
 * @date 2021/6/8 13:54
 */
public class KafkaConsumer {

    /**
     * Builds the shared Kafka consumer configuration for the given topic.
     */
    private static Properties createConsumerConfig(String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
        // Use a per-topic consumer group so offsets are tracked independently for each topic.
        properties.put("group.id", FlowWriteConfig.GROUP_ID + "-" + topic);
        properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
        properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
        properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
        // Interval for dynamic partition discovery.
        properties.put("partition.discovery.interval.ms", "10000");
        // Apply SSL/TLS settings when the target brokers require them.
        CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
        return properties;
    }

    /**
     * Custom deserialization of Kafka records that also attaches the Kafka timestamp.
     *
     * @return consumer emitting each Kafka record as a map
     */
    @SuppressWarnings("unchecked")
    public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer(String topic) {
        FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer =
                new FlinkKafkaConsumer<>(topic, new TimestampDeserializationSchema(), createConsumerConfig(topic));
        // Commit offsets back to Kafka when checkpoints complete.
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        // Start consuming from the consumer group's current offsets.
        kafkaConsumer.setStartFromGroupOffsets();
        return kafkaConsumer;
    }

    /**
     * Standard string deserialization of Kafka records.
     *
     * @return consumer emitting each Kafka record as a raw string
     */
    public static FlinkKafkaConsumer<String> flinkConsumer(String topic) {
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), createConsumerConfig(topic));
        // Commit offsets back to Kafka when checkpoints complete.
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        // Start consuming from the consumer group's current offsets.
        kafkaConsumer.setStartFromGroupOffsets();
        return kafkaConsumer;
    }
}
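
/*
 * Usage sketch (illustrative only, not part of this class): how these factory methods
 * might be wired into a Flink streaming job. The topic name "flow-log", the checkpoint
 * interval, and the variable names below are assumptions made for the example.
 *
 *   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 *   env.enableCheckpointing(60_000);
 *
 *   // Raw string records via the built-in SimpleStringSchema.
 *   DataStream<String> rawLogs = env.addSource(KafkaConsumer.flinkConsumer("flow-log"));
 *
 *   // Records deserialized into maps with the Kafka timestamp attached.
 *   DataStream<Map<String, Object>> logsWithTimestamp =
 *           env.addSource(KafkaConsumer.myDeserializationConsumer("flow-log"));
 */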