diff options
| author | qidaijie <[email protected]> | 2022-03-09 10:18:16 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2022-03-09 10:18:16 +0800 |
| commit | e1ccce31e05ab76490b117096aacec58664eacf7 (patch) | |
| tree | 63c025b1b56a75c9b61a012ef5f33811723b6764 | |
| parent | beeff0a9e03d3016caa4bb0aa5327b8a6ee0b8a0 (diff) | |
优化Kafka认证方式,删除配置项通过连接端口判断
| -rw-r--r-- | pom.xml | 12 | ||||
| -rw-r--r-- | properties/default_config.properties | 28 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 12 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/bean/RadiusKnowledge.java | 62 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java | 20 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java | 13 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java | 40 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java | 94 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/CertUtils.java | 48 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Consumer.java) | 18 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Producer.java) | 8 |
11 files changed, 151 insertions, 204 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>radius-account-knowledge</artifactId> - <version>210908-security</version> + <version>211116-jackson</version> <name>radius-account-knowledge</name> <url>http://www.example.com</url> @@ -116,7 +116,7 @@ <dependency> <groupId>com.zdjizhi</groupId> <artifactId>galaxy</artifactId> - <version>1.0.6</version> + <version>1.0.7</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> @@ -135,14 +135,6 @@ <version>1.2.70</version> </dependency> - <!--<!– https://mvnrepository.com/artifact/org.apache.flink/flink-table –>--> - <!--<dependency>--> - <!--<groupId>org.apache.flink</groupId>--> - <!--<artifactId>flink-table</artifactId>--> - <!--<version>${flink.version}</version>--> - <!--<type>pom</type>--> - <!--<scope>${scope.type}</scope>--> - <!--</dependency>--> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --> <dependency> diff --git a/properties/default_config.properties b/properties/default_config.properties index 021eebd..39c2834 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -1,8 +1,18 @@ +#====================Kafka KafkaConsumer====================# +#kafka source connection timeout +session.timeout.ms=60000 + +#kafka source poll +max.poll.records=3000 + +#kafka source poll bytes +max.partition.fetch.bytes=31457280 +#====================Kafka KafkaProducer====================# #producer���ԵĴ������� retries=0 #���ĺ������˵һ��Batch������֮��������ã��������Batch��û��д���������뷢�ͳ�ȥ�� -linger.ms=5 +linger.ms=10 #����ڳ�ʱ֮ǰδ�յ���Ӧ���ͻ��˽��ڱ�Ҫʱ���·������� request.timeout.ms=30000 @@ -11,19 +21,15 @@ request.timeout.ms=30000 batch.size=262144 #Producer�����ڻ�����Ϣ�Ļ�������С -buffer.memory=67108864 +#128M +buffer.memory=134217728 #�������������ÿ�η���Kafka���������������С,Ĭ��1048576 -max.request.size=5242880 - +#10M +max.request.size=10485760 +#====================kafka default====================# #kafka SASL��֤�û��� kafka.user=admin #kafka SASL��SSL��֤���� -kafka.pin=galaxy2019 - -#kafka sink protocol; SSL or SASL -kafka.source.protocol=SASL - -#kafka sink protocol; SSL or SASL -kafka.sink.protocol=SASL
\ No newline at end of file +kafka.pin=galaxy2019
\ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index eb2cb4c..dfd5043 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,18 +1,18 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -input.kafka.servers=192.168.44.12:9092 +source.kafka.servers=10.233.12.4:9094 #管理输出kafka地址 -output.kafka.servers=192.168.44.12:9092 +sink.kafka.servers=10.224.11.14:9095,10.224.11.15:9095,10.224.11.16:9095,10.224.11.17:9095,10.224.11.18:9095,10.224.11.19:9095,10.224.11.20:9095,10.224.11.21:9095,10.224.11.22:9095,10.224.11.23:9095 #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic -input.kafka.topic=RADIUS-RECORD +source.kafka.topic=RADIUS-RECORD #补全数据 输出 topic -output.kafka.topic=RADIUS-ONFF-LOG +sink.kafka.topic=RADIUS-ONFF #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; group.id=radius-on-off-flink-20210615 @@ -24,5 +24,5 @@ producer.kafka.compression.type=none producer.ack=1 #--------------------------------topology配置------------------------------# -#第三方工具地址 -tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
\ No newline at end of file +#定位库地址 +tools.library=D:\\workerspace\\dat
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java b/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java deleted file mode 100644 index 43e71a1..0000000 --- a/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.zdjizhi.bean; - -/** - * @author qidaijie - */ -public class RadiusKnowledge { - - private String framed_ip; - private String account; - private String acct_session_id; - private long acct_status_type; - private long acct_session_time; - private long event_timestamp; - - public String getFramed_ip() { - return framed_ip; - } - - public void setFramed_ip(String framed_ip) { - this.framed_ip = framed_ip; - } - - public String getAccount() { - return account; - } - - public void setAccount(String account) { - this.account = account; - } - - public String getAcct_session_id() { - return acct_session_id; - } - - public void setAcct_session_id(String acct_session_id) { - this.acct_session_id = acct_session_id; - } - - public long getAcct_status_type() { - return acct_status_type; - } - - public void setAcct_status_type(long acct_status_type) { - this.acct_status_type = acct_status_type; - } - - public long getAcct_session_time() { - return acct_session_time; - } - - public void setAcct_session_time(long acct_session_time) { - this.acct_session_time = acct_session_time; - } - - public long getEvent_timestamp() { - return event_timestamp; - } - - public void setEvent_timestamp(long event_timestamp) { - this.event_timestamp = event_timestamp; - } -} diff --git a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java index d267b6b..43d2a9d 100644 --- a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java +++ b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java @@ -15,6 +15,11 @@ public class RadiusKnowledgeConfig { * 1、开始计费 */ public static final int START_BILLING = 1; + + /** + * 3、过渡计费 + */ + public static final int UPDATE_BILLING = 3; /** * 2、停止计费 */ @@ -50,11 +55,11 @@ public class RadiusKnowledgeConfig { /** * kafka */ - public static final String INPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.servers"); - public static final String OUTPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.servers"); + public static final String SOURCE_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "source.kafka.servers"); + public static final String SOURCE_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "source.kafka.topic"); + public static final String SINK_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.servers"); + public static final String SINK_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.topic"); public static final String GROUP_ID = RadiusKnowledgeConfigurations.getStringProperty(0, "group.id"); - public static final String OUTPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.topic"); - public static final String INPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.topic"); public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.ack"); public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.kafka.compression.type"); @@ -73,4 +78,11 @@ public class RadiusKnowledgeConfig { public static final String KAFKA_USER = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.user"); public static final String KAFKA_PIN = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.pin"); + /** + * kafka source config + */ + public static final String SESSION_TIMEOUT_MS = RadiusKnowledgeConfigurations.getStringProperty(1, "session.timeout.ms"); + public static final String MAX_POLL_RECORDS = RadiusKnowledgeConfigurations.getStringProperty(1, "max.poll.records"); + public static final String MAX_PARTITION_FETCH_BYTES = RadiusKnowledgeConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); + }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java index 77418a6..c70a69b 100644 --- a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java +++ b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java @@ -2,12 +2,10 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.RadiusKnowledgeConfig; -import com.zdjizhi.utils.functions.FilterLogFunction; import com.zdjizhi.utils.functions.FilterNullFunction; import com.zdjizhi.utils.functions.MapCompletedFunction; -import com.zdjizhi.utils.kafka.Consumer; -import com.zdjizhi.utils.kafka.Producer; +import com.zdjizhi.utils.kafka.KafkaConsumer; +import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -26,13 +24,12 @@ public class RadiusKnowledgeTopology { // environment.enableCheckpointing(5000); - DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()); + DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()); - DataStream<String> result = streamSource.filter(new FilterLogFunction()).name("FilterOriginalData") - .map(new MapCompletedFunction()).name("RadiusOnOffMap") + DataStream<String> result = streamSource.map(new MapCompletedFunction()).name("RadiusOnOffMap") .filter(new FilterNullFunction()).name("FilterAbnormalData"); - result.addSink(Producer.getKafkaProducer()).name("LogSink"); + result.addSink(KafkaProducer.getKafkaProducer()).name("LogSink"); try { environment.execute("RADIUS-ON-OFF"); diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java deleted file mode 100644 index 723ceaa..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.zdjizhi.utils.functions; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.common.RadiusKnowledgeConfig; -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class FilterLogFunction implements FilterFunction<String> { - private static final Log logger = LogFactory.get(); - - @Override - public boolean filter(String message) { - boolean legitimate = false; - try { - if (StringUtil.isNotBlank(message)) { - JSONObject jsonObject = JSONObject.parseObject(message); - if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) { - int packetType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE); - if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) { - int statusType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_ACCT_STATUS_TYPE); - if (RadiusKnowledgeConfig.START_BILLING == statusType || RadiusKnowledgeConfig.STOP_BILLING == statusType) { - legitimate = true; - } - } - } - } - } catch (RuntimeException re) { - logger.error("数据解析异常,异常信息:" + re); - } - return legitimate; - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java index e58419c..3c0a1da 100644 --- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java @@ -1,8 +1,10 @@ package com.zdjizhi.utils.functions; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.bean.RadiusKnowledge; import com.zdjizhi.common.RadiusKnowledgeConfig; +import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.functions.MapFunction; /** @@ -12,44 +14,70 @@ import org.apache.flink.api.common.functions.MapFunction; * @date 2021/5/2715:01 */ public class MapCompletedFunction implements MapFunction<String, String> { + private static final Log logger = LogFactory.get(); @Override public String map(String logs) { + String result = null; try { - JSONObject jsonObject = JSONObject.parseObject(logs); - RadiusKnowledge knowledge = new RadiusKnowledge(); - knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip")); - knowledge.setAccount(jsonObject.getString("radius_account")); - knowledge.setAcct_status_type(jsonObject.getInteger("radius_acct_status_type")); - /* - 如果存在时间戳则选择此时间戳没有获取当前时间 - */ - if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) { - knowledge.setEvent_timestamp(jsonObject.getLong("radius_event_timestamp")); - } else { - knowledge.setEvent_timestamp(System.currentTimeMillis() / 1000); - } - /* - * 标识同一个连接: - * 1.数据若存在acct_multi_session_id属性,取该属性 - * 2.不存在取 acct_session_id - */ - if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) { - knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id")); - } else { - knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id")); - } - /* - 用户的在线时长,以秒为单位,下线用户无此属性,默认为0 - */ - if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) { - knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time")); - } else { - knowledge.setAcct_session_time(0); + if (StringUtil.isNotBlank(logs)) { + JSONObject jsonMap = JSONObject.parseObject(logs); + if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) { + int packetType = jsonMap.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE); + if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) { + result = GetKnowledgeLogs(jsonMap); + } + + } } - return JSONObject.toJSONString(knowledge); } catch (RuntimeException e) { - return ""; + logger.error("数据解析异常,异常信息:" + e); } + return result; + } + + /** + * 获取RadiusOnOff日志 + * + * @param jsonMap 原始日志 + * @return OnOff日志 + */ + private static String GetKnowledgeLogs(JSONObject jsonMap) { + JSONObject knowledge = new JSONObject(); + + knowledge.put("framed_ip", jsonMap.getString("radius_framed_ip")); + + knowledge.put("account", jsonMap.getString("radius_account")); + + knowledge.put("acct_status_type", jsonMap.getString("radius_acct_status_type")); + + /* + 如果存在时间戳则选择此时间戳没有获取当前时间 + */ + if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) { + knowledge.put("event_timestamp", jsonMap.getString("radius_event_timestamp")); + } else { + knowledge.put("event_timestamp", (System.currentTimeMillis() / 1000)); + } + /* + * 标识同一个连接: + * 1.数据若存在acct_multi_session_id属性,取该属性 + * 2.不存在取 acct_session_id + */ + if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) { + knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id")); + } else { + knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id")); + } + /* + 用户的在线时长,以秒为单位,下线用户无此属性,默认为0 + */ + if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) { + knowledge.put("acct_session_time", jsonMap.getString("radius_acct_session_time")); + } else { + knowledge.put("acct_session_time", 0); + } + + return knowledge.toJSONString(); } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java index a325dd6..a5bce21 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -12,24 +12,36 @@ import java.util.Properties; * @date 2021/9/610:37 */ class CertUtils { - static void chooseCert(String type, Properties properties) { - switch (type) { - case "SSL": - properties.put("security.protocol", "SSL"); - properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks"); - properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_PIN); - properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks"); - properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_PIN); - properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_PIN); - break; - case "SASL": - properties.put("security.protocol", "SASL_PLAINTEXT"); - properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" - + RadiusKnowledgeConfig.KAFKA_USER + " password=" + RadiusKnowledgeConfig.KAFKA_PIN + ";"); - break; - default: + /** + * Kafka SASL认证端口 + */ + private static final String SASL_PORT = "9094"; + + /** + * Kafka SSL认证端口 + */ + private static final String SSL_PORT = "9095"; + + /** + * 根据连接信息端口判断认证方式。 + * + * @param servers kafka 连接信息 + * @param properties kafka 连接配置信息 + */ + static void chooseCert(String servers, Properties properties) { + if (servers.contains(SASL_PORT)) { + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + RadiusKnowledgeConfig.KAFKA_USER + " password=" + RadiusKnowledgeConfig.KAFKA_PIN + ";"); + } else if (servers.contains(SSL_PORT)) { + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_PIN); + properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_PIN); } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 7e75117..bb47bf9 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -13,25 +13,27 @@ import java.util.Properties; * @Description: * @date 2021/6/813:54 */ -public class Consumer { +public class KafkaConsumer { private static Properties createConsumerConfig() { Properties properties = new Properties(); - properties.put("bootstrap.servers", RadiusKnowledgeConfig.INPUT_KAFKA_SERVERS); + properties.put("bootstrap.servers", RadiusKnowledgeConfig.SOURCE_KAFKA_SERVERS); properties.put("group.id", RadiusKnowledgeConfig.GROUP_ID); - properties.put("session.timeout.ms", "60000"); - properties.put("max.poll.records", 3000); - properties.put("max.partition.fetch.bytes", 31457280); + properties.put("session.timeout.ms", RadiusKnowledgeConfig.SESSION_TIMEOUT_MS); + properties.put("max.poll.records", RadiusKnowledgeConfig.MAX_POLL_RECORDS); + properties.put("max.partition.fetch.bytes", RadiusKnowledgeConfig.MAX_PARTITION_FETCH_BYTES); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SOURCE_PROTOCOL,properties); + CertUtils.chooseCert(RadiusKnowledgeConfig.SOURCE_KAFKA_SERVERS, properties); return properties; } public static FlinkKafkaConsumer<String> getKafkaConsumer() { - FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(RadiusKnowledgeConfig.INPUT_KAFKA_TOPIC, - new SimpleStringSchema(), createConsumerConfig()); + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>( + RadiusKnowledgeConfig.SOURCE_KAFKA_TOPIC, + new SimpleStringSchema(), + createConsumerConfig()); kafkaConsumer.setCommitOffsetsOnCheckpoints(true); kafkaConsumer.setStartFromGroupOffsets(); diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index ce8fab3..c259fd7 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -14,11 +14,11 @@ import java.util.Properties; * @Description: * @date 2021/6/814:04 */ -public class Producer { +public class KafkaProducer { private static Properties createProducerConfig() { Properties properties = new Properties(); - properties.put("bootstrap.servers", RadiusKnowledgeConfig.OUTPUT_KAFKA_SERVERS); + properties.put("bootstrap.servers", RadiusKnowledgeConfig.SINK_KAFKA_SERVERS); properties.put("acks", RadiusKnowledgeConfig.PRODUCER_ACK); properties.put("retries", RadiusKnowledgeConfig.RETRIES); properties.put("linger.ms", RadiusKnowledgeConfig.LINGER_MS); @@ -28,7 +28,7 @@ public class Producer { properties.put("max.request.size", RadiusKnowledgeConfig.MAX_REQUEST_SIZE); properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, RadiusKnowledgeConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); - CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SINK_PROTOCOL, properties); + CertUtils.chooseCert(RadiusKnowledgeConfig.SINK_KAFKA_SERVERS, properties); return properties; } @@ -36,7 +36,7 @@ public class Producer { public static FlinkKafkaProducer<String> getKafkaProducer() { FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( - RadiusKnowledgeConfig.OUTPUT_KAFKA_TOPIC, + RadiusKnowledgeConfig.SINK_KAFKA_TOPIC, new SimpleStringSchema(), createProducerConfig(), Optional.empty()); |
