author     qidaijie <[email protected]>    2021-09-27 11:15:46 +0800
committer  qidaijie <[email protected]>    2021-09-27 11:15:46 +0800
commit     beeff0a9e03d3016caa4bb0aa5327b8a6ee0b8a0 (patch)
tree       5ea7bec72591d645ad6b2f6ba7c23dee31756bfe
parent     8e93eb3d028d331a22bdbb246f6d21433f7a1259 (diff)
Commit the 2109 version
-rw-r--r--  pom.xml                                                                     223
-rw-r--r--  properties/default_config.properties                                         29
-rw-r--r--  properties/service_flow_config.properties                                    28
-rw-r--r--  src/main/java/com/zdjizhi/bean/RadiusKnowledge.java                          62
-rw-r--r--  src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java                  76
-rw-r--r--  src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java              46
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java             40
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java            17
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java          55
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/CertUtils.java                         36
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/Consumer.java                          41
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/Producer.java                          48
-rw-r--r--  src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java    70
-rw-r--r--  src/main/log4j.properties                                                    25
-rw-r--r--  src/test/java/com/zdjizhi/FunctionTest.java                                  24
15 files changed, 820 insertions, 0 deletions
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..1062c03
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,223 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>radius-account-knowledge</artifactId>
+ <version>210908-security</version>
+
+ <name>radius-account-knowledge</name>
+ <url>http://www.example.com</url>
+
+
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public</url>
+ </repository>
+
+ <repository>
+ <id>maven-ali</id>
+ <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+ <releases>
+ <!--<enabled>true</enabled>-->
+ </releases>
+ <snapshots>
+ <!--<enabled>true</enabled>-->
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flink.version>1.13.1</flink.version>
+ <hadoop.version>2.7.1</hadoop.version>
+ <kafka.version>1.0.0</kafka.version>
+ <hbase.version>2.2.3</hbase.version>
+ <!--<scope.type>provided</scope.type>-->
+ <scope.type>compile</scope.type>
+ </properties>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>com.zdjizhi.topology.RadiusKnowledgeTopology</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>io.github.zlika</groupId>
+ <artifactId>reproducible-build-maven-plugin</artifactId>
+ <version>0.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>strip-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ <include>**/*.xml</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+
+ <resource>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
+ <version>1.0.6</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.70</version>
+ </dependency>
+
+ <!--&lt;!&ndash; https://mvnrepository.com/artifact/org.apache.flink/flink-table &ndash;&gt;-->
+ <!--<dependency>-->
+ <!--<groupId>org.apache.flink</groupId>-->
+ <!--<artifactId>flink-table</artifactId>-->
+ <!--<version>${flink.version}</version>-->
+ <!--<type>pom</type>-->
+ <!--<scope>${scope.type}</scope>-->
+ <!--</dependency>-->
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>cglib</groupId>
+ <artifactId>cglib-nodep</artifactId>
+ <version>3.2.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>5.3.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>2.4.0</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>cn.hutool</groupId>
+ <artifactId>hutool-all</artifactId>
+ <version>5.5.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
+
diff --git a/properties/default_config.properties b/properties/default_config.properties
new file mode 100644
index 0000000..021eebd
--- /dev/null
+++ b/properties/default_config.properties
@@ -0,0 +1,29 @@
+#Number of producer retries
+retries=0
+
+#Maximum time to wait after a batch is created before it is sent, even if the batch is not yet full
+linger.ms=5
+
+#If no response is received before this timeout, the client resends the request if necessary
+request.timeout.ms=30000
+
+#The producer sends records in batches; batch size in bytes (Kafka default: 16384)
+batch.size=262144
+
+#Total memory the producer uses to buffer records waiting to be sent
+buffer.memory=67108864
+
+#Maximum size of a single request sent to the Kafka brokers (Kafka default: 1048576)
+max.request.size=5242880
+
+#kafka SASL authentication username
+kafka.user=admin
+
+#kafka SASL/SSL authentication password
+kafka.pin=galaxy2019
+
+#kafka source protocol; SSL or SASL
+kafka.source.protocol=SASL
+
+#kafka sink protocol; SSL or SASL
+kafka.sink.protocol=SASL
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
new file mode 100644
index 0000000..eb2cb4c
--- /dev/null
+++ b/properties/service_flow_config.properties
@@ -0,0 +1,28 @@
+#--------------------------------Address configuration------------------------------#
+
+#Input Kafka address
+input.kafka.servers=192.168.44.12:9092
+
+#Output Kafka address
+output.kafka.servers=192.168.44.12:9092
+
+#--------------------------------Kafka consumer group info------------------------------#
+
+#Kafka topic the job reads data from
+input.kafka.topic=RADIUS-RECORD
+
+#Output topic for the enriched data
+output.kafka.topic=RADIUS-ONFF-LOG
+
+#Consumer group that stores this job's consumption offsets; usually named after the topology. The stored offsets determine where the next read resumes so records are not consumed twice.
+group.id=radius-on-off-flink-20210615
+
+#Producer compression type: none or snappy
+producer.kafka.compression.type=none
+
+#Producer acks
+producer.ack=1
+
+#--------------------------------Topology configuration------------------------------#
+#Third-party tools directory
+tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java b/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java
new file mode 100644
index 0000000..43e71a1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java
@@ -0,0 +1,62 @@
+package com.zdjizhi.bean;
+
+/**
+ * @author qidaijie
+ */
+public class RadiusKnowledge {
+
+ private String framed_ip;
+ private String account;
+ private String acct_session_id;
+ private long acct_status_type;
+ private long acct_session_time;
+ private long event_timestamp;
+
+ public String getFramed_ip() {
+ return framed_ip;
+ }
+
+ public void setFramed_ip(String framed_ip) {
+ this.framed_ip = framed_ip;
+ }
+
+ public String getAccount() {
+ return account;
+ }
+
+ public void setAccount(String account) {
+ this.account = account;
+ }
+
+ public String getAcct_session_id() {
+ return acct_session_id;
+ }
+
+ public void setAcct_session_id(String acct_session_id) {
+ this.acct_session_id = acct_session_id;
+ }
+
+ public long getAcct_status_type() {
+ return acct_status_type;
+ }
+
+ public void setAcct_status_type(long acct_status_type) {
+ this.acct_status_type = acct_status_type;
+ }
+
+ public long getAcct_session_time() {
+ return acct_session_time;
+ }
+
+ public void setAcct_session_time(long acct_session_time) {
+ this.acct_session_time = acct_session_time;
+ }
+
+ public long getEvent_timestamp() {
+ return event_timestamp;
+ }
+
+ public void setEvent_timestamp(long event_timestamp) {
+ this.event_timestamp = event_timestamp;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java
new file mode 100644
index 0000000..d267b6b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java
@@ -0,0 +1,76 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.RadiusKnowledgeConfigurations;
+
+/**
+ * @author Administrator
+ */
+public class RadiusKnowledgeConfig {
+ /**
+ * 4 - Accounting-Request packet type
+ */
+ public static final int ACCOUNTING_REQUEST = 4;
+ /**
+ * 1 - start accounting
+ */
+ public static final int START_BILLING = 1;
+ /**
+ * 2 - stop accounting
+ */
+ public static final int STOP_BILLING = 2;
+ /**
+ * Packet type field
+ */
+ public static final String RADIUS_PACKET_TYPE = "radius_packet_type";
+ /**
+ * Accounting-request status type field
+ */
+ public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
+ /**
+ * Timestamp at which the accounting-request packet was sent
+ */
+ public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp";
+
+ /**
+ * Attribute that links a user's multiple accounting session IDs
+ */
+ public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id";
+
+ /**
+ * The user's online duration, in seconds
+ */
+ public static final String RADIUS_SESSION_TIME = "radius_acct_session_time";
+
+
+ /**
+ * System
+ */
+
+ /**
+ * kafka
+ */
+ public static final String INPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.servers");
+ public static final String OUTPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.servers");
+ public static final String GROUP_ID = RadiusKnowledgeConfigurations.getStringProperty(0, "group.id");
+ public static final String OUTPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.topic");
+ public static final String INPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.topic");
+ public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.ack");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+
+ /**
+ * default config
+ */
+ public static final String RETRIES = RadiusKnowledgeConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = RadiusKnowledgeConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = RadiusKnowledgeConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = RadiusKnowledgeConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = RadiusKnowledgeConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = RadiusKnowledgeConfigurations.getIntProperty(1, "max.request.size");
+ public static final String TOOLS_LIBRARY = RadiusKnowledgeConfigurations.getStringProperty(0, "tools.library");
+ public static final String KAFKA_SOURCE_PROTOCOL = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.source.protocol");
+ public static final String KAFKA_SINK_PROTOCOL = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.sink.protocol");
+ public static final String KAFKA_USER = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.user");
+ public static final String KAFKA_PIN = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.pin");
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
new file mode 100644
index 0000000..77418a6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
@@ -0,0 +1,46 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.RadiusKnowledgeConfig;
+import com.zdjizhi.utils.functions.FilterLogFunction;
+import com.zdjizhi.utils.functions.FilterNullFunction;
+import com.zdjizhi.utils.functions.MapCompletedFunction;
+import com.zdjizhi.utils.kafka.Consumer;
+import com.zdjizhi.utils.kafka.Producer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.topology
+ * @Description: Flink job that reads RADIUS accounting records from Kafka, extracts on/off-line session knowledge, and writes it back to Kafka
+ * @date 2021/5/20 16:42
+ */
+public class RadiusKnowledgeTopology {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// environment.enableCheckpointing(5000);
+
+ DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
+
+ DataStream<String> result = streamSource.filter(new FilterLogFunction()).name("FilterOriginalData")
+ .map(new MapCompletedFunction()).name("RadiusOnOffMap")
+ .filter(new FilterNullFunction()).name("FilterAbnormalData");
+
+ result.addSink(Producer.getKafkaProducer()).name("LogSink");
+
+ try {
+ environment.execute("RADIUS-ON-OFF");
+ } catch (Exception e) {
+ logger.error("Failed to start the Flink job. Exception: " + e);
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java
new file mode 100644
index 0000000..723ceaa
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java
@@ -0,0 +1,40 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.common.RadiusKnowledgeConfig;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description: Keeps only RADIUS accounting-request records whose status type is start or stop accounting
+ * @date 2021/5/27 15:01
+ */
+public class FilterLogFunction implements FilterFunction<String> {
+ private static final Log logger = LogFactory.get();
+
+ @Override
+ public boolean filter(String message) {
+ boolean legitimate = false;
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSONObject.parseObject(message);
+ if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) {
+ int packetType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE);
+ if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) {
+ int statusType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_ACCT_STATUS_TYPE);
+ if (RadiusKnowledgeConfig.START_BILLING == statusType || RadiusKnowledgeConfig.STOP_BILLING == statusType) {
+ legitimate = true;
+ }
+ }
+ }
+ }
+ } catch (RuntimeException re) {
+ logger.error("Failed to parse the record, exception: " + re);
+ }
+ return legitimate;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
new file mode 100644
index 0000000..de507ad
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description: Drops blank records produced by upstream parsing failures
+ * @date 2021/5/27 15:01
+ */
+public class FilterNullFunction implements FilterFunction<String> {
+ @Override
+ public boolean filter(String message) {
+ return StringUtil.isNotBlank(message);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
new file mode 100644
index 0000000..e58419c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
@@ -0,0 +1,55 @@
+package com.zdjizhi.utils.functions;
+
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.bean.RadiusKnowledge;
+import com.zdjizhi.common.RadiusKnowledgeConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description: Maps a raw RADIUS JSON record onto a RadiusKnowledge bean and re-serializes it as JSON
+ * @date 2021/5/27 15:01
+ */
+public class MapCompletedFunction implements MapFunction<String, String> {
+
+ @Override
+ public String map(String logs) {
+ try {
+ JSONObject jsonObject = JSONObject.parseObject(logs);
+ RadiusKnowledge knowledge = new RadiusKnowledge();
+ knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip"));
+ knowledge.setAccount(jsonObject.getString("radius_account"));
+ knowledge.setAcct_status_type(jsonObject.getInteger("radius_acct_status_type"));
+ /*
+ If the record carries an event timestamp, use it; otherwise fall back to the current time
+ */
+ if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) {
+ knowledge.setEvent_timestamp(jsonObject.getLong("radius_event_timestamp"));
+ } else {
+ knowledge.setEvent_timestamp(System.currentTimeMillis() / 1000);
+ }
+ /*
+ * Identify a single connection:
+ * 1. if the record has the acct_multi_session_id attribute, use it
+ * 2. otherwise use acct_session_id
+ */
+ if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) {
+ knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id"));
+ } else {
+ knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id"));
+ }
+ /*
+ The user's online duration in seconds; records without this attribute default to 0
+ */
+ if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) {
+ knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time"));
+ } else {
+ knowledge.setAcct_session_time(0);
+ }
+ return JSONObject.toJSONString(knowledge);
+ } catch (RuntimeException e) {
+ return "";
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
new file mode 100644
index 0000000..a325dd6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.RadiusKnowledgeConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description: Adds SSL or SASL authentication settings to a Kafka client configuration
+ * @date 2021/9/6 10:37
+ */
+class CertUtils {
+ static void chooseCert(String type, Properties properties) {
+ switch (type) {
+ case "SSL":
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_PIN);
+ break;
+ case "SASL":
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + RadiusKnowledgeConfig.KAFKA_USER + " password=" + RadiusKnowledgeConfig.KAFKA_PIN + ";");
+ break;
+ default:
+ }
+
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
new file mode 100644
index 0000000..7e75117
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -0,0 +1,41 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.RadiusKnowledgeConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description: Builds the FlinkKafkaConsumer for the RADIUS input topic
+ * @date 2021/6/8 13:54
+ */
+public class Consumer {
+ private static Properties createConsumerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", RadiusKnowledgeConfig.INPUT_KAFKA_SERVERS);
+ properties.put("group.id", RadiusKnowledgeConfig.GROUP_ID);
+ properties.put("session.timeout.ms", "60000");
+ properties.put("max.poll.records", 3000);
+ properties.put("max.partition.fetch.bytes", 31457280);
+ properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+ CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SOURCE_PROTOCOL,properties);
+
+ return properties;
+ }
+
+ public static FlinkKafkaConsumer<String> getKafkaConsumer() {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(RadiusKnowledgeConfig.INPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(), createConsumerConfig());
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
new file mode 100644
index 0000000..ce8fab3
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -0,0 +1,48 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.RadiusKnowledgeConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description: Builds the FlinkKafkaProducer for the output topic
+ * @date 2021/6/8 14:04
+ */
+public class Producer {
+
+ private static Properties createProducerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", RadiusKnowledgeConfig.OUTPUT_KAFKA_SERVERS);
+ properties.put("acks", RadiusKnowledgeConfig.PRODUCER_ACK);
+ properties.put("retries", RadiusKnowledgeConfig.RETRIES);
+ properties.put("linger.ms", RadiusKnowledgeConfig.LINGER_MS);
+ properties.put("request.timeout.ms", RadiusKnowledgeConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", RadiusKnowledgeConfig.BATCH_SIZE);
+ properties.put("buffer.memory", RadiusKnowledgeConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", RadiusKnowledgeConfig.MAX_REQUEST_SIZE);
+ properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, RadiusKnowledgeConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+ CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SINK_PROTOCOL, properties);
+
+ return properties;
+ }
+
+
+ public static FlinkKafkaProducer<String> getKafkaProducer() {
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
+ RadiusKnowledgeConfig.OUTPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(),
+ createProducerConfig(), Optional.empty());
+
+ kafkaProducer.setLogFailuresOnly(false);
+// kafkaProducer.setWriteTimestampToKafka(true);
+
+ return kafkaProducer;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java
new file mode 100644
index 0000000..7b7c046
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java
@@ -0,0 +1,70 @@
+package com.zdjizhi.utils.system;
+
+import com.zdjizhi.utils.StringUtil;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class RadiusKnowledgeConfigurations {
+
+ private static Properties propDefault = new Properties();
+ private static Properties propService = new Properties();
+
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key);
+ } else if (type == 1) {
+ return propDefault.getProperty(key);
+ } else {
+ return null;
+ }
+
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else if (type == 1) {
+ return Integer.parseInt(propDefault.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key));
+ } else if (type == 1) {
+ return Long.parseLong(propDefault.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return StringUtil.equals(propService.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else if (type == 1) {
+ return StringUtil.equals(propDefault.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propService.load(RadiusKnowledgeConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ propDefault.load(RadiusKnowledgeConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ } catch (IOException | RuntimeException e) {
+ propDefault = null;
+ propService = null;
+ }
+ }
+}
diff --git a/src/main/log4j.properties b/src/main/log4j.properties
new file mode 100644
index 0000000..9d91936
--- /dev/null
+++ b/src/main/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# Console appender settings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File appender settings
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+# Use a relative path and verify that the log output ends up under the application directory
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+# MyBatis configuration; com.nis.web.dao is the package containing the MyBatis mapper interfaces
+log4j.logger.com.nis.web.dao=debug
+# BoneCP data source configuration
+log4j.category.com.jolbox=debug,console
+
+
diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java
new file mode 100644
index 0000000..6b52828
--- /dev/null
+++ b/src/test/java/com/zdjizhi/FunctionTest.java
@@ -0,0 +1,24 @@
+package com.zdjizhi;
+
+import com.alibaba.fastjson.JSONObject;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import org.junit.Test;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description: Quick JSON parsing checks with JsonPath and fastjson
+ * @date 2021/6/30 11:46
+ */
+public class FunctionTest {
+
+ @Test
+ public void jsonTest(){
+ String logs = "{\"name\":\"test\"}";
+ Object document = Configuration.defaultConfiguration().jsonProvider().parse(logs);
+ System.out.println(JsonPath.read(document, "$.name").toString());
+ JSONObject jsonObject = JSONObject.parseObject(logs);
+ System.out.println(jsonObject.getLong("age"));
+ }
+}