summaryrefslogtreecommitdiff
path: root/src/main/java/com
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2023-05-23 17:45:06 +0800
committerwangkuan <[email protected]>2023-05-23 17:45:06 +0800
commit3c46347d2bd5e820bb9fc19cb434bfb8e8855178 (patch)
treec8a75fa5090b58c48f952fb4bc55fc0868ae2823 /src/main/java/com
parent67db966e10cf101e034cb353d127ac393c358e62 (diff)
线上版本HEADmain
Diffstat (limited to 'src/main/java/com')
-rw-r--r--src/main/java/com/zdjizhi/common/GtpConfig.java4
-rw-r--r--src/main/java/com/zdjizhi/pojo/Entity.java9
-rw-r--r--src/main/java/com/zdjizhi/pojo/Gtp.java10
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/ParseFunction.java6
-rw-r--r--src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java40
5 files changed, 56 insertions, 13 deletions
diff --git a/src/main/java/com/zdjizhi/common/GtpConfig.java b/src/main/java/com/zdjizhi/common/GtpConfig.java
index 87353ec..366e1c4 100644
--- a/src/main/java/com/zdjizhi/common/GtpConfig.java
+++ b/src/main/java/com/zdjizhi/common/GtpConfig.java
@@ -23,6 +23,10 @@ public class GtpConfig {
*/
public static final String INPUT_KAFKA_SERVERS = GtpConfigurations.getStringProperty(0, "input.kafka.servers");
public static final String HBASE_ZOOKEEPER_SERVERS = GtpConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+
+ public static final int HBASE_RPC_TIMEOUT = GtpConfigurations.getIntProperty(0, "hbase.rpc.timeout");
+
+
public static final String SESSION_TIMEOUT_MS = GtpConfigurations.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = GtpConfigurations.getStringProperty(1, "max.poll.records");
public static final String MAX_PARTITION_FETCH_BYTES = GtpConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
diff --git a/src/main/java/com/zdjizhi/pojo/Entity.java b/src/main/java/com/zdjizhi/pojo/Entity.java
index 37bb83c..487135c 100644
--- a/src/main/java/com/zdjizhi/pojo/Entity.java
+++ b/src/main/java/com/zdjizhi/pojo/Entity.java
@@ -14,6 +14,7 @@ public class Entity {
private String gtp_msg_type;
private long common_recv_time;
private String gtp_uuid;
+ private int common_vsys_id;
public String getHashkey() {
@@ -103,4 +104,12 @@ public class Entity {
public void setGtp_uuid(String gtp_uuid) {
this.gtp_uuid = gtp_uuid;
}
+
+ public int getCommon_vsys_id() {
+ return common_vsys_id;
+ }
+
+ public void setCommon_vsys_id(int common_vsys_id) {
+ this.common_vsys_id = common_vsys_id;
+ }
}
diff --git a/src/main/java/com/zdjizhi/pojo/Gtp.java b/src/main/java/com/zdjizhi/pojo/Gtp.java
index 2795f41..c7ebe1e 100644
--- a/src/main/java/com/zdjizhi/pojo/Gtp.java
+++ b/src/main/java/com/zdjizhi/pojo/Gtp.java
@@ -11,6 +11,8 @@ public class Gtp {
private long gtp_uplink_teid;
private long gtp_downlink_teid ;
private String gtp_uuid;
+ private int common_vsys_id;
+
public Integer getMsg_type() {
return msg_type;
@@ -83,4 +85,12 @@ public class Gtp {
public void setLast_update_time(long last_update_time) {
this.last_update_time = last_update_time;
}
+
+ public int getCommon_vsys_id() {
+ return common_vsys_id;
+ }
+
+ public void setCommon_vsys_id(int common_vsys_id) {
+ this.common_vsys_id = common_vsys_id;
+ }
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
index aee3487..7d8758b 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
@@ -46,6 +46,10 @@ public class ParseFunction implements MapFunction<String, Entity> {
entity.setGtp_imsi("");
}
+ if(entity.getCommon_vsys_id()==0){
+
+ entity.setCommon_vsys_id(1);
+ }
/* if(entity.getCommon_recv_time()<0 || entity.getCommon_recv_time()>9999999999L){
entity.setIfError(1);
@@ -57,7 +61,7 @@ public class ParseFunction implements MapFunction<String, Entity> {
if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) {
- String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number());
+ String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number()+entity.getCommon_vsys_id());
entity.setHashkey(md5Str);
if( entity.getGtp_uplink_teid()==0 && entity.getGtp_downlink_teid()==0){
diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
index b5eb799..ce330c3 100644
--- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
+++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
@@ -45,6 +45,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
+ configuration.set("hbase.rpc.timeout", String.valueOf(GtpConfig.HBASE_RPC_TIMEOUT));
+ configuration.set("zookeeper.recovery.retry", "1");
+ configuration.set("hbase.client.retries.number", "1");
+
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
@@ -61,6 +65,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
scan.addColumn("gtp".getBytes(), "imei".getBytes());
scan.addColumn("gtp".getBytes(), "last_update_time".getBytes());
scan.addColumn("gtp".getBytes(), "msg_type".getBytes());
+ scan.addColumn("common".getBytes(), "vsys_id".getBytes());
if (GtpConfig.HBASE_SCAN_LIMIT != 0) {
scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT);
@@ -68,7 +73,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
- if (result.containsColumn("gtp".getBytes(), "uplink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "downlink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) {
+ if (result.containsColumn("common".getBytes(), "vsys_id".getBytes()) && result.containsColumn("gtp".getBytes(), "uplink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "downlink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) {
Gtp gtp = new Gtp();
String key = Bytes.toString(result.getRow());
@@ -81,6 +86,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))));
String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))));
Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time"))));
+ int vsys_id = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("common"), Bytes.toBytes("vsys_id"))));
gtp.setLast_update_time(last_update_time);
gtp.setGtp_uplink_teid(uplink_teid);
@@ -91,6 +97,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setGtp_imei(imei);
gtp.setMsg_type(msg_type);
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() + "|" + gtp.getGtp_downlink_teid()));
+ gtp.setCommon_vsys_id(vsys_id);
gtpConcurrentHashMap.put(key, gtp);
}
@@ -120,7 +127,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
-
+ gtp.setCommon_vsys_id(entity.getCommon_vsys_id());
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -136,7 +143,8 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setMsg_type(2);
ArrayList<Row> rows = new ArrayList<>();
- // ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setCommon_vsys_id(entity.getCommon_vsys_id());
+ // ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
@@ -158,7 +166,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
else{
- if(!gtp.getGtp_uuid().equals(entity.getGtp_uuid())){
+ if(!gtp.getGtp_uuid().equals(entity.getGtp_uuid()) || !gtp.getGtp_apn().equals(entity.getGtp_apn()) || gtp.getCommon_vsys_id()!=entity.getCommon_vsys_id()){
gtp.setMsg_type(1);
@@ -168,7 +176,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
-
+ gtp.setCommon_vsys_id(entity.getCommon_vsys_id());
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -187,6 +195,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setMsg_type(1);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setCommon_vsys_id(entity.getCommon_vsys_id());
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
@@ -215,6 +224,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
gtp.setMsg_type(2);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
+ gtp.setCommon_vsys_id(entity.getCommon_vsys_id());
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
@@ -234,6 +244,8 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
Gtp gtpobj = new Gtp();
+
+ gtpobj.setCommon_vsys_id(entity.getCommon_vsys_id());
gtpobj.setLast_update_time(entity.getCommon_recv_time());
gtpobj.setGtp_uplink_teid(entity.getGtp_uplink_teid());
gtpobj.setGtp_downlink_teid(entity.getGtp_downlink_teid());
@@ -278,6 +290,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
put.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
put.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
put.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+ put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id()));
table.put(put);
gtpConcurrentHashMap.put(key, gtp);
@@ -308,7 +321,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
- public ArrayList<Row> getDelRows(Gtp entity) {
+/* public ArrayList<Row> getDelRows(Gtp entity) {
ArrayList<Row> delrows = new ArrayList<>();
@@ -339,7 +352,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
return delrows;
- }
+ }*/
public ArrayList<Row> getupdateRows(Gtp gtp) {
@@ -348,7 +361,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> updaterows = new ArrayList<>();
if (!"".equals(gtp.getGtp_apn())) {
- String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_uuid();
+ String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid();
Put putApn = new Put(apnkey.getBytes());
putApn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putApn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
@@ -358,12 +371,12 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
putApn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putApn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putApn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
-
+ putApn.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id()));
updaterows.add(putApn);
}
if (!"".equals(gtp.getGtp_phone_number())) {
- String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_uuid();
+ String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid();
Put putPn = new Put(pnkey.getBytes());
putPn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putPn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
@@ -373,13 +386,14 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
putPn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putPn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putPn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+ putPn.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id()));
updaterows.add(putPn);
}
if (!"".equals(gtp.getGtp_imsi())) {
- String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_uuid();
+ String imsikey = "1" + gtp.getGtp_imsi() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid();
Put putImsi = new Put(imsikey.getBytes());
putImsi.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putImsi.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
@@ -389,12 +403,13 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
putImsi.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putImsi.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putImsi.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+ putImsi.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id()));
updaterows.add(putImsi);
}
if (!"".equals(gtp.getGtp_imei())) {
- String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_uuid();
+ String imeikey = "0" + gtp.getGtp_imei() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid();
Put putImei = new Put(imeikey.getBytes());
putImei.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
putImei.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
@@ -404,6 +419,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
putImei.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putImei.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putImei.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
+ putImei.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id()));
updaterows.add(putImei);