summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pom.xml4
-rw-r--r--properties/service_flow_config.properties12
-rw-r--r--src/main/java/com/zdjizhi/common/GtpConfig.java4
-rw-r--r--src/main/java/com/zdjizhi/pojo/Entity.java9
-rw-r--r--src/main/java/com/zdjizhi/pojo/Gtp.java10
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/ParseFunction.java6
-rw-r--r--src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java40
7 files changed, 64 insertions, 21 deletions
diff --git a/pom.xml b/pom.xml
index ada3e18..bc618d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>relationship-gtpc-user</artifactId>
- <version>22-08-19</version>
+ <version>22-10-09</version>
<name>relationship-gtpc-user</name>
<url>http://www.example.com</url>
@@ -251,7 +251,7 @@
<goal>shade</goal>
</goals>
<configuration>
- <finalName>relationship-gtpc-user-22-08-19</finalName>
+ <finalName>relationship-gtpc-user-22-10-09</finalName>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 2dd30ee..1c781a7 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,15 +1,15 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
-#input.kafka.servers=192.168.44.12:9094
+#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+input.kafka.servers=192.168.44.12:9094
#input.kafka.servers=192.168.40.151:9094
#hbase zookeeper地址 用于连接HBase
-hbase.zookeeper.servers=192.168.44.11
+hbase.zookeeper.servers=192.168.44.12
#hbase.zookeeper.servers=192.168.40.151:2181
-#if.hbase.scan=1
-hbase.scan.limit=0
+hbase.rpc.timeout=60000
+hbase.scan.limit=100000
cache.expire.seconds=86400
cache.max.size=10000000
@@ -21,7 +21,7 @@ cache.update.seconds=3600
input.kafka.topic=GTPC-RECORD-COMPLETED
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=test3
+group.id=test6
#--------------------------------topology配置------------------------------#
#ip-account对应关系表
diff --git a/src/main/java/com/zdjizhi/common/GtpConfig.java b/src/main/java/com/zdjizhi/common/GtpConfig.java
index 87353ec..366e1c4 100644
--- a/src/main/java/com/zdjizhi/common/GtpConfig.java
+++ b/src/main/java/com/zdjizhi/common/GtpConfig.java
@@ -23,6 +23,10 @@ public class GtpConfig {
*/
public static final String INPUT_KAFKA_SERVERS = GtpConfigurations.getStringProperty(0, "input.kafka.servers");
public static final String HBASE_ZOOKEEPER_SERVERS = GtpConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+
+ public static final int HBASE_RPC_TIMEOUT = GtpConfigurations.getIntProperty(0, "hbase.rpc.timeout");
+
+
public static final String SESSION_TIMEOUT_MS = GtpConfigurations.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = GtpConfigurations.getStringProperty(1, "max.poll.records");
public static final String MAX_PARTITION_FETCH_BYTES = GtpConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
diff --git a/src/main/java/com/zdjizhi/pojo/Entity.java b/src/main/java/com/zdjizhi/pojo/Entity.java
index 37bb83c..487135c 100644
--- a/src/main/java/com/zdjizhi/pojo/Entity.java
+++ b/src/main/java/com/zdjizhi/pojo/Entity.java
@@ -14,6 +14,7 @@ public class Entity {
private String gtp_msg_type;
private long common_recv_time;
private String gtp_uuid;
+ private int common_vsys_id;
public String getHashkey() {
@@ -103,4 +104,12 @@ public class Entity {
public void setGtp_uuid(String gtp_uuid) {
this.gtp_uuid = gtp_uuid;
}
+
+ public int getCommon_vsys_id() {
+ return common_vsys_id;
+ }
+
+ public void setCommon_vsys_id(int common_vsys_id) {
+ this.common_vsys_id = common_vsys_id;
+ }
}
diff --git a/src/main/java/com/zdjizhi/pojo/Gtp.java b/src/main/java/com/zdjizhi/pojo/Gtp.java
index 2795f41..c7ebe1e 100644
--- a/src/main/java/com/zdjizhi/pojo/Gtp.java
+++ b/src/main/java/com/zdjizhi/pojo/Gtp.java
@@ -11,6 +11,8 @@ public class Gtp {
private long gtp_uplink_teid;
private long gtp_downlink_teid ;
private String gtp_uuid;
+ private int common_vsys_id;
+
public Integer getMsg_type() {
return msg_type;
@@ -83,4 +85,12 @@ public class Gtp {
public void setLast_update_time(long last_update_time) {
this.last_update_time = last_update_time;
}
+
+ public int getCommon_vsys_id() {
+ return common_vsys_id;
+ }
+
+ public void setCommon_vsys_id(int common_vsys_id) {
+ this.common_vsys_id = common_vsys_id;
+ }
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
index aee3487..7d8758b 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
@@ -46,6 +46,10 @@ public class ParseFunction implements MapFunction<String, Entity> {
entity.setGtp_imsi("");
}
+ if(entity.getCommon_vsys_id()==0){
+
+ entity.setCommon_vsys_id(1);
+ }
/* if(entity.getCommon_recv_time()<0 || entity.getCommon_recv_time()>9999999999L){
entity.setIfError(1);
@@ -57,7 +61,7 @@ public class ParseFunction implements MapFunction<String, Entity> {
if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) {
- String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number());
+ String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number()+entity.getCommon_vsys_id());
entity.setHashkey(md5Str);
if( entity.getGtp_uplink_teid()==0 && entity.getGtp_downlink_teid()==0){
diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
index b5eb799..ce330c3 100644
--- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
+++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
@@ -45,6 +45,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
+ configuration.set("hbase.rpc.timeout", String.valueOf(GtpConfig.HBASE_RPC_TIMEOUT));
+ configuration.set("zookeeper.recovery.retry", "1");
+ configuration.set("hbase.client.retries.number", "1");
+
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
@@ -61,6 +65,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
scan.addColumn("gtp".getBytes(), "imei".getBytes());
scan.addColumn("gtp".getBytes(), "last_update_time".getBytes());
scan.addColumn("gtp".getBytes(), "msg_type".getBytes());
+ scan.addColumn("common".getBytes(), "vsys_id".getBytes());
if (GtpConfig.HBASE_SCAN_LIMIT != 0) {
scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT);
@@ -68,7 +73,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
- if (result.containsColumn("gtp".getBytes(), "uplink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "downlink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) {
+ if (result.containsColumn("common".getBytes(), "vsys_id".getBytes()) && result.containsColumn("gtp".getBytes(), "uplink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "downlink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) {
Gtp gtp = new Gtp();
String key = Bytes.toString(result.getRow());
@@ -81,6 +86,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))));
String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))));
Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time"))));
+ int vsys_id = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("common"), Bytes.toBytes("vsys_id"))));
gtp.setLast_update_time(last_update_time);
gtp.setGtp_uplink_teid(uplink_teid);
@@ -91,6 +97,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setGtp_imei(imei);
gtp.setMsg_type(msg_type);
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() + "|" + gtp.getGtp_downlink_teid()));
+ gtp.setCommon_vsys_id(vsys_id);
gtpConcurrentHashMap.put(key, gtp);
}
@@ -120,7 +127,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
-
+ gtp.setCommon_vsys_id(entity.getCommon_vsys_id());
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -136,7 +143,8 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setMsg_type(2);
ArrayList<Row> rows = new ArrayList<>();
- // ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setCommon_vsys_id(entity.getCommon_vsys_id());
+ // ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
@@ -158,7 +166,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
else{
- if(!gtp.getGtp_uuid().equals(entity.getGtp_uuid())){
+ if(!gtp.getGtp_uuid().equals(entity.getGtp_uuid()) || !gtp.getGtp_apn().equals(entity.getGtp_apn()) || gtp.getCommon_vsys_id()!=entity.getCommon_vsys_id()){
gtp.setMsg_type(1);
@@ -168,7 +176,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
-
+ gtp.setCommon_vsys_id(entity.getCommon_vsys_id());
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -187,6 +195,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setMsg_type(1);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setCommon_vsys_id(entity.getCommon_vsys_id());
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
@@ -215,6 +224,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setMsg_type(2);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setCommon_vsys_id(entity.getCommon_vsys_id());
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
@@ -234,6 +244,8 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
Gtp gtpobj = new Gtp();
+
+ gtpobj.setCommon_vsys_id(entity.getCommon_vsys_id());
gtpobj.setLast_update_time(entity.getCommon_recv_time());
gtpobj.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtpobj.setGtp_downlink_teid(entity.getGtp_downlink_teid());
@@ -278,6 +290,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
put.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
put.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
put.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+ put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id()));
table.put(put);
gtpConcurrentHashMap.put(key, gtp);
@@ -308,7 +321,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
- public ArrayList<Row> getDelRows(Gtp entity) {
+/* public ArrayList<Row> getDelRows(Gtp entity) {
ArrayList<Row> delrows = new ArrayList<>();
@@ -339,7 +352,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
return delrows;
- }
+ }*/
public ArrayList<Row> getupdateRows(Gtp gtp) {
@@ -348,7 +361,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> updaterows = new ArrayList<>();
if (!"".equals(gtp.getGtp_apn())) {
- String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_uuid();
+ String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid();
Put putApn = new Put(apnkey.getBytes());
putApn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putApn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
@@ -358,12 +371,12 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
putApn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putApn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putApn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
-
+ putApn.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id()));
updaterows.add(putApn);
}
if (!"".equals(gtp.getGtp_phone_number())) {
- String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_uuid();
+ String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid();
Put putPn = new Put(pnkey.getBytes());
putPn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putPn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
@@ -373,13 +386,14 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
putPn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putPn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putPn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+ putPn.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id()));
updaterows.add(putPn);
}
if (!"".equals(gtp.getGtp_imsi())) {
- String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_uuid();
+ String imsikey = "1" + gtp.getGtp_imsi() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid();
Put putImsi = new Put(imsikey.getBytes());
putImsi.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putImsi.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
@@ -389,12 +403,13 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
putImsi.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putImsi.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putImsi.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+ putImsi.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id()));
updaterows.add(putImsi);
}
if (!"".equals(gtp.getGtp_imei())) {
- String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_uuid();
+ String imeikey = "0" + gtp.getGtp_imei() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid();
Put putImei = new Put(imeikey.getBytes());
putImei.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putImei.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
@@ -404,6 +419,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
putImei.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putImei.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putImei.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+ putImei.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id()));
updaterows.add(putImei);