diff options
| author | LAPTOP-CUUVN8AS\wk <[email protected]> | 2022-09-19 13:50:22 +0800 |
|---|---|---|
| committer | LAPTOP-CUUVN8AS\wk <[email protected]> | 2022-09-19 13:50:22 +0800 |
| commit | 67db966e10cf101e034cb353d127ac393c358e62 (patch) | |
| tree | d5ba27a07280d604242edee6e79eec5d0af530e0 | |
| parent | bea6a964df33658b83b924fd2ba9197c42d050b6 (diff) | |
上下行teid
| -rw-r--r-- | pom.xml | 4 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 11 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/GtpConfig.java | 1 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/pojo/Entity.java | 29 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/pojo/Gtp.java | 35 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/ParseFunction.java | 84 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java | 172 |
7 files changed, 185 insertions, 151 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>relationship-gtpc-user</artifactId> - <version>22-08-15</version> + <version>22-08-19</version> <name>relationship-gtpc-user</name> <url>http://www.example.com</url> @@ -251,7 +251,7 @@ <goal>shade</goal> </goals> <configuration> - <finalName>relationship-gtpc-user-22-08-15</finalName> + <finalName>relationship-gtpc-user-22-08-19</finalName> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index a6976ba..2dd30ee 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,13 +1,14 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 -input.kafka.servers=192.168.44.12:9094 +input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 +#input.kafka.servers=192.168.44.12:9094 +#input.kafka.servers=192.168.40.151:9094 #hbase zookeeper地址 用于连接HBase -#hbase.zookeeper.servers=192.168.44.12 -hbase.zookeeper.servers=192.168.44.12:2181 - +hbase.zookeeper.servers=192.168.44.11 +#hbase.zookeeper.servers=192.168.40.151:2181 +#if.hbase.scan=1 hbase.scan.limit=0 cache.expire.seconds=86400 diff --git a/src/main/java/com/zdjizhi/common/GtpConfig.java b/src/main/java/com/zdjizhi/common/GtpConfig.java index ee377b9..87353ec 100644 --- a/src/main/java/com/zdjizhi/common/GtpConfig.java +++ b/src/main/java/com/zdjizhi/common/GtpConfig.java @@ -42,4 +42,5 @@ public class GtpConfig { public static final int CACHE_UPDATE_SECONDS = GtpConfigurations.getIntProperty(0, "cache.update.seconds"); + }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/pojo/Entity.java b/src/main/java/com/zdjizhi/pojo/Entity.java index aac3316..37bb83c 100644 --- a/src/main/java/com/zdjizhi/pojo/Entity.java +++ b/src/main/java/com/zdjizhi/pojo/Entity.java @@ -9,11 +9,11 @@ public class Entity { private String gtp_imei; private String gtp_imsi; private String gtp_phone_number; - private Long gtp_uplink_teid; - private Long gtp_downlink_teid ; + private long gtp_uplink_teid; + private long gtp_downlink_teid ; private String gtp_msg_type; - private Long common_recv_time; - private Long gtp_teid; + private long common_recv_time; + private String gtp_uuid; public String getHashkey() { @@ -64,19 +64,19 @@ public class Entity { this.gtp_phone_number = gtp_phone_number; } - public Long getGtp_uplink_teid() { + public long getGtp_uplink_teid() { return gtp_uplink_teid; } - public void setGtp_uplink_teid(Long gtp_uplink_teid) { + public void setGtp_uplink_teid(long gtp_uplink_teid) { this.gtp_uplink_teid = gtp_uplink_teid; } - public Long getGtp_downlink_teid() { + public long getGtp_downlink_teid() { return gtp_downlink_teid; } - public void setGtp_downlink_teid(Long gtp_downlink_teid) { + public void setGtp_downlink_teid(long gtp_downlink_teid) { this.gtp_downlink_teid = gtp_downlink_teid; } @@ -88,20 +88,19 @@ public class Entity { this.gtp_msg_type = gtp_msg_type; } - public Long getCommon_recv_time() { + public long getCommon_recv_time() { return common_recv_time; } - public void setCommon_recv_time(Long common_recv_time) { + public void setCommon_recv_time(long common_recv_time) { this.common_recv_time = common_recv_time; } - - public Long getGtp_teid() { - return gtp_teid; + public String getGtp_uuid() { + return gtp_uuid; } - public void setGtp_teid(Long gtp_teid) { - this.gtp_teid = gtp_teid; + public void setGtp_uuid(String gtp_uuid) { + this.gtp_uuid = gtp_uuid; } } diff --git a/src/main/java/com/zdjizhi/pojo/Gtp.java b/src/main/java/com/zdjizhi/pojo/Gtp.java index f71c922..2795f41 100644 --- a/src/main/java/com/zdjizhi/pojo/Gtp.java +++ b/src/main/java/com/zdjizhi/pojo/Gtp.java @@ -6,10 +6,11 @@ public class Gtp { private String gtp_imei; private String gtp_imsi; private String gtp_phone_number; - private Long gtp_teid; private Integer msg_type; - private Long last_update_time; - + private long last_update_time; + private long gtp_uplink_teid; + private long gtp_downlink_teid ; + private String gtp_uuid; public Integer getMsg_type() { return msg_type; @@ -51,19 +52,35 @@ public class Gtp { this.gtp_phone_number = gtp_phone_number; } - public Long getGtp_teid() { - return gtp_teid; + public long getGtp_uplink_teid() { + return gtp_uplink_teid; + } + + public void setGtp_uplink_teid(long gtp_uplink_teid) { + this.gtp_uplink_teid = gtp_uplink_teid; + } + + public long getGtp_downlink_teid() { + return gtp_downlink_teid; + } + + public void setGtp_downlink_teid(long gtp_downlink_teid) { + this.gtp_downlink_teid = gtp_downlink_teid; + } + + public String getGtp_uuid() { + return gtp_uuid; } - public void setGtp_teid(Long gtp_teid) { - this.gtp_teid = gtp_teid; + public void setGtp_uuid(String gtp_uuid) { + this.gtp_uuid = gtp_uuid; } - public Long getLast_update_time() { + public long getLast_update_time() { return last_update_time; } - public void setLast_update_time(Long last_update_time) { + public void setLast_update_time(long last_update_time) { this.last_update_time = last_update_time; } } diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java index 6751f5b..aee3487 100644 --- a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java @@ -28,72 +28,70 @@ public class ParseFunction implements MapFunction<String, Entity> { Entity entity = new Entity(); try { - if (StringUtil.isNotBlank(message)) { - entity = JSON.parseObject(message, Entity.class); - if(entity.getGtp_apn()==null){ + if (StringUtil.isNotBlank(message)) { + entity = JSON.parseObject(message, Entity.class); + if(entity.getGtp_apn()==null){ - entity.setGtp_apn(""); - } - if(entity.getGtp_phone_number()==null){ - - entity.setGtp_phone_number(""); - } - if(entity.getGtp_imei()==null){ - - entity.setGtp_imei(""); - } - if(entity.getGtp_imsi()==null){ - - entity.setGtp_imsi(""); - } + entity.setGtp_apn(""); + } + if(entity.getGtp_phone_number()==null){ + entity.setGtp_phone_number(""); + } + if(entity.getGtp_imei()==null){ - if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) { + entity.setGtp_imei(""); + } + if(entity.getGtp_imsi()==null){ + entity.setGtp_imsi(""); + } + /* if(entity.getCommon_recv_time()<0 || entity.getCommon_recv_time()>9999999999L){ - String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number()); - entity.setHashkey(md5Str); + entity.setIfError(1); + logger.info("时间戳不合法 " + entity.getCommon_recv_time()); - if(entity.getGtp_uplink_teid()==null || entity.getGtp_uplink_teid()==0){ + }*/ - if(entity.getGtp_downlink_teid()==null || entity.getGtp_downlink_teid()==0){ - entity.setIfError(1); - logger.info("teid为空" + message); + if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) { - } - else{ - entity.setGtp_teid(entity.getGtp_downlink_teid()); - } - }else{ + String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number()); + entity.setHashkey(md5Str); - entity.setGtp_teid(entity.getGtp_uplink_teid()); + if( entity.getGtp_uplink_teid()==0 && entity.getGtp_downlink_teid()==0){ - } + entity.setIfError(1); + logger.info("teid为空" + message); + }else{ + String gtp_uuid = DigestUtils.md5Hex(entity.getGtp_uplink_teid() +"|"+ entity.getGtp_downlink_teid() ); + entity.setGtp_uuid(gtp_uuid); } - else { - entity.setHashkey(""); - entity.setIfError(1); - logger.info("三元组为空" + message); - } + } + else { + entity.setHashkey(""); + entity.setIfError(1); + logger.info("三元组为空" + message); + + } - }else{ + }else{ - entity.setIfError(1); - logger.error("数据转换JSON格式异常,原始日志为:" + message); - } - } catch (JSONException jse) { + entity.setIfError(1); + logger.error("数据转换JSON格式异常,原始日志为:" + message); + } + } catch (JSONException jse) { entity.setIfError(1); logger.error("数据转换JSON格式异常,原始日志为:" + message); - } catch (RuntimeException re) { + } catch (RuntimeException re) { entity.setIfError(1); logger.error("GTP日志条件过滤异常,异常信息为:" + re); - } + } return entity; } diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java index dd545c3..b5eb799 100644 --- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java +++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java @@ -5,6 +5,7 @@ import com.google.common.cache.CacheBuilder; import com.zdjizhi.common.GtpConfig; import com.zdjizhi.pojo.Entity; import com.zdjizhi.pojo.Gtp; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -41,59 +42,66 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, super.open(parameters); log = Logger.getLogger(HbaseSink.class); - org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create(); - configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host); - connection = ConnectionFactory.createConnection(configuration); - admin = connection.getAdmin(); + org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host); - try { + connection = ConnectionFactory.createConnection(configuration); + admin = connection.getAdmin(); - Table table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME)); - Scan scan = new Scan(); - scan.addColumn("gtp".getBytes(), "teid".getBytes()); - scan.addColumn("gtp".getBytes(), "apn".getBytes()); - scan.addColumn("gtp".getBytes(), "phone_number".getBytes()); - scan.addColumn("gtp".getBytes(), "imsi".getBytes()); - scan.addColumn("gtp".getBytes(), "imei".getBytes()); - scan.addColumn("gtp".getBytes(), "last_update_time".getBytes()); - scan.addColumn("gtp".getBytes(), "msg_type".getBytes()); - - if (GtpConfig.HBASE_SCAN_LIMIT != 0) { - scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT); - } + try { + + Table table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn("gtp".getBytes(), "uplink_teid".getBytes()); + scan.addColumn("gtp".getBytes(), "downlink_teid".getBytes()); + scan.addColumn("gtp".getBytes(), "apn".getBytes()); + scan.addColumn("gtp".getBytes(), "phone_number".getBytes()); + scan.addColumn("gtp".getBytes(), "imsi".getBytes()); + scan.addColumn("gtp".getBytes(), "imei".getBytes()); + scan.addColumn("gtp".getBytes(), "last_update_time".getBytes()); + scan.addColumn("gtp".getBytes(), "msg_type".getBytes()); - ResultScanner scanner = table.getScanner(scan); - for (Result result : scanner) { - if (result.containsColumn("gtp".getBytes(), "teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) { - - Gtp gtp = new Gtp(); - String key = Bytes.toString(result.getRow()); - Long teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("teid")))); - int msg_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type")))); - - String apn = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("apn")))); - String phone_number = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number")))); - String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi")))); - String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei")))); - Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time")))); - - gtp.setLast_update_time(last_update_time); - gtp.setGtp_teid(teid); - gtp.setGtp_apn(apn); - gtp.setGtp_phone_number(phone_number); - gtp.setGtp_imsi(imsi); - gtp.setGtp_imei(imei); - gtp.setMsg_type(msg_type); - gtpConcurrentHashMap.put(key, gtp); + if (GtpConfig.HBASE_SCAN_LIMIT != 0) { + scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT); } + + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + if (result.containsColumn("gtp".getBytes(), "uplink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "downlink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) { + + Gtp gtp = new Gtp(); + String key = Bytes.toString(result.getRow()); + + Long uplink_teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("uplink_teid")))); + Long downlink_teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("downlink_teid")))); + int msg_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type")))); + String apn = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("apn")))); + String phone_number = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number")))); + String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi")))); + String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei")))); + Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time")))); + + gtp.setLast_update_time(last_update_time); + gtp.setGtp_uplink_teid(uplink_teid); + gtp.setGtp_downlink_teid(downlink_teid); + gtp.setGtp_apn(apn); + gtp.setGtp_phone_number(phone_number); + gtp.setGtp_imsi(imsi); + gtp.setGtp_imei(imei); + gtp.setMsg_type(msg_type); + gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() + "|" + gtp.getGtp_downlink_teid())); + + gtpConcurrentHashMap.put(key, gtp); + } + } + scanner.close(); + } catch (IOException ioe) { + log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<==="); + } catch (RuntimeException e) { + log.error("HBaseUtils getAll() is Exception===>{" + e + "}<==="); } - scanner.close(); - } catch (IOException ioe) { - log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<==="); - } catch (RuntimeException e) { - log.error("HBaseUtils getAll() is Exception===>{" + e + "}<==="); - } + } public void invoke(Entity entity, Context context) throws Exception { @@ -101,24 +109,18 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey()); if (gtp!=null) { - //Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey()); - - - if (gtp.getLast_update_time() <= entity.getCommon_recv_time()) { - - if("delete".equals(entity.getGtp_msg_type())){ - if(gtp.getGtp_teid().equals(entity.getGtp_teid())){ - - - + if(gtp.getGtp_uuid().equals(entity.getGtp_uuid())){ gtp.setMsg_type(2); ArrayList<Row> rows = new ArrayList<>(); // ArrayList<Row> delrows = getDelRows(gtp); gtp.setLast_update_time(entity.getCommon_recv_time()); - gtp.setGtp_teid(entity.getGtp_teid()); + gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid()); + gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid()); + gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() )); + gtp.setGtp_apn(entity.getGtp_apn()); gtp.setGtp_phone_number(entity.getGtp_phone_number()); gtp.setGtp_imsi(entity.getGtp_imsi()); @@ -136,7 +138,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, ArrayList<Row> rows = new ArrayList<>(); // ArrayList<Row> delrows = getDelRows(gtp); gtp.setLast_update_time(entity.getCommon_recv_time()); - gtp.setGtp_teid(entity.getGtp_teid()); + gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid()); + gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid()); + gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() )); gtp.setGtp_apn(entity.getGtp_apn()); gtp.setGtp_phone_number(entity.getGtp_phone_number()); gtp.setGtp_imsi(entity.getGtp_imsi()); @@ -154,14 +158,17 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, } else{ - if(!gtp.getGtp_teid().equals(entity.getGtp_teid())){ + if(!gtp.getGtp_uuid().equals(entity.getGtp_uuid())){ gtp.setMsg_type(1); ArrayList<Row> rows = new ArrayList<>(); // ArrayList<Row> delrows = getDelRows(gtp); gtp.setLast_update_time(entity.getCommon_recv_time()); - gtp.setGtp_teid(entity.getGtp_teid()); + gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid()); + gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid()); + gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() )); + gtp.setGtp_apn(entity.getGtp_apn()); gtp.setGtp_phone_number(entity.getGtp_phone_number()); gtp.setGtp_imsi(entity.getGtp_imsi()); @@ -181,7 +188,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, ArrayList<Row> rows = new ArrayList<>(); // ArrayList<Row> delrows = getDelRows(gtp); gtp.setLast_update_time(entity.getCommon_recv_time()); - gtp.setGtp_teid(entity.getGtp_teid()); + gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid()); + gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid()); + gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() )); gtp.setGtp_apn(entity.getGtp_apn()); gtp.setGtp_phone_number(entity.getGtp_phone_number()); gtp.setGtp_imsi(entity.getGtp_imsi()); @@ -207,7 +216,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, ArrayList<Row> rows = new ArrayList<>(); // ArrayList<Row> delrows = getDelRows(gtp); gtp.setLast_update_time(entity.getCommon_recv_time()); - gtp.setGtp_teid(entity.getGtp_teid()); + gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid()); + gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid()); + gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() )); gtp.setGtp_apn(entity.getGtp_apn()); gtp.setGtp_phone_number(entity.getGtp_phone_number()); gtp.setGtp_imsi(entity.getGtp_imsi()); @@ -224,7 +235,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, Gtp gtpobj = new Gtp(); gtpobj.setLast_update_time(entity.getCommon_recv_time()); - gtpobj.setGtp_teid(entity.getGtp_teid()); + gtpobj.setGtp_uplink_teid(entity.getGtp_uplink_teid()); + gtpobj.setGtp_downlink_teid(entity.getGtp_downlink_teid()); + gtpobj.setGtp_uuid(DigestUtils.md5Hex(gtpobj.getGtp_uplink_teid() +"|"+ gtpobj.getGtp_downlink_teid() )); gtpobj.setGtp_apn(entity.getGtp_apn()); gtpobj.setGtp_phone_number(entity.getGtp_phone_number()); gtpobj.setGtp_imsi(entity.getGtp_imsi()); @@ -257,7 +270,8 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, try { table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME)); Put put = new Put(key.getBytes()); - put.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid())); + put.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid())); + put.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid())); put.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn())); put.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number())); put.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi())); @@ -300,26 +314,26 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, ArrayList<Row> delrows = new ArrayList<>(); if (!"".equals(entity.getGtp_apn())) { - String oldapnkey = "3" + new StringBuffer(entity.getGtp_apn()).reverse().toString() + "|" + entity.getGtp_teid(); + String oldapnkey = "3" + new StringBuffer(entity.getGtp_apn()).reverse().toString() + "|" + entity.getGtp_uuid(); Delete del_apnkey = new Delete(Bytes.toBytes(oldapnkey)); delrows.add(del_apnkey); } if (!"".equals(entity.getGtp_phone_number())) { - String oldpnkey = "2" + new StringBuffer(entity.getGtp_phone_number()).reverse().toString() + "|" + entity.getGtp_teid(); + String oldpnkey = "2" + new StringBuffer(entity.getGtp_phone_number()).reverse().toString() + "|" + entity.getGtp_uuid(); Delete del_pnkey = new Delete(Bytes.toBytes(oldpnkey)); delrows.add(del_pnkey); } if (!"".equals(entity.getGtp_imsi())) { - String oldimsikey = "1" + entity.getGtp_imsi() + "|" + entity.getGtp_teid(); + String oldimsikey = "1" + entity.getGtp_imsi() + "|" + entity.getGtp_uuid(); Delete del_imsikey = new Delete(Bytes.toBytes(oldimsikey)); delrows.add(del_imsikey); } if (!"".equals(entity.getGtp_imei())) { - String oldimeikey = "0" + entity.getGtp_imei() + "|" + entity.getGtp_teid(); + String oldimeikey = "0" + entity.getGtp_imei() + "|" + entity.getGtp_uuid(); Delete del_imeikey = new Delete(Bytes.toBytes(oldimeikey)); delrows.add(del_imeikey); } @@ -334,9 +348,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, ArrayList<Row> updaterows = new ArrayList<>(); if (!"".equals(gtp.getGtp_apn())) { - String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_teid(); + String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_uuid(); Put putApn = new Put(apnkey.getBytes()); - putApn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid())); + putApn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid())); + putApn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid())); putApn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn())); putApn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number())); putApn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi())); @@ -348,9 +363,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, } if (!"".equals(gtp.getGtp_phone_number())) { - String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_teid(); + String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_uuid(); Put putPn = new Put(pnkey.getBytes()); - putPn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid())); + putPn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid())); + putPn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid())); putPn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn())); putPn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number())); putPn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi())); @@ -363,9 +379,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, } if (!"".equals(gtp.getGtp_imsi())) { - String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_teid(); + String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_uuid(); Put putImsi = new Put(imsikey.getBytes()); - putImsi.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid())); + putImsi.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid())); + putImsi.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid())); putImsi.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn())); putImsi.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number())); putImsi.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi())); @@ -377,9 +394,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, } if (!"".equals(gtp.getGtp_imei())) { - String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_teid(); + String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_uuid(); Put putImei = new Put(imeikey.getBytes()); - putImei.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid())); + putImei.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid())); + putImei.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid())); putImei.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn())); putImei.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number())); putImei.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi())); |
