diff options
| author | qidaijie <[email protected]> | 2022-03-09 10:18:16 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2022-03-09 10:18:16 +0800 |
| commit | e1ccce31e05ab76490b117096aacec58664eacf7 (patch) | |
| tree | 63c025b1b56a75c9b61a012ef5f33811723b6764 /src | |
| parent | beeff0a9e03d3016caa4bb0aa5327b8a6ee0b8a0 (diff) | |
优化Kafka认证方式,删除配置项通过连接端口判断
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/com/zdjizhi/bean/RadiusKnowledge.java | 62 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java | 20 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java | 13 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java | 40 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java | 94 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/CertUtils.java | 48 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Consumer.java) | 18 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/Producer.java) | 8 |
8 files changed, 126 insertions, 177 deletions
diff --git a/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java b/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java deleted file mode 100644 index 43e71a1..0000000 --- a/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.zdjizhi.bean; - -/** - * @author qidaijie - */ -public class RadiusKnowledge { - - private String framed_ip; - private String account; - private String acct_session_id; - private long acct_status_type; - private long acct_session_time; - private long event_timestamp; - - public String getFramed_ip() { - return framed_ip; - } - - public void setFramed_ip(String framed_ip) { - this.framed_ip = framed_ip; - } - - public String getAccount() { - return account; - } - - public void setAccount(String account) { - this.account = account; - } - - public String getAcct_session_id() { - return acct_session_id; - } - - public void setAcct_session_id(String acct_session_id) { - this.acct_session_id = acct_session_id; - } - - public long getAcct_status_type() { - return acct_status_type; - } - - public void setAcct_status_type(long acct_status_type) { - this.acct_status_type = acct_status_type; - } - - public long getAcct_session_time() { - return acct_session_time; - } - - public void setAcct_session_time(long acct_session_time) { - this.acct_session_time = acct_session_time; - } - - public long getEvent_timestamp() { - return event_timestamp; - } - - public void setEvent_timestamp(long event_timestamp) { - this.event_timestamp = event_timestamp; - } -} diff --git a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java index d267b6b..43d2a9d 100644 --- a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java +++ b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java @@ -15,6 +15,11 @@ public class RadiusKnowledgeConfig { * 1、开始计费 */ public static final int START_BILLING = 1; + + /** + * 3、过渡计费 + */ + public static final int UPDATE_BILLING = 3; /** * 2、停止计费 */ @@ -50,11 +55,11 @@ public class RadiusKnowledgeConfig { /** * kafka */ - public static final String INPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.servers"); - public static final String OUTPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.servers"); + public static final String SOURCE_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "source.kafka.servers"); + public static final String SOURCE_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "source.kafka.topic"); + public static final String SINK_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.servers"); + public static final String SINK_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.topic"); public static final String GROUP_ID = RadiusKnowledgeConfigurations.getStringProperty(0, "group.id"); - public static final String OUTPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.topic"); - public static final String INPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.topic"); public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.ack"); public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.kafka.compression.type"); @@ -73,4 +78,11 @@ public class RadiusKnowledgeConfig { public static final String KAFKA_USER = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.user"); public static final String KAFKA_PIN = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.pin"); + /** + * kafka source config + */ + public static final String SESSION_TIMEOUT_MS = RadiusKnowledgeConfigurations.getStringProperty(1, "session.timeout.ms"); + public static final String MAX_POLL_RECORDS = RadiusKnowledgeConfigurations.getStringProperty(1, "max.poll.records"); + public static final String MAX_PARTITION_FETCH_BYTES = RadiusKnowledgeConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); + }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java index 77418a6..c70a69b 100644 --- a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java +++ b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java @@ -2,12 +2,10 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.RadiusKnowledgeConfig; -import com.zdjizhi.utils.functions.FilterLogFunction; import com.zdjizhi.utils.functions.FilterNullFunction; import com.zdjizhi.utils.functions.MapCompletedFunction; -import com.zdjizhi.utils.kafka.Consumer; -import com.zdjizhi.utils.kafka.Producer; +import com.zdjizhi.utils.kafka.KafkaConsumer; +import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -26,13 +24,12 @@ public class RadiusKnowledgeTopology { // environment.enableCheckpointing(5000); - DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()); + DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()); - DataStream<String> result = streamSource.filter(new FilterLogFunction()).name("FilterOriginalData") - .map(new MapCompletedFunction()).name("RadiusOnOffMap") + DataStream<String> result = streamSource.map(new MapCompletedFunction()).name("RadiusOnOffMap") .filter(new FilterNullFunction()).name("FilterAbnormalData"); - result.addSink(Producer.getKafkaProducer()).name("LogSink"); + result.addSink(KafkaProducer.getKafkaProducer()).name("LogSink"); try { environment.execute("RADIUS-ON-OFF"); diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java deleted file mode 100644 index 723ceaa..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.zdjizhi.utils.functions; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.common.RadiusKnowledgeConfig; -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class FilterLogFunction implements FilterFunction<String> { - private static final Log logger = LogFactory.get(); - - @Override - public boolean filter(String message) { - boolean legitimate = false; - try { - if (StringUtil.isNotBlank(message)) { - JSONObject jsonObject = JSONObject.parseObject(message); - if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) { - int packetType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE); - if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) { - int statusType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_ACCT_STATUS_TYPE); - if (RadiusKnowledgeConfig.START_BILLING == statusType || RadiusKnowledgeConfig.STOP_BILLING == statusType) { - legitimate = true; - } - } - } - } - } catch (RuntimeException re) { - logger.error("数据解析异常,异常信息:" + re); - } - return legitimate; - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java index e58419c..3c0a1da 100644 --- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java @@ -1,8 +1,10 @@ package com.zdjizhi.utils.functions; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.bean.RadiusKnowledge; import com.zdjizhi.common.RadiusKnowledgeConfig; +import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.functions.MapFunction; /** @@ -12,44 +14,70 @@ import org.apache.flink.api.common.functions.MapFunction; * @date 2021/5/2715:01 */ public class MapCompletedFunction implements MapFunction<String, String> { + private static final Log logger = LogFactory.get(); @Override public String map(String logs) { + String result = null; try { - JSONObject jsonObject = JSONObject.parseObject(logs); - RadiusKnowledge knowledge = new RadiusKnowledge(); - knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip")); - knowledge.setAccount(jsonObject.getString("radius_account")); - knowledge.setAcct_status_type(jsonObject.getInteger("radius_acct_status_type")); - /* - 如果存在时间戳则选择此时间戳没有获取当前时间 - */ - if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) { - knowledge.setEvent_timestamp(jsonObject.getLong("radius_event_timestamp")); - } else { - knowledge.setEvent_timestamp(System.currentTimeMillis() / 1000); - } - /* - * 标识同一个连接: - * 1.数据若存在acct_multi_session_id属性,取该属性 - * 2.不存在取 acct_session_id - */ - if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) { - knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id")); - } else { - knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id")); - } - /* - 用户的在线时长,以秒为单位,下线用户无此属性,默认为0 - */ - if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) { - knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time")); - } else { - knowledge.setAcct_session_time(0); + if (StringUtil.isNotBlank(logs)) { + JSONObject jsonMap = JSONObject.parseObject(logs); + if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) { + int packetType = jsonMap.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE); + if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) { + result = GetKnowledgeLogs(jsonMap); + } + + } } - return JSONObject.toJSONString(knowledge); } catch (RuntimeException e) { - return ""; + logger.error("数据解析异常,异常信息:" + e); } + return result; + } + + /** + * 获取RadiusOnOff日志 + * + * @param jsonMap 原始日志 + * @return OnOff日志 + */ + private static String GetKnowledgeLogs(JSONObject jsonMap) { + JSONObject knowledge = new JSONObject(); + + knowledge.put("framed_ip", jsonMap.getString("radius_framed_ip")); + + knowledge.put("account", jsonMap.getString("radius_account")); + + knowledge.put("acct_status_type", jsonMap.getString("radius_acct_status_type")); + + /* + 如果存在时间戳则选择此时间戳没有获取当前时间 + */ + if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) { + knowledge.put("event_timestamp", jsonMap.getString("radius_event_timestamp")); + } else { + knowledge.put("event_timestamp", (System.currentTimeMillis() / 1000)); + } + /* + * 标识同一个连接: + * 1.数据若存在acct_multi_session_id属性,取该属性 + * 2.不存在取 acct_session_id + */ + if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) { + knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id")); + } else { + knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id")); + } + /* + 用户的在线时长,以秒为单位,下线用户无此属性,默认为0 + */ + if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) { + knowledge.put("acct_session_time", jsonMap.getString("radius_acct_session_time")); + } else { + knowledge.put("acct_session_time", 0); + } + + return knowledge.toJSONString(); } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java index a325dd6..a5bce21 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -12,24 +12,36 @@ import java.util.Properties; * @date 2021/9/610:37 */ class CertUtils { - static void chooseCert(String type, Properties properties) { - switch (type) { - case "SSL": - properties.put("security.protocol", "SSL"); - properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks"); - properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_PIN); - properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks"); - properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_PIN); - properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_PIN); - break; - case "SASL": - properties.put("security.protocol", "SASL_PLAINTEXT"); - properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" - + RadiusKnowledgeConfig.KAFKA_USER + " password=" + RadiusKnowledgeConfig.KAFKA_PIN + ";"); - break; - default: + /** + * Kafka SASL认证端口 + */ + private static final String SASL_PORT = "9094"; + + /** + * Kafka SSL认证端口 + */ + private static final String SSL_PORT = "9095"; + + /** + * 根据连接信息端口判断认证方式。 + * + * @param servers kafka 连接信息 + * @param properties kafka 连接配置信息 + */ + static void chooseCert(String servers, Properties properties) { + if (servers.contains(SASL_PORT)) { + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + RadiusKnowledgeConfig.KAFKA_USER + " password=" + RadiusKnowledgeConfig.KAFKA_PIN + ";"); + } else if (servers.contains(SSL_PORT)) { + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_PIN); + properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_PIN); } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 7e75117..bb47bf9 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -13,25 +13,27 @@ import java.util.Properties; * @Description: * @date 2021/6/813:54 */ -public class Consumer { +public class KafkaConsumer { private static Properties createConsumerConfig() { Properties properties = new Properties(); - properties.put("bootstrap.servers", RadiusKnowledgeConfig.INPUT_KAFKA_SERVERS); + properties.put("bootstrap.servers", RadiusKnowledgeConfig.SOURCE_KAFKA_SERVERS); properties.put("group.id", RadiusKnowledgeConfig.GROUP_ID); - properties.put("session.timeout.ms", "60000"); - properties.put("max.poll.records", 3000); - properties.put("max.partition.fetch.bytes", 31457280); + properties.put("session.timeout.ms", RadiusKnowledgeConfig.SESSION_TIMEOUT_MS); + properties.put("max.poll.records", RadiusKnowledgeConfig.MAX_POLL_RECORDS); + properties.put("max.partition.fetch.bytes", RadiusKnowledgeConfig.MAX_PARTITION_FETCH_BYTES); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SOURCE_PROTOCOL,properties); + CertUtils.chooseCert(RadiusKnowledgeConfig.SOURCE_KAFKA_SERVERS, properties); return properties; } public static FlinkKafkaConsumer<String> getKafkaConsumer() { - FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(RadiusKnowledgeConfig.INPUT_KAFKA_TOPIC, - new SimpleStringSchema(), createConsumerConfig()); + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>( + RadiusKnowledgeConfig.SOURCE_KAFKA_TOPIC, + new SimpleStringSchema(), + createConsumerConfig()); kafkaConsumer.setCommitOffsetsOnCheckpoints(true); kafkaConsumer.setStartFromGroupOffsets(); diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index ce8fab3..c259fd7 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -14,11 +14,11 @@ import java.util.Properties; * @Description: * @date 2021/6/814:04 */ -public class Producer { +public class KafkaProducer { private static Properties createProducerConfig() { Properties properties = new Properties(); - properties.put("bootstrap.servers", RadiusKnowledgeConfig.OUTPUT_KAFKA_SERVERS); + properties.put("bootstrap.servers", RadiusKnowledgeConfig.SINK_KAFKA_SERVERS); properties.put("acks", RadiusKnowledgeConfig.PRODUCER_ACK); properties.put("retries", RadiusKnowledgeConfig.RETRIES); properties.put("linger.ms", RadiusKnowledgeConfig.LINGER_MS); @@ -28,7 +28,7 @@ public class Producer { properties.put("max.request.size", RadiusKnowledgeConfig.MAX_REQUEST_SIZE); properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, RadiusKnowledgeConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); - CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SINK_PROTOCOL, properties); + CertUtils.chooseCert(RadiusKnowledgeConfig.SINK_KAFKA_SERVERS, properties); return properties; } @@ -36,7 +36,7 @@ public class Producer { public static FlinkKafkaProducer<String> getKafkaProducer() { FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( - RadiusKnowledgeConfig.OUTPUT_KAFKA_TOPIC, + RadiusKnowledgeConfig.SINK_KAFKA_TOPIC, new SimpleStringSchema(), createProducerConfig(), Optional.empty()); |
