summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2022-03-09 10:18:16 +0800
committerqidaijie <[email protected]>2022-03-09 10:18:16 +0800
commite1ccce31e05ab76490b117096aacec58664eacf7 (patch)
tree63c025b1b56a75c9b61a012ef5f33811723b6764
parentbeeff0a9e03d3016caa4bb0aa5327b8a6ee0b8a0 (diff)
优化Kafka认证方式,删除配置项通过连接端口判断
-rw-r--r--pom.xml12
-rw-r--r--properties/default_config.properties28
-rw-r--r--properties/service_flow_config.properties12
-rw-r--r--src/main/java/com/zdjizhi/bean/RadiusKnowledge.java62
-rw-r--r--src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java20
-rw-r--r--src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java13
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java40
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java94
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/CertUtils.java48
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Consumer.java)18
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Producer.java)8
11 files changed, 151 insertions, 204 deletions
diff --git a/pom.xml b/pom.xml
index 1062c03..d408ff3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>radius-account-knowledge</artifactId>
- <version>210908-security</version>
+ <version>211116-jackson</version>
<name>radius-account-knowledge</name>
<url>http://www.example.com</url>
@@ -116,7 +116,7 @@
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
- <version>1.0.6</version>
+ <version>1.0.7</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -135,14 +135,6 @@
<version>1.2.70</version>
</dependency>
- <!--&lt;!&ndash; https://mvnrepository.com/artifact/org.apache.flink/flink-table &ndash;&gt;-->
- <!--<dependency>-->
- <!--<groupId>org.apache.flink</groupId>-->
- <!--<artifactId>flink-table</artifactId>-->
- <!--<version>${flink.version}</version>-->
- <!--<type>pom</type>-->
- <!--<scope>${scope.type}</scope>-->
- <!--</dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 021eebd..39c2834 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -1,8 +1,18 @@
+#====================Kafka KafkaConsumer====================#
+#kafka source connection timeout
+session.timeout.ms=60000
+
+#kafka source poll
+max.poll.records=3000
+
+#kafka source poll bytes
+max.partition.fetch.bytes=31457280
+#====================Kafka KafkaProducer====================#
#producer重试的次数
retries=0
#该参数的含义是说一个Batch凑满之后立即发送,即便这个Batch还没有写满数据也必须发送出去
-linger.ms=5
+linger.ms=10
#����ڳ�ʱ֮ǰδ�յ���Ӧ���ͻ��˽��ڱ�Ҫʱ���·�������
request.timeout.ms=30000
@@ -11,19 +21,15 @@ request.timeout.ms=30000
batch.size=262144
#Producer端用于缓存消息的缓冲区大小
-buffer.memory=67108864
+#128M
+buffer.memory=134217728
#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576
-max.request.size=5242880
-
+#10M
+max.request.size=10485760
+#====================kafka default====================#
#kafka SASL认证用户名
kafka.user=admin
#kafka SASL和SSL认证密码
-kafka.pin=galaxy2019
-
-#kafka sink protocol; SSL or SASL
-kafka.source.protocol=SASL
-
-#kafka sink protocol; SSL or SASL
-kafka.sink.protocol=SASL \ No newline at end of file
+kafka.pin=galaxy2019 \ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index eb2cb4c..dfd5043 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,18 +1,18 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-input.kafka.servers=192.168.44.12:9092
+source.kafka.servers=10.233.12.4:9094
#管理输出kafka地址
-output.kafka.servers=192.168.44.12:9092
+sink.kafka.servers=10.224.11.14:9095,10.224.11.15:9095,10.224.11.16:9095,10.224.11.17:9095,10.224.11.18:9095,10.224.11.19:9095,10.224.11.20:9095,10.224.11.21:9095,10.224.11.22:9095,10.224.11.23:9095
#--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic
-input.kafka.topic=RADIUS-RECORD
+source.kafka.topic=RADIUS-RECORD
#补全数据 输出 topic
-output.kafka.topic=RADIUS-ONFF-LOG
+sink.kafka.topic=RADIUS-ONFF
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
group.id=radius-on-off-flink-20210615
@@ -24,5 +24,5 @@ producer.kafka.compression.type=none
producer.ack=1
#--------------------------------topology配置------------------------------#
-#第三方工具地址
-tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ \ No newline at end of file
+#定位库地址
+tools.library=D:\\workerspace\\dat \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java b/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java
deleted file mode 100644
index 43e71a1..0000000
--- a/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.zdjizhi.bean;
-
-/**
- * @author qidaijie
- */
-public class RadiusKnowledge {
-
- private String framed_ip;
- private String account;
- private String acct_session_id;
- private long acct_status_type;
- private long acct_session_time;
- private long event_timestamp;
-
- public String getFramed_ip() {
- return framed_ip;
- }
-
- public void setFramed_ip(String framed_ip) {
- this.framed_ip = framed_ip;
- }
-
- public String getAccount() {
- return account;
- }
-
- public void setAccount(String account) {
- this.account = account;
- }
-
- public String getAcct_session_id() {
- return acct_session_id;
- }
-
- public void setAcct_session_id(String acct_session_id) {
- this.acct_session_id = acct_session_id;
- }
-
- public long getAcct_status_type() {
- return acct_status_type;
- }
-
- public void setAcct_status_type(long acct_status_type) {
- this.acct_status_type = acct_status_type;
- }
-
- public long getAcct_session_time() {
- return acct_session_time;
- }
-
- public void setAcct_session_time(long acct_session_time) {
- this.acct_session_time = acct_session_time;
- }
-
- public long getEvent_timestamp() {
- return event_timestamp;
- }
-
- public void setEvent_timestamp(long event_timestamp) {
- this.event_timestamp = event_timestamp;
- }
-}
diff --git a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java
index d267b6b..43d2a9d 100644
--- a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java
+++ b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java
@@ -15,6 +15,11 @@ public class RadiusKnowledgeConfig {
* 1、开始计费
*/
public static final int START_BILLING = 1;
+
+ /**
+ * 3、过渡计费
+ */
+ public static final int UPDATE_BILLING = 3;
/**
* 2、停止计费
*/
@@ -50,11 +55,11 @@ public class RadiusKnowledgeConfig {
/**
* kafka
*/
- public static final String INPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.servers");
- public static final String OUTPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.servers");
+ public static final String SOURCE_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "source.kafka.servers");
+ public static final String SOURCE_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "source.kafka.topic");
+ public static final String SINK_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.servers");
+ public static final String SINK_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String GROUP_ID = RadiusKnowledgeConfigurations.getStringProperty(0, "group.id");
- public static final String OUTPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.topic");
- public static final String INPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.topic");
public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.ack");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.kafka.compression.type");
@@ -73,4 +78,11 @@ public class RadiusKnowledgeConfig {
public static final String KAFKA_USER = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.user");
public static final String KAFKA_PIN = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.pin");
+ /**
+ * kafka source config
+ */
+ public static final String SESSION_TIMEOUT_MS = RadiusKnowledgeConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = RadiusKnowledgeConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = RadiusKnowledgeConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
index 77418a6..c70a69b 100644
--- a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
+++ b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
@@ -2,12 +2,10 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.RadiusKnowledgeConfig;
-import com.zdjizhi.utils.functions.FilterLogFunction;
import com.zdjizhi.utils.functions.FilterNullFunction;
import com.zdjizhi.utils.functions.MapCompletedFunction;
-import com.zdjizhi.utils.kafka.Consumer;
-import com.zdjizhi.utils.kafka.Producer;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,13 +24,12 @@ public class RadiusKnowledgeTopology {
// environment.enableCheckpointing(5000);
- DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
+ DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer());
- DataStream<String> result = streamSource.filter(new FilterLogFunction()).name("FilterOriginalData")
- .map(new MapCompletedFunction()).name("RadiusOnOffMap")
+ DataStream<String> result = streamSource.map(new MapCompletedFunction()).name("RadiusOnOffMap")
.filter(new FilterNullFunction()).name("FilterAbnormalData");
- result.addSink(Producer.getKafkaProducer()).name("LogSink");
+ result.addSink(KafkaProducer.getKafkaProducer()).name("LogSink");
try {
environment.execute("RADIUS-ON-OFF");
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java
deleted file mode 100644
index 723ceaa..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.common.RadiusKnowledgeConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class FilterLogFunction implements FilterFunction<String> {
- private static final Log logger = LogFactory.get();
-
- @Override
- public boolean filter(String message) {
- boolean legitimate = false;
- try {
- if (StringUtil.isNotBlank(message)) {
- JSONObject jsonObject = JSONObject.parseObject(message);
- if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) {
- int packetType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE);
- if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) {
- int statusType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_ACCT_STATUS_TYPE);
- if (RadiusKnowledgeConfig.START_BILLING == statusType || RadiusKnowledgeConfig.STOP_BILLING == statusType) {
- legitimate = true;
- }
- }
- }
- }
- } catch (RuntimeException re) {
- logger.error("数据解析异常,异常信息:" + re);
- }
- return legitimate;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
index e58419c..3c0a1da 100644
--- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
@@ -1,8 +1,10 @@
package com.zdjizhi.utils.functions;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.bean.RadiusKnowledge;
import com.zdjizhi.common.RadiusKnowledgeConfig;
+import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
/**
@@ -12,44 +14,70 @@ import org.apache.flink.api.common.functions.MapFunction;
* @date 2021/5/2715:01
*/
public class MapCompletedFunction implements MapFunction<String, String> {
+ private static final Log logger = LogFactory.get();
@Override
public String map(String logs) {
+ String result = null;
try {
- JSONObject jsonObject = JSONObject.parseObject(logs);
- RadiusKnowledge knowledge = new RadiusKnowledge();
- knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip"));
- knowledge.setAccount(jsonObject.getString("radius_account"));
- knowledge.setAcct_status_type(jsonObject.getInteger("radius_acct_status_type"));
- /*
- 如果存在时间戳则选择此时间戳没有获取当前时间
- */
- if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) {
- knowledge.setEvent_timestamp(jsonObject.getLong("radius_event_timestamp"));
- } else {
- knowledge.setEvent_timestamp(System.currentTimeMillis() / 1000);
- }
- /*
- * 标识同一个连接:
- * 1.数据若存在acct_multi_session_id属性,取该属性
- * 2.不存在取 acct_session_id
- */
- if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) {
- knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id"));
- } else {
- knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id"));
- }
- /*
- 用户的在线时长,以秒为单位,下线用户无此属性,默认为0
- */
- if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) {
- knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time"));
- } else {
- knowledge.setAcct_session_time(0);
+ if (StringUtil.isNotBlank(logs)) {
+ JSONObject jsonMap = JSONObject.parseObject(logs);
+ if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) {
+ int packetType = jsonMap.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE);
+ if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) {
+ result = GetKnowledgeLogs(jsonMap);
+ }
+
+ }
}
- return JSONObject.toJSONString(knowledge);
} catch (RuntimeException e) {
- return "";
+ logger.error("数据解析异常,异常信息:" + e);
}
+ return result;
+ }
+
+ /**
+ * 获取RadiusOnOff日志
+ *
+ * @param jsonMap 原始日志
+ * @return OnOff日志
+ */
+ private static String GetKnowledgeLogs(JSONObject jsonMap) {
+ JSONObject knowledge = new JSONObject();
+
+ knowledge.put("framed_ip", jsonMap.getString("radius_framed_ip"));
+
+ knowledge.put("account", jsonMap.getString("radius_account"));
+
+ knowledge.put("acct_status_type", jsonMap.getString("radius_acct_status_type"));
+
+ /*
+ 如果存在时间戳则选择此时间戳没有获取当前时间
+ */
+ if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) {
+ knowledge.put("event_timestamp", jsonMap.getString("radius_event_timestamp"));
+ } else {
+ knowledge.put("event_timestamp", (System.currentTimeMillis() / 1000));
+ }
+ /*
+ * 标识同一个连接:
+ * 1.数据若存在acct_multi_session_id属性,取该属性
+ * 2.不存在取 acct_session_id
+ */
+ if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) {
+ knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id"));
+ } else {
+            knowledge.put("acct_session_id", jsonMap.getString("radius_acct_session_id"));
+ }
+ /*
+ 用户的在线时长,以秒为单位,下线用户无此属性,默认为0
+ */
+ if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) {
+ knowledge.put("acct_session_time", jsonMap.getString("radius_acct_session_time"));
+ } else {
+ knowledge.put("acct_session_time", 0);
+ }
+
+ return knowledge.toJSONString();
}
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
index a325dd6..a5bce21 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -12,24 +12,36 @@ import java.util.Properties;
* @date 2021/9/610:37
*/
class CertUtils {
- static void chooseCert(String type, Properties properties) {
- switch (type) {
- case "SSL":
- properties.put("security.protocol", "SSL");
- properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_PIN);
- properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_PIN);
- properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_PIN);
- break;
- case "SASL":
- properties.put("security.protocol", "SASL_PLAINTEXT");
- properties.put("sasl.mechanism", "PLAIN");
- properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + RadiusKnowledgeConfig.KAFKA_USER + " password=" + RadiusKnowledgeConfig.KAFKA_PIN + ";");
- break;
- default:
+ /**
+ * Kafka SASL认证端口
+ */
+ private static final String SASL_PORT = "9094";
+
+ /**
+ * Kafka SSL认证端口
+ */
+ private static final String SSL_PORT = "9095";
+
+ /**
+ * 根据连接信息端口判断认证方式。
+ *
+ * @param servers kafka 连接信息
+ * @param properties kafka 连接配置信息
+ */
+ static void chooseCert(String servers, Properties properties) {
+ if (servers.contains(SASL_PORT)) {
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + RadiusKnowledgeConfig.KAFKA_USER + " password=" + RadiusKnowledgeConfig.KAFKA_PIN + ";");
+ } else if (servers.contains(SSL_PORT)) {
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_PIN);
}
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 7e75117..bb47bf9 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -13,25 +13,27 @@ import java.util.Properties;
* @Description:
* @date 2021/6/813:54
*/
-public class Consumer {
+public class KafkaConsumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
- properties.put("bootstrap.servers", RadiusKnowledgeConfig.INPUT_KAFKA_SERVERS);
+ properties.put("bootstrap.servers", RadiusKnowledgeConfig.SOURCE_KAFKA_SERVERS);
properties.put("group.id", RadiusKnowledgeConfig.GROUP_ID);
- properties.put("session.timeout.ms", "60000");
- properties.put("max.poll.records", 3000);
- properties.put("max.partition.fetch.bytes", 31457280);
+ properties.put("session.timeout.ms", RadiusKnowledgeConfig.SESSION_TIMEOUT_MS);
+ properties.put("max.poll.records", RadiusKnowledgeConfig.MAX_POLL_RECORDS);
+ properties.put("max.partition.fetch.bytes", RadiusKnowledgeConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SOURCE_PROTOCOL,properties);
+ CertUtils.chooseCert(RadiusKnowledgeConfig.SOURCE_KAFKA_SERVERS, properties);
return properties;
}
public static FlinkKafkaConsumer<String> getKafkaConsumer() {
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(RadiusKnowledgeConfig.INPUT_KAFKA_TOPIC,
- new SimpleStringSchema(), createConsumerConfig());
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
+ RadiusKnowledgeConfig.SOURCE_KAFKA_TOPIC,
+ new SimpleStringSchema(),
+ createConsumerConfig());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
kafkaConsumer.setStartFromGroupOffsets();
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index ce8fab3..c259fd7 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -14,11 +14,11 @@ import java.util.Properties;
* @Description:
* @date 2021/6/814:04
*/
-public class Producer {
+public class KafkaProducer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
- properties.put("bootstrap.servers", RadiusKnowledgeConfig.OUTPUT_KAFKA_SERVERS);
+ properties.put("bootstrap.servers", RadiusKnowledgeConfig.SINK_KAFKA_SERVERS);
properties.put("acks", RadiusKnowledgeConfig.PRODUCER_ACK);
properties.put("retries", RadiusKnowledgeConfig.RETRIES);
properties.put("linger.ms", RadiusKnowledgeConfig.LINGER_MS);
@@ -28,7 +28,7 @@ public class Producer {
properties.put("max.request.size", RadiusKnowledgeConfig.MAX_REQUEST_SIZE);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, RadiusKnowledgeConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
- CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SINK_PROTOCOL, properties);
+ CertUtils.chooseCert(RadiusKnowledgeConfig.SINK_KAFKA_SERVERS, properties);
return properties;
}
@@ -36,7 +36,7 @@ public class Producer {
public static FlinkKafkaProducer<String> getKafkaProducer() {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
- RadiusKnowledgeConfig.OUTPUT_KAFKA_TOPIC,
+ RadiusKnowledgeConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
createProducerConfig(), Optional.empty());