authorLAPTOP-CUUVN8AS\wk <[email protected]>2022-08-17 17:48:33 +0800
committerLAPTOP-CUUVN8AS\wk <[email protected]>2022-08-17 17:48:33 +0800
commitbea6a964df33658b83b924fd2ba9197c42d050b6 (patch)
treebfbc86a6b7feff6bea02ffce954eb3701efb9b73
parent6f0b2cb82b83a035aef9c2785fae1604df0a7ae7 (diff)
GTPC initial version
-rw-r--r--  .gitignore  1
-rw-r--r--  pom.xml  287
-rw-r--r--  properties/default_config.properties  16
-rw-r--r--  properties/service_flow_config.properties  32
-rw-r--r--  src/main/java/com/zdjizhi/common/GtpConfig.java  45
-rw-r--r--  src/main/java/com/zdjizhi/pojo/Entity.java  107
-rw-r--r--  src/main/java/com/zdjizhi/pojo/Gtp.java  69
-rw-r--r--  src/main/java/com/zdjizhi/topology/GtpRelation.java  58
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java  14
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/ParseFunction.java  100
-rw-r--r--  src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java  397
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/CertUtils.java  48
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/Consumer.java  41
-rw-r--r--  src/main/java/com/zdjizhi/utils/system/GtpConfigurations.java  70
-rw-r--r--  src/main/log4j.properties  25
-rw-r--r--  src/test/java/com/zdjizhi/FunctionTest.java  34
16 files changed, 1344 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..68048cd
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+src/main/java/com/zdjizhi/utils/kafka/test.java
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..8d78c2f
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,287 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>relationship-gtpc-user</artifactId>
+ <version>22-08-15</version>
+
+ <name>relationship-gtpc-user</name>
+ <url>http://www.example.com</url>
+
+
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public</url>
+ </repository>
+
+ <repository>
+ <id>maven-ali</id>
+ <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+ <releases>
+ <!--<enabled>true</enabled>-->
+ </releases>
+ <snapshots>
+ <!--<enabled>true</enabled>-->
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flink.version>1.13.1</flink.version>
+ <hadoop.version>2.7.1</hadoop.version>
+ <kafka.version>1.0.0</kafka.version>
+ <hbase.version>2.2.3</hbase.version>
+ <scope.type>provided</scope.type>
+ <!--<scope.type>compile</scope.type>-->
+ </properties>
+
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
+ <version>1.0.6</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.70</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_2.12</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>netty</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>netty-all</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>cglib</groupId>
+ <artifactId>cglib-nodep</artifactId>
+ <version>3.2.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>5.3.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>2.4.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>cn.hutool</groupId>
+ <artifactId>hutool-all</artifactId>
+ <version>5.5.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.21</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.21</version>
+ </dependency>
+
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.0</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <!-- The semantics of this option are reversed, see MCOMPILER-209. -->
+ <useIncrementalCompilation>false</useIncrementalCompilation>
+ <compilerArgs>
+ <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
+ <arg>-Xpkginfo:always</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>relationship-gtpc-user-22-08-15</finalName>
+ <transformers combine.children="append">
+ <!-- The service transformer is needed to merge META-INF/services files -->
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>com.zdjizhi.topology.GtpRelation</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ <include>**/*.xml</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+
+ <resource>
+ <directory>src/main</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+</project>
+
diff --git a/properties/default_config.properties b/properties/default_config.properties
new file mode 100644
index 0000000..c1832ce
--- /dev/null
+++ b/properties/default_config.properties
@@ -0,0 +1,16 @@
+#====================Kafka KafkaConsumer====================#
+#kafka source connection timeout
+session.timeout.ms=60000
+
+#kafka source poll
+max.poll.records=3000
+
+#kafka source poll bytes
+max.partition.fetch.bytes=31457280
+#====================kafka default====================#
+
+#kafka SASL authentication username
+kafka.user=admin
+
+#kafka SASL and SSL authentication password
+kafka.pin=galaxy2019 \ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
new file mode 100644
index 0000000..a6976ba
--- /dev/null
+++ b/properties/service_flow_config.properties
@@ -0,0 +1,32 @@
+#--------------------------------Address configuration------------------------------#
+
+#Management Kafka address
+#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+input.kafka.servers=192.168.44.12:9094
+
+#HBase ZooKeeper address, used to connect to HBase
+#hbase.zookeeper.servers=192.168.44.12
+hbase.zookeeper.servers=192.168.44.12:2181
+
+hbase.scan.limit=0
+
+cache.expire.seconds=86400
+cache.max.size=10000000
+cache.update.seconds=3600
+
+#--------------------------------Kafka consumer group settings------------------------------#
+
+#Kafka topic to consume data from
+input.kafka.topic=GTPC-RECORD-COMPLETED
+
+#Consumer group id used when reading the topic; it stores this job's consumed offsets (it can be named after the topology), so the next read resumes where it left off without re-consuming data
+group.id=test3
+
+#--------------------------------Topology configuration------------------------------#
+#User-TEID relation table
+relation.user.teid.table.name=tsg_galaxy:relation_user_teid
+
+#Tools library path
+tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+#GTP-C knowledge base table
+gtpc.knowledge.base.table.name=tsg_galaxy:gtpc_knowledge_base \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/GtpConfig.java b/src/main/java/com/zdjizhi/common/GtpConfig.java
new file mode 100644
index 0000000..ee377b9
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/GtpConfig.java
@@ -0,0 +1,45 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.GtpConfigurations;
+
+/**
+ * @author Administrator
+ */
+public class GtpConfig {
+
+ public static final int HBASE_SCAN_LIMIT = GtpConfigurations.getIntProperty(0, "hbase.scan.limit");
+
+
+ /**
+ * System
+ */
+ public static final String RELATION_USER_TEID_TABLE_NAME = GtpConfigurations.getStringProperty(0, "relation.user.teid.table.name");
+
+ public static final String GTPC_KNOWLEDGE_BASE_TABLE_NAME = GtpConfigurations.getStringProperty(0, "gtpc.knowledge.base.table.name");
+
+ /**
+ * kafka
+ */
+ public static final String INPUT_KAFKA_SERVERS = GtpConfigurations.getStringProperty(0, "input.kafka.servers");
+ public static final String HBASE_ZOOKEEPER_SERVERS = GtpConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+ public static final String SESSION_TIMEOUT_MS = GtpConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = GtpConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = GtpConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+
+
+
+ public static final String GROUP_ID = GtpConfigurations.getStringProperty(0, "group.id");
+ public static final String INPUT_KAFKA_TOPIC = GtpConfigurations.getStringProperty(0, "input.kafka.topic");
+
+ public static final String TOOLS_LIBRARY = GtpConfigurations.getStringProperty(0, "tools.library");
+ public static final String KAFKA_USER = GtpConfigurations.getStringProperty(1, "kafka.user");
+ public static final String KAFKA_PIN = GtpConfigurations.getStringProperty(1, "kafka.pin");
+
+
+ public static final int CACHE_EXPIRE_SECONDS = GtpConfigurations.getIntProperty(0, "cache.expire.seconds");
+ public static final int CACHE_MAX_SIZE = GtpConfigurations.getIntProperty(0, "cache.max.size");
+
+
+ public static final int CACHE_UPDATE_SECONDS = GtpConfigurations.getIntProperty(0, "cache.update.seconds");
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/pojo/Entity.java b/src/main/java/com/zdjizhi/pojo/Entity.java
new file mode 100644
index 0000000..aac3316
--- /dev/null
+++ b/src/main/java/com/zdjizhi/pojo/Entity.java
@@ -0,0 +1,107 @@
+package com.zdjizhi.pojo;
+
+public class Entity {
+
+ private String Hashkey;
+ private int ifError;
+
+ private String gtp_apn;
+ private String gtp_imei;
+ private String gtp_imsi;
+ private String gtp_phone_number;
+ private Long gtp_uplink_teid;
+ private Long gtp_downlink_teid ;
+ private String gtp_msg_type;
+ private Long common_recv_time;
+ private Long gtp_teid;
+
+
+ public String getHashkey() {
+ return Hashkey;
+ }
+
+ public void setHashkey(String hashkey) {
+ Hashkey = hashkey;
+ }
+
+ public int getIfError() {
+ return ifError;
+ }
+
+ public void setIfError(int ifError) {
+ this.ifError = ifError;
+ }
+
+ public String getGtp_apn() {
+ return gtp_apn;
+ }
+
+ public void setGtp_apn(String gtp_apn) {
+ this.gtp_apn = gtp_apn;
+ }
+
+ public String getGtp_imei() {
+ return gtp_imei;
+ }
+
+ public void setGtp_imei(String gtp_imei) {
+ this.gtp_imei = gtp_imei;
+ }
+
+ public String getGtp_imsi() {
+ return gtp_imsi;
+ }
+
+ public void setGtp_imsi(String gtp_imsi) {
+ this.gtp_imsi = gtp_imsi;
+ }
+
+ public String getGtp_phone_number() {
+ return gtp_phone_number;
+ }
+
+ public void setGtp_phone_number(String gtp_phone_number) {
+ this.gtp_phone_number = gtp_phone_number;
+ }
+
+ public Long getGtp_uplink_teid() {
+ return gtp_uplink_teid;
+ }
+
+ public void setGtp_uplink_teid(Long gtp_uplink_teid) {
+ this.gtp_uplink_teid = gtp_uplink_teid;
+ }
+
+ public Long getGtp_downlink_teid() {
+ return gtp_downlink_teid;
+ }
+
+ public void setGtp_downlink_teid(Long gtp_downlink_teid) {
+ this.gtp_downlink_teid = gtp_downlink_teid;
+ }
+
+ public String getGtp_msg_type() {
+ return gtp_msg_type;
+ }
+
+ public void setGtp_msg_type(String gtp_msg_type) {
+ this.gtp_msg_type = gtp_msg_type;
+ }
+
+ public Long getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(Long common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }
+
+
+ public Long getGtp_teid() {
+ return gtp_teid;
+ }
+
+ public void setGtp_teid(Long gtp_teid) {
+ this.gtp_teid = gtp_teid;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/pojo/Gtp.java b/src/main/java/com/zdjizhi/pojo/Gtp.java
new file mode 100644
index 0000000..f71c922
--- /dev/null
+++ b/src/main/java/com/zdjizhi/pojo/Gtp.java
@@ -0,0 +1,69 @@
+package com.zdjizhi.pojo;
+
+public class Gtp {
+
+ private String gtp_apn;
+ private String gtp_imei;
+ private String gtp_imsi;
+ private String gtp_phone_number;
+ private Long gtp_teid;
+ private Integer msg_type;
+ private Long last_update_time;
+
+
+ public Integer getMsg_type() {
+ return msg_type;
+ }
+
+ public void setMsg_type(Integer msg_type) {
+ this.msg_type = msg_type;
+ }
+
+ public String getGtp_apn() {
+ return gtp_apn;
+ }
+
+ public void setGtp_apn(String gtp_apn) {
+ this.gtp_apn = gtp_apn;
+ }
+
+ public String getGtp_imei() {
+ return gtp_imei;
+ }
+
+ public void setGtp_imei(String gtp_imei) {
+ this.gtp_imei = gtp_imei;
+ }
+
+ public String getGtp_imsi() {
+ return gtp_imsi;
+ }
+
+ public void setGtp_imsi(String gtp_imsi) {
+ this.gtp_imsi = gtp_imsi;
+ }
+
+ public String getGtp_phone_number() {
+ return gtp_phone_number;
+ }
+
+ public void setGtp_phone_number(String gtp_phone_number) {
+ this.gtp_phone_number = gtp_phone_number;
+ }
+
+ public Long getGtp_teid() {
+ return gtp_teid;
+ }
+
+ public void setGtp_teid(Long gtp_teid) {
+ this.gtp_teid = gtp_teid;
+ }
+
+ public Long getLast_update_time() {
+ return last_update_time;
+ }
+
+ public void setLast_update_time(Long last_update_time) {
+ this.last_update_time = last_update_time;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/GtpRelation.java b/src/main/java/com/zdjizhi/topology/GtpRelation.java
new file mode 100644
index 0000000..056029d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/GtpRelation.java
@@ -0,0 +1,58 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.GtpConfig;
+import com.zdjizhi.pojo.Entity;
+import com.zdjizhi.utils.functions.FilterNullFunction;
+import com.zdjizhi.utils.functions.ParseFunction;
+import com.zdjizhi.utils.hbasepackage.HbaseSink;
+import com.zdjizhi.utils.kafka.Consumer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2021/5/20 16:42
+ */
+public class GtpRelation {
+ private static final Log logger = LogFactory.get();
+
+
+ public static void main(String[] args) {
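+ // Pipeline: Kafka source -> parse JSON into Entity -> drop records flagged ifError
+ // -> key by the MD5 hashkey of (imei, imsi, phone_number) -> write relations to HBase.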
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
+
+ DataStream<Entity> getObject = streamSource.map(new ParseFunction()).name("ParseJson");
+
+ DataStream<Entity> filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData");
+
+ KeyedStream<Entity, Tuple1<String>> gtpRelationTuple = filterOriginalData.keyBy(new OneKeySelector());
+
+
+ gtpRelationTuple.addSink(new HbaseSink(GtpConfig.HBASE_ZOOKEEPER_SERVERS));
+
+ try {
+ environment.execute("RELATIONSHIP-GTPC-USER");
+ } catch (Exception e) {
+ logger.error("This Flink task start ERROR! Exception information is :" + e);
+ }
+
+ }
+
+
+ public static class OneKeySelector implements KeySelector<Entity, Tuple1<String>> {
+
+ @Override
+ public Tuple1<String> getKey(Entity entity) throws Exception {
+ return new Tuple1<>(entity.getHashkey());
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
new file mode 100644
index 0000000..46e3ad0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
@@ -0,0 +1,14 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.pojo.Entity;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+public class FilterNullFunction implements FilterFunction<Entity> {
+
+ @Override
+ public boolean filter(Entity entity) {
+
+ return entity.getIfError()!=1;
+ }
+}
+
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
new file mode 100644
index 0000000..6751f5b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
@@ -0,0 +1,100 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONException;
+import com.zdjizhi.pojo.Entity;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/27 15:01
+ */
+
+public class ParseFunction implements MapFunction<String, Entity> {
+ private static final Log logger = LogFactory.get();
+
+
+ @Override
+ public Entity map(String message) {
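+ // Parse one JSON record into an Entity: normalize null identity fields to "",
+ // compute an MD5 hashkey over imei + imsi + phone_number, prefer the uplink TEID and
+ // fall back to the downlink TEID; records that cannot be used are flagged ifError=1.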
+
+
+ Entity entity = new Entity();
+
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ entity = JSON.parseObject(message, Entity.class);
+ if(entity.getGtp_apn()==null){
+
+ entity.setGtp_apn("");
+ }
+ if(entity.getGtp_phone_number()==null){
+
+ entity.setGtp_phone_number("");
+ }
+ if(entity.getGtp_imei()==null){
+
+ entity.setGtp_imei("");
+ }
+ if(entity.getGtp_imsi()==null){
+
+ entity.setGtp_imsi("");
+ }
+
+
+ if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) {
+
+
+ String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number());
+ entity.setHashkey(md5Str);
+
+ if(entity.getGtp_uplink_teid()==null || entity.getGtp_uplink_teid()==0){
+
+ if(entity.getGtp_downlink_teid()==null || entity.getGtp_downlink_teid()==0){
+
+ entity.setIfError(1);
+ logger.info("teid为空" + message);
+
+ }
+ else{
+
+ entity.setGtp_teid(entity.getGtp_downlink_teid());
+ }
+ }else{
+
+ entity.setGtp_teid(entity.getGtp_uplink_teid());
+
+ }
+
+ }
+ else {
+ entity.setHashkey("");
+ entity.setIfError(1);
+ logger.info("三元组为空" + message);
+
+ }
+
+
+
+ }else{
+
+ entity.setIfError(1);
+ logger.error("数据转换JSON格式异常,原始日志为:" + message);
+ }
+ } catch (JSONException jse) {
+ entity.setIfError(1);
+ logger.error("数据转换JSON格式异常,原始日志为:" + message);
+ } catch (RuntimeException re) {
+ entity.setIfError(1);
+ logger.error("GTP日志条件过滤异常,异常信息为:" + re);
+ }
+
+ return entity;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
new file mode 100644
index 0000000..dd545c3
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
@@ -0,0 +1,397 @@
+package com.zdjizhi.utils.hbasepackage;
+
+
+import com.google.common.cache.CacheBuilder;
+import com.zdjizhi.common.GtpConfig;
+import com.zdjizhi.pojo.Entity;
+import com.zdjizhi.pojo.Gtp;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, SinkFunction<Entity> {
+ private Logger log;
+
+ private String hbase_zookeeper_host;
+ // public Map<String, Gtp> gtpConcurrentHashMap = new ConcurrentHashMap<>(80000);
+ //public Cache<String, Gtp> gtpConcurrentHashMap = CacheUtil.newLRUCache(4, DateUnit.SECOND.getMillis() * 60);
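+ // In-memory Guava cache of identity hashkey -> Gtp, built in the constructor with an
+ // initial capacity of 100000, bounded by cache.max.size and expiring entries
+ // cache.expire.seconds after write.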
+ public com.google.common.cache.Cache<String, Gtp> gtpConcurrentHashMap;
+
+ private Connection connection;
+ private Admin admin;
+
+ public HbaseSink(String hbase_zookeeper_host) {
+ this.hbase_zookeeper_host = hbase_zookeeper_host;
+ gtpConcurrentHashMap=CacheBuilder.newBuilder().expireAfterWrite(GtpConfig.CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).initialCapacity(100000).maximumSize(GtpConfig.CACHE_MAX_SIZE).build();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ log = Logger.getLogger(HbaseSink.class);
+
+ org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
+ configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
+
+ connection = ConnectionFactory.createConnection(configuration);
+ admin = connection.getAdmin();
+
+ try {
+
+ Table table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME));
+ Scan scan = new Scan();
+ scan.addColumn("gtp".getBytes(), "teid".getBytes());
+ scan.addColumn("gtp".getBytes(), "apn".getBytes());
+ scan.addColumn("gtp".getBytes(), "phone_number".getBytes());
+ scan.addColumn("gtp".getBytes(), "imsi".getBytes());
+ scan.addColumn("gtp".getBytes(), "imei".getBytes());
+ scan.addColumn("gtp".getBytes(), "last_update_time".getBytes());
+ scan.addColumn("gtp".getBytes(), "msg_type".getBytes());
+
+ if (GtpConfig.HBASE_SCAN_LIMIT != 0) {
+ scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT);
+ }
+
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ if (result.containsColumn("gtp".getBytes(), "teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) {
+
+ Gtp gtp = new Gtp();
+ String key = Bytes.toString(result.getRow());
+ Long teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("teid"))));
+ int msg_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type"))));
+
+ String apn = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("apn"))));
+ String phone_number = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))));
+ String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))));
+ String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))));
+ Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time"))));
+
+ gtp.setLast_update_time(last_update_time);
+ gtp.setGtp_teid(teid);
+ gtp.setGtp_apn(apn);
+ gtp.setGtp_phone_number(phone_number);
+ gtp.setGtp_imsi(imsi);
+ gtp.setGtp_imei(imei);
+ gtp.setMsg_type(msg_type);
+ gtpConcurrentHashMap.put(key, gtp);
+ }
+ }
+ scanner.close();
+ } catch (IOException ioe) {
+ log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
+ } catch (RuntimeException e) {
+ log.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
+ }
+ }
+
+ public void invoke(Entity entity, Context context) throws Exception {
+ // Organized by project:table
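+ // Look up the cached Gtp for this identity hashkey.
+ // Cache hit, record not older than the cache: a "delete" marks msg_type=2 and rewrites
+ //   the knowledge-base rows (plus the relation table when the TEID matches the cached one);
+ //   other messages mark msg_type=1 and rewrite both tables when the TEID changed or the
+ //   cached entry is older than cache.update.seconds.
+ // Cache hit, record older than the cache: only a "delete" rewrites the knowledge base.
+ // Cache miss: build a fresh Gtp (msg_type 1, or 2 for "delete") and write both tables.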
+
+ Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey());
+ if (gtp!=null) {
+ //Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey());
+
+
+
+ if (gtp.getLast_update_time() <= entity.getCommon_recv_time()) {
+
+
+ if("delete".equals(entity.getGtp_msg_type())){
+
+ if(gtp.getGtp_teid().equals(entity.getGtp_teid())){
+
+
+
+ gtp.setMsg_type(2);
+ ArrayList<Row> rows = new ArrayList<>();
+ // ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setLast_update_time(entity.getCommon_recv_time());
+ gtp.setGtp_teid(entity.getGtp_teid());
+ gtp.setGtp_apn(entity.getGtp_apn());
+ gtp.setGtp_phone_number(entity.getGtp_phone_number());
+ gtp.setGtp_imsi(entity.getGtp_imsi());
+ gtp.setGtp_imei(entity.getGtp_imei());
+ ArrayList<Row> updaterows = getupdateRows(gtp);
+ rows.addAll(updaterows);
+ updateKnowledgeMessage(rows);
+ updateRelationMessage(entity.getHashkey(), gtp);
+
+ }
+ else{
+
+
+ gtp.setMsg_type(2);
+ ArrayList<Row> rows = new ArrayList<>();
+ // ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setLast_update_time(entity.getCommon_recv_time());
+ gtp.setGtp_teid(entity.getGtp_teid());
+ gtp.setGtp_apn(entity.getGtp_apn());
+ gtp.setGtp_phone_number(entity.getGtp_phone_number());
+ gtp.setGtp_imsi(entity.getGtp_imsi());
+ gtp.setGtp_imei(entity.getGtp_imei());
+ ArrayList<Row> updaterows = getupdateRows(gtp);
+ // rows.addAll(delrows);
+ rows.addAll(updaterows);
+ updateKnowledgeMessage(rows);
+ // updateRelationMessage(entity.getHashkey(), gtp);
+
+ }
+
+
+
+ }
+ else{
+
+ if(!gtp.getGtp_teid().equals(entity.getGtp_teid())){
+
+
+ gtp.setMsg_type(1);
+ ArrayList<Row> rows = new ArrayList<>();
+ // ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setLast_update_time(entity.getCommon_recv_time());
+ gtp.setGtp_teid(entity.getGtp_teid());
+ gtp.setGtp_apn(entity.getGtp_apn());
+ gtp.setGtp_phone_number(entity.getGtp_phone_number());
+ gtp.setGtp_imsi(entity.getGtp_imsi());
+ gtp.setGtp_imei(entity.getGtp_imei());
+ ArrayList<Row> updaterows = getupdateRows(gtp);
+ // rows.addAll(delrows);
+ rows.addAll(updaterows);
+ updateKnowledgeMessage(rows);
+ updateRelationMessage(entity.getHashkey(), gtp);
+
+ }
+ else {
+ if(entity.getCommon_recv_time()-gtp.getLast_update_time()>GtpConfig.CACHE_UPDATE_SECONDS){
+
+
+ gtp.setMsg_type(1);
+ ArrayList<Row> rows = new ArrayList<>();
+ // ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setLast_update_time(entity.getCommon_recv_time());
+ gtp.setGtp_teid(entity.getGtp_teid());
+ gtp.setGtp_apn(entity.getGtp_apn());
+ gtp.setGtp_phone_number(entity.getGtp_phone_number());
+ gtp.setGtp_imsi(entity.getGtp_imsi());
+ gtp.setGtp_imei(entity.getGtp_imei());
+ ArrayList<Row> updaterows = getupdateRows(gtp);
+ // rows.addAll(delrows);
+ rows.addAll(updaterows);
+ updateKnowledgeMessage(rows);
+ updateRelationMessage(entity.getHashkey(), gtp);
+ }
+ }
+
+ }
+
+
+
+ }else{
+
+ if ("delete".equals(entity.getGtp_msg_type())) {
+
+
+ gtp.setMsg_type(2);
+ ArrayList<Row> rows = new ArrayList<>();
+ // ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setLast_update_time(entity.getCommon_recv_time());
+ gtp.setGtp_teid(entity.getGtp_teid());
+ gtp.setGtp_apn(entity.getGtp_apn());
+ gtp.setGtp_phone_number(entity.getGtp_phone_number());
+ gtp.setGtp_imsi(entity.getGtp_imsi());
+ gtp.setGtp_imei(entity.getGtp_imei());
+ ArrayList<Row> updaterows = getupdateRows(gtp);
+ // rows.addAll(delrows);
+ rows.addAll(updaterows);
+ updateKnowledgeMessage(rows);
+ }
+
+ }
+ } else {
+
+
+ Gtp gtpobj = new Gtp();
+ gtpobj.setLast_update_time(entity.getCommon_recv_time());
+ gtpobj.setGtp_teid(entity.getGtp_teid());
+ gtpobj.setGtp_apn(entity.getGtp_apn());
+ gtpobj.setGtp_phone_number(entity.getGtp_phone_number());
+ gtpobj.setGtp_imsi(entity.getGtp_imsi());
+ gtpobj.setGtp_imei(entity.getGtp_imei());
+ if(!"delete".equals(entity.getGtp_msg_type())) {
+ gtpobj.setMsg_type(1);
+ }
+ else{
+ gtpobj.setMsg_type(2);
+
+ }
+ ArrayList<Row> rows = new ArrayList<>();
+ ArrayList<Row> updaterows = getupdateRows(gtpobj);
+ rows.addAll(updaterows);
+ updateRelationMessage(entity.getHashkey(), gtpobj);
+ updateKnowledgeMessage(rows);
+
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+
+
+ public void updateRelationMessage(String key, Gtp gtp) throws IOException {
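+ // Write the Gtp to the user-TEID relation table under the identity hashkey row key
+ // and refresh the in-memory cache.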
+
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME));
+ Put put = new Put(key.getBytes());
+ put.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
+ put.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
+ put.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
+ put.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
+ put.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
+ put.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
+ put.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+
+ table.put(put);
+ gtpConcurrentHashMap.put(key, gtp);
+ } catch (Exception e) {
+ log.error(e.toString());
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+
+
+ }
+
+ public void updateKnowledgeMessage(ArrayList<Row> rows) throws IOException {
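+ // Apply the accumulated Put/Delete mutations to the GTP-C knowledge-base table in one batch.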
+
+ Table tableR = null;
+ try {
+
+ tableR = connection.getTable(TableName.valueOf(GtpConfig.GTPC_KNOWLEDGE_BASE_TABLE_NAME));
+ Object[] results = new Object[rows.size()];
+ tableR.batch(rows, results);
+ } catch (Exception e) {
+ log.error(e.toString());
+ } finally {
+ if (tableR != null) {
+ tableR.close();
+ }
+ }
+
+
+ }
+
+
+ public ArrayList<Row> getDelRows(Gtp entity) {
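+ // Build Delete mutations for the knowledge-base rows of this entry. Row keys are
+ // prefixed by field type: "0"+imei, "1"+imsi, "2"+reversed(phone_number),
+ // "3"+reversed(apn), each suffixed with "|" + teid.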
+
+
+ ArrayList<Row> delrows = new ArrayList<>();
+
+ if (!"".equals(entity.getGtp_apn())) {
+ String oldapnkey = "3" + new StringBuffer(entity.getGtp_apn()).reverse().toString() + "|" + entity.getGtp_teid();
+ Delete del_apnkey = new Delete(Bytes.toBytes(oldapnkey));
+ delrows.add(del_apnkey);
+ }
+ if (!"".equals(entity.getGtp_phone_number())) {
+
+ String oldpnkey = "2" + new StringBuffer(entity.getGtp_phone_number()).reverse().toString() + "|" + entity.getGtp_teid();
+ Delete del_pnkey = new Delete(Bytes.toBytes(oldpnkey));
+ delrows.add(del_pnkey);
+ }
+
+ if (!"".equals(entity.getGtp_imsi())) {
+
+ String oldimsikey = "1" + entity.getGtp_imsi() + "|" + entity.getGtp_teid();
+ Delete del_imsikey = new Delete(Bytes.toBytes(oldimsikey));
+ delrows.add(del_imsikey);
+ }
+ if (!"".equals(entity.getGtp_imei())) {
+
+ String oldimeikey = "0" + entity.getGtp_imei() + "|" + entity.getGtp_teid();
+ Delete del_imeikey = new Delete(Bytes.toBytes(oldimeikey));
+ delrows.add(del_imeikey);
+ }
+
+ return delrows;
+ }
+
+
+ public ArrayList<Row> getupdateRows(Gtp gtp) {
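+ // Build Put mutations for the knowledge-base table, one row per non-empty identity
+ // field, using the same prefixed row-key scheme as getDelRows ("0" imei, "1" imsi,
+ // "2" reversed phone_number, "3" reversed apn, each suffixed with "|" + teid).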
+
+
+ ArrayList<Row> updaterows = new ArrayList<>();
+
+ if (!"".equals(gtp.getGtp_apn())) {
+ String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_teid();
+ Put putApn = new Put(apnkey.getBytes());
+ putApn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
+ putApn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
+ putApn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
+ putApn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
+ putApn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
+ putApn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
+ putApn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+
+ updaterows.add(putApn);
+
+ }
+ if (!"".equals(gtp.getGtp_phone_number())) {
+ String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_teid();
+ Put putPn = new Put(pnkey.getBytes());
+ putPn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
+ putPn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
+ putPn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
+ putPn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
+ putPn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
+ putPn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
+ putPn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+
+ updaterows.add(putPn);
+
+ }
+ if (!"".equals(gtp.getGtp_imsi())) {
+
+ String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_teid();
+ Put putImsi = new Put(imsikey.getBytes());
+ putImsi.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
+ putImsi.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
+ putImsi.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
+ putImsi.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
+ putImsi.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
+ putImsi.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
+ putImsi.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+
+ updaterows.add(putImsi);
+
+ }
+ if (!"".equals(gtp.getGtp_imei())) {
+ String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_teid();
+ Put putImei = new Put(imeikey.getBytes());
+ putImei.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
+ putImei.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
+ putImei.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
+ putImei.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
+ putImei.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
+ putImei.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
+ putImei.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+
+ updaterows.add(putImei);
+
+ }
+
+
+ return updaterows;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
new file mode 100644
index 0000000..3ac901d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -0,0 +1,48 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.GtpConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/9/6 10:37
+ */
+class CertUtils {
+ /**
+ * Kafka SASL authentication port
+ */
+ private static final String SASL_PORT = "9094";
+
+ /**
+ * Kafka SSL authentication port
+ */
+ private static final String SSL_PORT = "9095";
+
+ /**
+ * Choose the authentication mode based on the port in the connection string.
+ *
+ * @param servers Kafka connection string (bootstrap servers)
+ * @param properties Kafka connection properties to populate
+ */
+ static void chooseCert(String servers, Properties properties) {
+ if (servers.contains(SASL_PORT)) {
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + GtpConfig.KAFKA_USER + " password=" + GtpConfig.KAFKA_PIN + ";");
+ } else if (servers.contains(SSL_PORT)) {
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", GtpConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", GtpConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", GtpConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", GtpConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", GtpConfig.KAFKA_PIN);
+ }
+
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
new file mode 100644
index 0000000..04a895d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -0,0 +1,41 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.GtpConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/8 13:54
+ */
+public class Consumer {
+ private static Properties createConsumerConfig() {
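+ // Assemble the Kafka consumer properties from GtpConfig and let CertUtils add
+ // SASL or SSL settings depending on the port in the bootstrap servers string.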
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", GtpConfig.INPUT_KAFKA_SERVERS);
+ properties.put("group.id", GtpConfig.GROUP_ID);
+ properties.put("session.timeout.ms", GtpConfig.SESSION_TIMEOUT_MS);
+ properties.put("max.poll.records", GtpConfig.MAX_POLL_RECORDS);
+ properties.put("max.partition.fetch.bytes", GtpConfig.MAX_PARTITION_FETCH_BYTES);
+ properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+
+ CertUtils.chooseCert(GtpConfig.INPUT_KAFKA_SERVERS, properties);
+
+ return properties;
+ }
+
+ public static FlinkKafkaConsumer<String> getKafkaConsumer() {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(GtpConfig.INPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(), createConsumerConfig());
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/system/GtpConfigurations.java b/src/main/java/com/zdjizhi/utils/system/GtpConfigurations.java
new file mode 100644
index 0000000..c3c7576
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/system/GtpConfigurations.java
@@ -0,0 +1,70 @@
+package com.zdjizhi.utils.system;
+
+import com.zdjizhi.utils.StringUtil;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class GtpConfigurations {
+
+ private static Properties propDefault = new Properties();
+ private static Properties propService = new Properties();
+
+
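+ // For every getter below, type 0 reads service_flow_config.properties and
+ // type 1 reads default_config.properties; any other type returns null.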
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key);
+ } else if (type == 1) {
+ return propDefault.getProperty(key);
+ } else {
+ return null;
+ }
+
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else if (type == 1) {
+ return Integer.parseInt(propDefault.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key));
+ } else if (type == 1) {
+ return Long.parseLong(propDefault.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return StringUtil.equals(propService.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else if (type == 1) {
+ return StringUtil.equals(propDefault.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propService.load(GtpConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ propDefault.load(GtpConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ } catch (IOException | RuntimeException e) {
+ propDefault = null;
+ propService = null;
+ }
+ }
+}
diff --git a/src/main/log4j.properties b/src/main/log4j.properties
new file mode 100644
index 0000000..96f758c
--- /dev/null
+++ b/src/main/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=console,file
+# Console log settings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=ERROR
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File log settings
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+#Use a relative path and verify by testing that output lands under the application directory
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+##MyBatis configuration; com.nis.web.dao is the package containing the MyBatis interfaces
+#log4j.logger.com.nis.web.dao=debug
+##BoneCP data source configuration
+#log4j.category.com.jolbox=debug,console
+
+
diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java
new file mode 100644
index 0000000..511fac6
--- /dev/null
+++ b/src/test/java/com/zdjizhi/FunctionTest.java
@@ -0,0 +1,34 @@
+package com.zdjizhi;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2021/6/23 14:32
+ */
+public class FunctionTest {
+
+ @Test
+ public void jsonTest() {
+ Map<String, Long> methodCount = new HashMap<>(16);
+ methodCount.put("A",20L);
+ methodCount.put("B",20L);
+ methodCount.put("C",20L);
+ String jsonString = JSON.toJSONString(methodCount);
+ System.out.println(jsonString);
+ JSONObject jsonObject = JSONObject.parseObject(jsonString);
+ Map<String, Object> hmCount = (Map<String, Object>) jsonObject;
+ System.out.println(hmCount.toString());
+
+
+ }
+
+
+}